Author: w7ay@Knownsec 404 Team
Chinese version: https://paper.seebug.org/913/
Related reading: How to build your own PoC framework - the use of Pocsuite3

In this article, I want to build my own PoC framework, borrowing from Pocsuite along the way.

First of all, let's pick a nice name, such as Captain America, Thor, Ant-Man or Thanos. If you play Dota, you could also call it Mountain King, Blademaster or Priestess of the Moon.

I call it AirPoc because I'm lazy.

The next step is to design its functions. Let's assume there is a "rabbit security alliance". We plan to develop a blockchain-based PoC verification framework named AirPoc, which is responsible only for the websites within the "rabbit security alliance".

When an AirPoc node finds a vulnerability, the URL and the PoC are shared in a block and then verified by other randomly chosen nodes. If the verification succeeds, the finder earns "air currency", and the owner of the affected website has to pay "air currency" as compensation.

First, let's imagine what kind of PoC framework we need:

1. Simple and cross-platform
2. Easy for more people to contribute to
3. Easy to embed in other software
4. High verification speed and accuracy
5. I don't know what else to include in the framework yet, but the first thing is to build one that runs!

Of course, I didn't add the blockchain. That part is just a joke.

## Detail

We use Python 3.7 (the latest version at the time of writing) as the programming language and decide not to use third-party dependencies.

Contributing to open source projects such as sqlmap, Metasploit and Routersploit is cool. So I decided to publish AirPoc's PoCs on GitHub, where they can be called through an online API. That's cool too!

It is also easy to integrate into other software. Python programs can import AirPoc as a package. For other software, AirPoc provides an RPC interface to call. And if you don't have a Python environment, you can use pyinstaller to package it. Not relying on other libraries is our design principle, which helps avoid many problems.

To achieve high verification speed and accuracy, we need to build a concurrency model with multithreading and coroutines, which I will describe later.

## AirPoc's framework

Before we carry out the plan, we also need to design the code structure. For a perfectionist programmer, a good code structure is the first step of a big project. We create the directories as follows: env is the directory for managing packages in a virtual environment, lib is the directory for the core files, pocs is the directory for the PoC files, and main.py is the entry point.

Just as a building depends on its foundations, this basic framework is the base for our future work. We don't need to write the specific functions yet, but we should understand what each "foundation" function is for. An initial skeleton goes into main.py:

import os
import time


def banner():
    # Raw string so the backslashes in the ASCII art are taken literally.
    msg = r'''
     ___   _   _____    _____   _____   _____
    /   | | | |  _  \  |  _  \ /  _  \ /  ___|
   / /| | | | | |_| |  | |_| | | | | | | |
  / / | | | | |  _  /  |  ___/ | | | | | |
 / /  | | | | | | \ \  | |     | |_| | | |___
/_/   |_| |_| |_|  \_\ |_|     \_____/ \_____|   {}
'''.format(version)
    print(msg)


def init(config: dict):
    print("[*] target:{}".format(config["url"]))


def end():
    print("[*] shutting down at {0}".format(time.strftime("%X")))


def start():
    pass


def main():
    banner()
    config = {
        "url": "https://www.seebug.org/"
    }
    init(config)
    start()
    end()


if __name__ == '__main__':
    version = "v0.00000001"
    main()


However, as you can see, the version number is as low as the amount of bitcoin in my wallet, so we need to add some substance.

### Singleton

In the initialization stage, we need to store a lot of environment-related information: for example, the current execution path, the location of the PoC directory, the directory for output files and so on.

All this information has one thing in common: it should be loaded once and then used directly without being changed. The "singleton" pattern from software engineering fits that need very well.

Fortunately, a Python module is a natural singleton: its code is executed only the first time it is imported, and the resulting module object is cached in sys.modules (the .pyc file only caches the compiled bytecode on disk). Later imports of that module reuse the cached object without running the code again. Therefore, we only need to define the related functions and data in a module to get a singleton object.
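
The run-once behaviour is easy to check with a small experiment. The sketch below is only an illustration: it writes a throwaway module (the name `airpoc_demo_data` and the environment variable are made up for this demo) to a temporary directory, imports it twice, and checks that the module body ran once and that both imports return the same object.

```python
import importlib
import os
import sys
import tempfile

# Write a throwaway module whose body counts how often it is executed.
# "airpoc_demo_data" and AIRPOC_DEMO_RUNS are hypothetical names for this demo.
tmp_dir = tempfile.mkdtemp()
with open(os.path.join(tmp_dir, "airpoc_demo_data.py"), "w") as f:
    f.write(
        "import os\n"
        "os.environ['AIRPOC_DEMO_RUNS'] = str(int(os.environ['AIRPOC_DEMO_RUNS']) + 1)\n"
        "VERSION = 'v0.0000001'\n"
    )

os.environ["AIRPOC_DEMO_RUNS"] = "0"
sys.path.insert(0, tmp_dir)

first = importlib.import_module("airpoc_demo_data")   # executes the module body
second = importlib.import_module("airpoc_demo_data")  # served from sys.modules

runs = int(os.environ["AIRPOC_DEMO_RUNS"])
same_object = first is second
print(runs, same_object, "airpoc_demo_data" in sys.modules)
```

Because both names refer to one module object, a value set on it in one place is visible everywhere else that imports the module.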

We create a new data.py in the lib directory to store this information, and put the version information there as well.

import os

PATHS_ROOT = os.path.join(os.path.dirname(os.path.realpath(__file__)), "../")
PATHS_POCS = os.path.join(PATHS_ROOT, "pocs")
PATHS_OUTPUT = os.path.join(PATHS_ROOT, "output")
VERSION = "v0.0000001"


Following PEP 8, we name these constants in uppercase with underscores. To mark the difference from the previous version, we symbolically drop one 0 from the version number, so our "bitcoin" has increased tenfold.

### Dynamic module

With the environment-related problems solved, let's look at how to load modules dynamically. As discussed before, we expect PoCs to be loadable both locally and from remote sites such as GitHub.

There are two cases here. If you load the module from a file path, you can use __import__() directly. Loading remotely is more complicated: according to the Python documentation, we have to implement a "finder" and a "loader". https://docs.python.org/en-us/3/reference/import.html.

Of course, you could also save the remote file locally first and then load it from disk. But we can simply reuse the loader code that Pocsuite already provides.
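
For the local case, an alternative to `__import__()` is to load a file by its explicit path with the standard `importlib.util` helpers. The following is a minimal sketch, not AirPoc's actual code; the function name `load_file_to_module` and the file `demo_poc.py` are assumptions made for this example.

```python
import importlib.util
import os
import tempfile


def load_file_to_module(file_path, module_name=None):
    """Load a Python source file as a module without changing sys.path."""
    if module_name is None:
        module_name = os.path.splitext(os.path.basename(file_path))[0]
    spec = importlib.util.spec_from_file_location(module_name, file_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # runs the file's top-level code
    return module


# Usage: write a tiny PoC-like file (hypothetical) and load it by path.
poc_path = os.path.join(tempfile.mkdtemp(), "demo_poc.py")
with open(poc_path, "w") as f:
    f.write("def verify(arg, **kwargs):\n    return {'url': arg}\n")

poc = load_file_to_module(poc_path)
print(poc.__name__, poc.verify("http://example.com"))
```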

Create a new lib/loader.py file

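import hashlib
import importlib.util
from importlib.abc import Loader


def get_md5(value):
    if isinstance(value, str):
        value = value.encode(encoding='UTF-8')
    return hashlib.md5(value).hexdigest()


def load_string_to_module(code_string, fullname=None):
    try:
        module_name = 'pocs_{0}'.format(get_md5(code_string)) if fullname is None else fullname
        file_path = 'airpoc://{0}'.format(module_name)
        # The loader/spec lines were missing from this listing; they are
        # reconstructed here following Pocsuite3's loader.
        poc_loader = PocLoader(module_name, file_path)
        poc_loader.set_data(code_string)
        spec = importlib.util.spec_from_file_location(module_name, file_path, loader=poc_loader)
        mod = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(mod)
        return mod

    except ImportError:
        error_msg = "load module '{0}' failed!".format(fullname)
        print(error_msg)
        raise


class PocLoader(Loader):
    def __init__(self, fullname, path):
        self.fullname = fullname
        self.path = path
        self.data = None

    def set_data(self, data):
        self.data = data

    def get_filename(self, fullname):
        return self.path

    def get_data(self, filename):
        # Source held in memory takes priority; otherwise read it from disk
        if filename.startswith('airpoc://') and self.data:
            data = self.data
        else:
            with open(filename, encoding='utf-8') as f:
                data = f.read()
        return data

    def exec_module(self, module):
        filename = self.get_filename(self.fullname)
        poc_code = self.get_data(filename)
        obj = compile(poc_code, filename, 'exec', dont_inherit=True, optimize=-1)
        exec(obj, module.__dict__)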

The implementation details are not what we care about here. We only need to know that load_string_to_module loads a module from its source code. If you are interested in how it works, refer to the official Python documentation mentioned above.
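
For readers who want the core idea without the finder and loader machinery, here is a deliberately simplified sketch. It is not Pocsuite's loader: it builds an empty module with `types.ModuleType` and executes the source inside it, so import-system details such as `__spec__` are not filled in. The function name `simple_load_string_to_module` is made up for this example.

```python
import hashlib
import types


def simple_load_string_to_module(code_string, fullname=None):
    """Build a module object from source text held in memory."""
    if fullname is None:
        digest = hashlib.md5(code_string.encode("utf-8")).hexdigest()
        fullname = "pocs_{0}".format(digest)
    module = types.ModuleType(fullname)
    # The pseudo file name only shows up in tracebacks.
    code = compile(code_string, "airpoc://{0}".format(fullname), "exec")
    exec(code, module.__dict__)
    return module


# Usage: the source could just as well have been downloaded from GitHub.
source = "def verify(arg, **kwargs):\n    return {'name': 'demo', 'url': arg}\n"
mod = simple_load_string_to_module(source)
print(mod.__name__.startswith("pocs_"), mod.verify("http://example.com"))
```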

## PoC's code format development

After loading the module, we can write the code that runs it. We need to set a uniform convention for PoCs so that the program can call them easily.

How complex a PoC is depends on the structure we design: it can be complete and detailed, or as simple as possible. As mentioned earlier, to protect the "security alliance", we want PoCs to be simple and quick to write.

At the same time, we also need to consider the case in which a PoC needs multiple parameters. My rules are as follows:

import requests


def verify(arg, **kwargs):
    result = {}
    if requests.get(arg).status_code == 200:
        result = {
            "name": "vulnerability name",
            "url": arg
        }
    return result


Define a verify function in the PoC file for validation. arg is the normal parameter, and any additional parameters are received through kwargs. When the PoC verification succeeds, we return a dictionary; otherwise we return False or None (an empty dictionary, as in the example above, also counts as a failure). The content of the dictionary is up to the PoC writer, which gives the author maximum flexibility.
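
To make the convention concrete from the framework's side, here is a small sketch of how a caller might invoke `verify` and normalise the result. It is an illustration only: `call_verify`, `demo_verify` and the `path` keyword are hypothetical names, and the demo PoC checks a string instead of sending a request so that it runs offline.

```python
import types


def call_verify(poc, arg, **kwargs):
    """Run poc.verify and normalise the outcome to a dict or None."""
    try:
        result = poc.verify(arg, **kwargs)
    except Exception as exc:  # a broken PoC must not stop the whole scan
        print("[!] {0} failed: {1}".format(getattr(poc, "__name__", poc), exc))
        return None
    # Only a non-empty dict counts as a hit; False, None and {} do not.
    return result if isinstance(result, dict) and result else None


def demo_verify(arg, **kwargs):
    # Hypothetical check: "vulnerable" when the target ends with the given path.
    path = kwargs.get("path", "/admin")
    if arg.endswith(path):
        return {"name": "demo exposed path", "url": arg}
    return False


demo_poc = types.SimpleNamespace(__name__="demo_poc", verify=demo_verify)
hit = call_verify(demo_poc, "http://example.com/admin")
miss = call_verify(demo_poc, "http://example.com/")
custom = call_verify(demo_poc, "http://example.com/login", path="/login")
print(hit, miss, custom)
```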

Attention please! The quality of a PoC relies on the maintenance of the author.

## V0.01

The ultimate goal is to set the targets and have the program automatically load one or more specified PoCs, or all PoCs, and check the targets one by one. The rest of our work is to integrate these functions. We have already implemented the basic framework of AirPoc, and now we only need to build on top of it. For the convenience of testing, we first create two simple PoCs in the pocs directory according to the rules defined above.
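
The "specified PoCs or all PoCs" selection can be sketched as a small helper. This is one possible reading of the requirement, not the author's exact code: the name `collect_pocs` and its `wanted` parameter are assumptions, and an empty selection is taken to mean "load everything".

```python
import os
import tempfile


def collect_pocs(pocs_dir, wanted=None):
    """Return paths of PoC files under pocs_dir.

    wanted: optional list of file names; empty or None means "all PoCs".
    """
    found = []
    for root, _dirs, files in os.walk(pocs_dir):
        for name in sorted(files):
            if name.startswith("__") or not name.endswith(".py"):
                continue  # skip __init__.py and non-Python files
            if wanted and name not in wanted:
                continue  # a selection was given and this file is not in it
            found.append(os.path.join(root, name))
    return found


# Usage with a temporary directory standing in for pocs/.
demo_dir = tempfile.mkdtemp()
for name in ("__init__.py", "poc_a.py", "poc_b.py", "notes.txt"):
    open(os.path.join(demo_dir, name), "w").close()

all_pocs = [os.path.basename(p) for p in collect_pocs(demo_dir)]
only_a = [os.path.basename(p) for p in collect_pocs(demo_dir, ["poc_a.py"])]
print(all_pocs, only_a)
```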

Now the main.py code is:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Time    : 2019/4/25 3:13 PM
# @Author  : w7ay
# @File    : main.py
import os
import time
from lib.data import VERSION, PATHS_POCS, POCS  # POCS: a shared list defined in lib/data.py
from lib.loader import load_string_to_module


def banner():
    # Raw string so the backslashes in the ASCII art are taken literally.
    msg = r'''
     ___   _   _____    _____   _____   _____
    /   | | | |  _  \  |  _  \ /  _  \ /  ___|
   / /| | | | | |_| |  | |_| | | | | | | |
  / / | | | | |  _  /  |  ___/ | | | | | |
 / /  | | | | | | \ \  | |     | |_| | | |___
/_/   |_| |_| |_|  \_\ |_|     \_____/ \_____|   {}
'''.format(VERSION)
    print(msg)


def init(config: dict):
    print("[*] target:{}".format(config["url"]))

    # Load the PoCs: first walk the pocs directory
    wanted = config.get("poc", [])
    _pocs = []
    for root, dirs, files in os.walk(PATHS_POCS):
        # Skip __init__.py and non-Python files; if "poc" names specific files, keep only those
        files = filter(lambda x: not x.startswith("__") and x.endswith(".py")
                       and (not wanted or x in wanted), files)
        _pocs.extend(map(lambda x: os.path.join(root, x), files))

    # Load each PoC from its path
    for poc in _pocs:
        with open(poc, 'r') as f:
            # This line was missing from the listing; load_string_to_module comes from lib/loader.py
            model = load_string_to_module(f.read())
        POCS.append(model)


def end():
    print("[*] shutting down at {0}".format(time.strftime("%X")))


def start(config: dict):
    url_list = config.get("url", [])
    # Run every PoC against every URL, one by one
    for i in url_list:
        for poc in POCS:
            try:
                ret = poc.verify(i)
            except Exception as e:
                ret = None
                print(e)
            if ret:
                print(ret)


def main():
    banner()
    config = {
        "url": ["https://www.seebug.org/", "https://paper.seebug.org/"],
        "poc": []
    }
    init(config)
    start(config)
    end()


if __name__ == '__main__':
    main()


Our version has now reached 0.01, and it is already a "mature" framework that can run PoCs on its own.
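
One detail the listings leave implicit: main.py imports `POCS` from lib.data, and the later sections also use `WORKER` and `CONF`, but the data.py listing shown earlier defines none of them. A plausible set of additions to lib/data.py, stated here as an assumption rather than the author's exact code, would be:

```python
import queue

# Assumed additions to lib/data.py (not shown in the original listing).
POCS = []               # loaded PoC modules, filled by init()
WORKER = queue.Queue()  # (target, poc) tasks shared by the worker threads
CONF = {}               # runtime configuration, e.g. CONF["requests"]
```

Since the module is a singleton, every file that imports these names works on the same list, queue and dictionary.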

To make our framework run faster, we use multithreading to run the PoCs. Most of our tasks are I/O-bound, so we don't have to worry about Python threads being "pseudo-threads" because of the GIL.

The simplest multithreaded model is the producer/consumer model, in which multiple threads consume a shared queue. Create a new lib/threads.py:

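import threading
import time

# The function definitions were missing from this listing; they are
# reconstructed here after Pocsuite3's thread helper.


def exception_handled_function(thread_function, args=()):
    try:
        thread_function(*args)
    except KeyboardInterrupt:
        raise
    except Exception as ex:
        print("thread {0}: {1}".format(threading.current_thread().name, str(ex)))


def run_threads(num_threads, thread_function, args: tuple = ()):
    threads = []

    # Start the worker threads
    for index in range(num_threads):
        thread = threading.Thread(target=exception_handled_function, name=str(index),
                                  args=(thread_function, args))
        thread.daemon = True
        try:
            thread.start()
        except Exception as ex:
            err_msg = "error occurred while starting new thread ('{0}')".format(str(ex))
            print(err_msg)
            break
        threads.append(thread)

    # Waiting for all threads to finish
    alive = True
    while alive:
        alive = False
        for thread in threads:
            if thread.is_alive():
                alive = True
                time.sleep(0.1)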


It's worth noting that we don't use the usual join() to wait for the threads. While the main thread is blocked in join(), Python may not respond to user input, so pressing Ctrl+C to exit gets no response. Instead, the main thread waits in a while loop that sleeps briefly between checks.
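
The waiting strategy can be isolated into a few lines. The sketch below is an illustration with made-up names (`wait_for_threads`, `slow_task`): the main thread polls `is_alive()` and sleeps in short slices, so it returns to the interpreter regularly and a Ctrl+C can be handled between slices. The threads are daemon threads so that an interrupted program can still exit.

```python
import threading
import time


def wait_for_threads(threads, poll_interval=0.1):
    """Block until every thread has finished, waking up regularly."""
    while any(t.is_alive() for t in threads):
        time.sleep(poll_interval)


done = []


def slow_task(n):
    time.sleep(0.05)  # simulated I/O wait
    done.append(n)


workers = [threading.Thread(target=slow_task, args=(n,), daemon=True) for n in range(3)]
for t in workers:
    t.start()
wait_for_threads(workers, poll_interval=0.01)
print(sorted(done))
```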

Then the main program is converted to multithreaded mode: the "consumer" part of the original start() is extracted into a separate function that receives its data from the queue, as follows:

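import queue

from lib.data import POCS, WORKER  # WORKER is assumed to be a queue.Queue() in lib/data.py
from lib.threads import run_threads


def worker():
    # Keep taking tasks until the queue is drained, so that more tasks
    # than threads are still all processed
    while True:
        try:
            arg, poc = WORKER.get_nowait()
        except queue.Empty:
            break
        try:
            ret = poc.verify(arg)
        except Exception as e:
            ret = None
            print(e)
        if ret:
            print(ret)


def start(config: dict):
    url_list = config.get("url", [])

    # producer
    for arg in url_list:
        for poc in POCS:
            WORKER.put((arg, poc))

    # consumer
    run_threads(10, worker)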


In addition, the number of threads should be configurable, so we read it from the configuration:

run_threads(config.get("thread_num", 10), worker)


Run it again and you will find it much faster than before!
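
To see the effect without touching the network, the whole producer/consumer flow can be simulated. In this sketch `fake_verify` stands in for a PoC and sleeps 0.05 seconds to imitate an I/O wait; all names are made up for the demo. Run one after another, 20 such tasks would need about one second, while ten threads finish in a fraction of that.

```python
import queue
import threading
import time

tasks = queue.Queue()
results = []
results_lock = threading.Lock()


def fake_verify(target):
    time.sleep(0.05)  # stands in for a network round trip
    return {"url": target}


def worker():
    while True:
        try:
            target = tasks.get_nowait()
        except queue.Empty:
            return  # queue drained, let the thread finish
        ret = fake_verify(target)
        with results_lock:
            results.append(ret)


def run(thread_num):
    threads = [threading.Thread(target=worker, daemon=True) for _ in range(thread_num)]
    started = time.time()
    for t in threads:
        t.start()
    while any(t.is_alive() for t in threads):
        time.sleep(0.01)
    return time.time() - started


# producer
for i in range(20):
    tasks.put("http://example.com/{0}".format(i))

# consumers
elapsed = run(thread_num=10)
print("{0} results in {1:.2f}s".format(len(results), elapsed))
```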

## Network request

This is the last part of our framework: how to unify network requests. Sometimes the requests sent by our PoCs need common settings such as a proxy or a User-Agent header, and the framework should handle these uniformly. To make that possible, we need one more convention: all network requests must be sent with the requests library. At the beginning we said we would try not to use third-party modules, but requests is so good that we make an exception for it...

Thanks to the dynamic nature of Python, we can easily hook a function before it is used and redirect the original name to our custom function. This is the prerequisite for unifying network requests.

def hello(arg):
    return "hello " + arg


def hook(arg):
    arg = arg.upper()
    return "hello " + arg


hello = hook

print(hello("aa"))


By hooking a function like this, we can make it serve our own purpose.
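
A hook is often more useful when it wraps the original instead of replacing it outright. The sketch below shows that variant on a toy class: `Client`, `DEFAULTS` and the helper names are all invented for this example and are not part of requests or Pocsuite. The hook keeps a reference to the original method, injects default headers, and delegates; the patch can also be undone.

```python
import functools


class Client:
    """Toy stand-in for a session object; not a real library class."""

    def request(self, method, url, headers=None):
        return {"method": method, "url": url, "headers": headers or {}}


DEFAULTS = {"User-Agent": "AirPoc-demo"}
_original_request = Client.request  # keep a reference so the hook can delegate


@functools.wraps(_original_request)
def hooked_request(self, method, url, headers=None):
    merged = dict(DEFAULTS)
    merged.update(headers or {})  # explicit per-call headers win
    return _original_request(self, method.upper(), url, headers=merged)


def patch_client():
    Client.request = hooked_request


def unpatch_client():
    Client.request = _original_request


patch_client()
patched = Client().request("get", "http://example.com", headers={"X-Test": "1"})
unpatch_client()
plain = Client().request("get", "http://example.com")
print(patched)
print(plain)
```

Callers keep writing `Client().request(...)` as before; only the behaviour behind the name changes, which is the property the framework relies on.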

Tools like sqlmap are based on Python's built-in urllib module, but they put a lot of code into handling network requests, and sqlmap even hooks and rewrites the lower-level httplib library to deal with chunked transfer.

To schedule network requests uniformly, Pocsuite hooks the relevant methods of the requests module. Let's look at the code in detail.

The code in pocsuite3/lib/request/patch/__init__.py clearly shows which hooks are installed.

from .remove_ssl_verify import remove_ssl_verify
from .remove_warnings import disable_warnings
from .hook_request import patch_session
from .hook_request_redirect import patch_redirect


def patch_all():
    disable_warnings()
    remove_ssl_verify()
    patch_session()
    patch_redirect()


If you look at the source code of requests, you will see that the key point is how it hooks the Session.request method.

pocsuite3/lib/request/patch/hook_request.py

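from pocsuite3.lib.core.data import conf
from requests.models import Request
from requests.sessions import Session
from requests.sessions import merge_setting, merge_cookies
from requests.cookies import RequestsCookieJar
from requests.utils import get_encodings_from_content


# Note: the header/cookie lines and part of the signature were missing from
# this listing; they are restored here from the Pocsuite3 source as best recalled.
def session_request(self, method, url,
                    params=None, data=None, headers=None, cookies=None, files=None, auth=None,
                    timeout=conf.timeout if 'timeout' in conf else None,
                    allow_redirects=True, proxies=None, hooks=None, stream=None, verify=False, cert=None, json=None):
    # Create the Request
    merged_cookies = merge_cookies(merge_cookies(RequestsCookieJar(), self.cookies),
                                   cookies or (conf.cookie if 'cookie' in conf else None))

    req = Request(
        method=method.upper(),
        url=url,
        headers=merge_setting(headers, conf.http_headers if 'http_headers' in conf else {}),
        files=files,
        data=data or {},
        json=json,
        params=params or {},
        auth=auth,
        cookies=merged_cookies,
        hooks=hooks,
    )
    prep = self.prepare_request(req)

    proxies = proxies or (conf.proxies if 'proxies' in conf else {})

    settings = self.merge_environment_settings(
        prep.url, proxies, stream, verify, cert
    )

    # Send the request.
    send_kwargs = {
        'timeout': timeout,
        'allow_redirects': allow_redirects,
    }
    send_kwargs.update(settings)
    resp = self.send(prep, **send_kwargs)

    # requests falls back to ISO-8859-1 when no charset is declared; guess a better one
    if resp.encoding == 'ISO-8859-1':
        encodings = get_encodings_from_content(resp.text)
        if encodings:
            encoding = encodings[0]
        else:
            encoding = resp.apparent_encoding

        resp.encoding = encoding

    return resp


def patch_session():
    Session.request = session_request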


It replaces the Session.request method with session_request, which lets the framework inject settings such as custom headers. You may need to read the requests source to fully understand the code above, but that doesn't matter: in the spirit of borrowing what works, we can use it directly.

In order to achieve this and better optimize the framework, we also need to make some minor adjustments.

Create a new lib/requests.py

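from lib.data import CONF
from requests.models import Request
from requests.sessions import Session
from requests.sessions import merge_setting, merge_cookies
from requests.cookies import RequestsCookieJar
from requests.utils import get_encodings_from_content


# Note: the header/cookie lines and part of the signature were missing from this
# listing. "headers" and "cookie" are assumed key names under CONF["requests"].
def session_request(self, method, url,
                    params=None, data=None, headers=None, cookies=None, files=None, auth=None,
                    timeout=None,
                    allow_redirects=True, proxies=None, hooks=None, stream=None, verify=False, cert=None, json=None):
    # Create the Request.
    conf = CONF.get("requests", {})
    if timeout is None and "timeout" in conf:
        timeout = conf["timeout"]
    merged_cookies = merge_cookies(merge_cookies(RequestsCookieJar(), self.cookies),
                                   cookies or conf.get("cookie"))

    req = Request(
        method=method.upper(),
        url=url,
        headers=merge_setting(headers, conf.get("headers", {})),
        files=files,
        data=data or {},
        json=json,
        params=params or {},
        auth=auth,
        cookies=merged_cookies,
        hooks=hooks,
    )
    prep = self.prepare_request(req)

    proxies = proxies or (conf["proxies"] if 'proxies' in conf else {})

    settings = self.merge_environment_settings(
        prep.url, proxies, stream, verify, cert
    )

    # Send the request.
    send_kwargs = {
        'timeout': timeout,
        'allow_redirects': allow_redirects,
    }
    send_kwargs.update(settings)
    resp = self.send(prep, **send_kwargs)

    # requests falls back to ISO-8859-1 when no charset is declared; guess a better one
    if resp.encoding == 'ISO-8859-1':
        encodings = get_encodings_from_content(resp.text)
        if encodings:
            encoding = encodings[0]
        else:
            encoding = resp.apparent_encoding

        resp.encoding = encoding

    return resp


def patch_session():
    Session.request = session_request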


Also reserve an entry for the request settings in config.

Then install our hook during init.
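
The listings for these two steps did not survive, so the following is only a sketch under assumptions. The "requests" entry with its "timeout" and "proxies" keys matches what session_request in lib/requests.py reads from CONF; the proxy address is made up, and `patch_session` is passed in as a parameter here purely so the example runs without the requests library installed.

```python
CONF = {}  # stands in for lib.data.CONF


def init(config, patch_session=lambda: None):
    """Copy the request settings into CONF, then install the hook once."""
    CONF["requests"] = config.get("requests", {})
    patch_session()


config = {
    "url": ["http://www.httpbin.org/get"],
    "poc": [],
    "thread_num": 10,
    "requests": {
        "timeout": 10,
        # Hypothetical local proxy, e.g. for inspecting the traffic.
        "proxies": {"http": "http://127.0.0.1:8080"},
    },
}

calls = []
init(config, patch_session=lambda: calls.append("patched"))
print(CONF["requests"]["timeout"], calls)
```

In the real main.py, init would call `patch_session()` imported from lib/requests.py instead of receiving it as an argument.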

Let's write a new PoC and test the final result against http://www.httpbin.org/get

pocs/poc.py

import requests


def verify(arg, **kwargs):
    r = requests.get(arg)
    if r.status_code == 200:
        return {"url": arg, "text": r.text}


It works well, but if you add an HTTPS site, a warning message appears, because certificate verification is disabled by verify=False.

Again following Pocsuite, we disable the warning:

from urllib3 import disable_warnings
disable_warnings()


Finally, change the version number to 0.1, and the framework part of AirPoc is largely complete.

## At last

Many of AirPoc's structural ideas come from Pocsuite, so you can learn a lot by reading the Pocsuite source directly. At present, the AirPoc v0.1 infrastructure is almost complete, and one or more PoCs can be loaded locally for batch testing. Later, we will try some more fun things, e.g. how to verify vulnerabilities that produce no echo, how to generate shellcode, and how to handle a reverse shell, so stay tuned for the next part.