diff --git a/bindings/python/unicorn/unicorn.py b/bindings/python/unicorn/unicorn.py index 58c6ee06..dbbc4d35 100644 --- a/bindings/python/unicorn/unicorn.py +++ b/bindings/python/unicorn/unicorn.py @@ -9,7 +9,7 @@ import os.path import sys import weakref import functools - +import gc from . import x86_const, arm64_const, unicorn_const as uc if not hasattr(sys.modules[__name__], "__file__"): @@ -120,6 +120,15 @@ class _uc_mem_region(ctypes.Structure): ("perms", ctypes.c_uint32), ] +#typedef bool (*uc_afl_cb_place_input_t)(uc_engine *uc, char *input, +# size_t input_len, uint32_t persistent_round, void *data); +AFL_PLACE_INPUT_CB = ctypes.CFUNCTYPE(ctypes.c_bool, uc_engine, ctypes.POINTER(ctypes.c_char), + ctypes.c_size_t, ctypes.c_uint32, ctypes.c_void_p) + +#typedef bool (*uc_afl_cb_validate_crash_t)(uc_engine *uc, uc_err unicorn_result, char *input, +# int input_len, int persistent_round, void *data); +AFL_VALIDATE_CRASH_CB = ctypes.CFUNCTYPE(ctypes.c_bool, uc_engine, ucerr, ctypes.POINTER(ctypes.c_char), + ctypes.c_size_t, ctypes.c_uint32, ctypes.c_void_p) _setup_prototype(_uc, "uc_version", ctypes.c_uint, ctypes.POINTER(ctypes.c_int), ctypes.POINTER(ctypes.c_int)) _setup_prototype(_uc, "uc_arch_supported", ctypes.c_bool, ctypes.c_int) @@ -151,6 +160,7 @@ _setup_prototype(_uc, "uc_context_free", ucerr, uc_context) _setup_prototype(_uc, "uc_mem_regions", ucerr, uc_engine, ctypes.POINTER(ctypes.POINTER(_uc_mem_region)), ctypes.POINTER(ctypes.c_uint32)) # https://bugs.python.org/issue42880 _setup_prototype(_uc, "uc_hook_add", ucerr, uc_engine, ctypes.POINTER(uc_hook_h), ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_uint64, ctypes.c_uint64) +_setup_prototype(_uc, "uc_afl_fuzz", ucerr, uc_engine, ctypes.c_char_p, ctypes.c_void_p, ctypes.POINTER(ctypes.c_uint64), ctypes.c_size_t, ctypes.c_void_p, ctypes.c_bool, ctypes.c_uint32, ctypes.c_void_p) UC_HOOK_CODE_CB = ctypes.CFUNCTYPE(None, uc_engine, ctypes.c_uint64, ctypes.c_size_t, ctypes.c_void_p) UC_HOOK_INSN_INVALID_CB = ctypes.CFUNCTYPE(ctypes.c_bool, uc_engine, ctypes.c_void_p) @@ -411,6 +421,8 @@ class Uc(object): self._callback_count = 0 self._cleanup.register(self) + self.afl_called_before = False # type: bool + @staticmethod def release_handle(uch): if uch: @@ -432,6 +444,127 @@ class Uc(object): status = _uc.uc_emu_stop(self._uch) if status != uc.UC_ERR_OK: raise UcError(status) + + def afl_fuzz( + self, # type: Uc + input_file, # type: str + place_input_callback, # type: Callable[[Uc, bytes, int, Any], Optional[bool]] + exits, # type: List[int] + validate_crash_callback=None, # type: Optional[Callable[[Uc, UcError, bytes, int, Any], Optional[bool]]] + always_validate=False, # type: bool + persistent_iters=1, # type: int + data=None, # type: Any + ): + # type: (...) -> bool + """ + The main fuzzer. + Starts the forkserver, then beginns a persistent loop. + Reads input, calls the place_input callback, emulates, repeats. + If unicorn errors out, will call the validate_crash_callback, if set. + Will only return in the parent after the whole fuzz thing has been finished and afl died. + The child processes never return from here. + + :param input_file: filename/path to the (AFL) inputfile. Usually supplied on the commandline. + :param place_input_callback: Callback function that will be called before each test runs. + This function needs to write the input from afl to the correct position on the unicorn object. + This function is mandatory. + It's purpose is to place the input at the right place in unicorn. + + @uc: (Uc) Unicorn instance + @input: (bytes) The current input we're working on. Place this somewhere in unicorn's memory now. + @persistent_round: (int) which round we are currently crashing in, if using persistent mode. + @data: (Any) Data pointer passed to uc_afl_fuzz(...). + + @return: (bool) + If you return is True (or None) all is well. Fuzzing starts. + If you return False, the input is rejected; we will continue with the next input. + :param exits: address list of exits where fuzzing should stop + :param persistent_iters: + The amount of loop iterations in persistent mode before restarting with a new forked child. + If your target cannot be fuzzed using persistent mode (global state changes a lot), + set persistent_iters = 1 for the normal fork-server experience. + Else, the default is usually around 1000. + If your target is super stable (and unicorn is, too - not sure about that one), + you may pass persistent_iter = 0 for that an infinite fuzz loop. + :param validate_crash_callback: Optional callback (if not needed, pass NULL), that determines + if a non-OK uc_err is an actual error. If false is returned, the test-case will not crash. + Callback function called after a non-UC_ERR_OK returncode was returned by Unicorn. + This function is not mandatory. + @uc: Unicorn instance + @unicorn_result: The error state returned by the current testcase + @input: The current input we're working with. + @persistent_round: which round we are currently crashing in, if using persistent mode. + @data: Data pointer passed to uc_afl_fuzz(...). + + @Return: + If you return false, the crash is considered invalid and not reported to AFL. + -> Next loop iteration begins. + If return is true, the crash is reported // the program crashes. + -> The child will die and the forkserver will spawn a new child. + :param always_validate: If false, validate_crash_callback will only be called for crashes. + :param data: Your very own data pointer. This will passed into every callback. + + :return: + True, if we fuzzed. + False, if AFL was not available but we ran once. + raises UcAflException if nothing worked. + """ + if self.afl_called_before: + raise UcError(uc.UC_ERR_AFL_RET_CALLED_TWICE) + self.afl_called_before = True + self._pre_afl() + exit_count = len(exits) + + def place_input_wrapper(c_uc, input, input_len, persistent_round, c_data): + # print("Calling back home. :)", c_uc, input, input_len, persistent_round, c_data) + ret = place_input_callback( + self, + ctypes.cast(input, ctypes.POINTER(ctypes.c_char * input_len)).contents, + persistent_round, + data + ) + if ret is False: + return False + return True + + def validate_crash_wrapper(c_uc, uc_err, input, input_len, persistent_round, c_data): + # print("Calling after crash!", c_uc, input, input_len, persistent_round, c_data) + # assert type(uc_err) == int + ret = validate_crash_callback( + self, + UcError(uc_err), + ctypes.cast(input, ctypes.POINTER(ctypes.c_char * input_len)).contents, + persistent_round, + data + ) + if ret is False or (ret is None and uc_err == uc.UC_ERR_OK): + return False + return True + + # This only returns in the parent, child processes all die or loop or other things. + status = _uc.uc_afl_fuzz( + self._uch, + input_file.encode('utf-8'), + AFL_PLACE_INPUT_CB(place_input_wrapper), + (ctypes.c_uint64 * exit_count)(*exits), + exit_count, # bad languages, like c, need more params. + AFL_VALIDATE_CRASH_CB(validate_crash_wrapper) if validate_crash_callback else None, + always_validate, + persistent_iters, + None # no need to pass the user data through C as the callback keeps it as closure. + ) + if status != uc.UC_ERR_OK: + # Something went wrong. + raise UcError(status) + + def _pre_afl(self): + # type: (Uc) -> None + """ + Internal func making sure exits are set and flushing buffers/gc + :param exits: exits + """ + sys.stdout.flush() # otherwise children will inherit the unflushed buffer + gc.collect() # Collect all unneeded memory, No need to clone it on fork. # return the value of a register def reg_read(self, reg_id, opt=None):