Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix and improve AFL fuzzing #1462

Merged
merged 3 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 31 additions & 21 deletions examples/fuzzing/linux_x8664/fuzz_x8664_linux.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,44 @@
$ rm -fr afl_outputs/default/
"""

# No more need for importing unicornafl, try afl.ql_afl_fuzz instead!

import os
import sys

from typing import Optional
from typing import Sequence

QLHOME = os.path.realpath(r'../../..')

sys.path.append("../../..")
sys.path.append(QLHOME)
from qiling import Qiling
from qiling.const import QL_VERBOSE
from qiling.extensions import pipe
from qiling.extensions import afl

def main(input_file: str):
ql = Qiling(["./x8664_fuzz"], "../../rootfs/x8664_linux",
verbose=QL_VERBOSE.OFF, # keep qiling logging off
console=False) # thwart program output

def main(argv: Sequence[str], rootfs: str, infilename: str):
# initialize a qiling instance.
# note we keep verbosity off and thwart the program's output to gain some speed-up
ql = Qiling(argv, rootfs, verbose=QL_VERBOSE.OFF, console=False)

# get the image base address
img = ql.loader.get_image_by_name('x8664_fuzz')
assert img is not None

# fuzzing scope: the main function
main_begins = img.base + 0x1275
main_ends = img.base + 0x1293

# redirect stdin to our mock to feed it with incoming fuzzed keystrokes
ql.os.stdin = pipe.SimpleInStream(sys.stdin.fileno())

def place_input_callback(ql: Qiling, input: bytes, persistent_round: int) -> Optional[bool]:
def place_input_callback(ql: Qiling, feed: bytes, round: int) -> bool:
"""Feed generated stimuli to the fuzzed target.

This method is called with every fuzzing iteration.
"""

# feed fuzzed input to our mock stdin
ql.os.stdin.write(input)
ql.os.stdin.write(feed)

# signal afl to proceed with this input
return True
Expand All @@ -54,23 +63,24 @@ def start_afl(ql: Qiling):
"""Have Unicorn fork and start instrumentation.
"""

afl.ql_afl_fuzz(ql, input_file=input_file, place_input_callback=place_input_callback, exits=[ql.os.exit_point])
afl.ql_afl_fuzz(ql, infilename, place_input_callback, [main_ends])

# set afl instrumentation [re]starting point
ql.hook_address(start_afl, main_begins)

# get image base address
ba = ql.loader.images[0].base
def __crash(ql: Qiling) -> None:
os.abort()

# make the process crash whenever __stack_chk_fail@plt is about to be called.
# this way afl will count stack protection violations as crashes
ql.hook_address(callback=lambda x: os.abort(), address=ba + 0x126e)

# set afl instrumentation [re]starting point. we set it to 'main'
ql.hook_address(callback=start_afl, address=ba + 0x1275)
ql.hook_address(__crash, img.base + 0x126e)

# okay, ready to roll
ql.run()

if __name__ == "__main__":
if len(sys.argv) == 1:
raise ValueError("No input file provided.")

main(sys.argv[1])
main(
rf'{QLHOME}/examples/fuzzing/linux_x8664/x8664_fuzz'.split(),
rf'{QLHOME}/examples/rootfs/x8664_linux',
rf'{QLHOME}/examples/fuzzing/linux_x8664/afl_inputs/a'
)
87 changes: 87 additions & 0 deletions examples/fuzzing/linux_x8664/qlfuzzer_x8664_linux.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
#!/usr/bin/env python3

"""Simple example of how to use QlFuzzer to easily create a custom fuzzer that
leverages Qiling and AFLplusplus.

Note: this example refers to linux_x8664/fuzz_x8664_linux.py

Steps:
o Clone and build AFL++
$ git clone https://github.com/AFLplusplus/AFLplusplus.git
$ make -C AFLplusplus

o Build Unicorn support
$ ( cd AFLplusplus/unicorn_mode ; ./build_unicorn_support.sh )

o Start fuzzing
$ AFL_AUTORESUME=1 AFL_PATH="$(realpath ./AFLplusplus)" PATH="$AFL_PATH:$PATH" afl-fuzz -i afl_inputs -o afl_outputs -U -- python3 ./qlfuzzer_x8664_linux.py @@

o Cleanup results
$ rm -fr afl_outputs/default/
"""

from __future__ import annotations

import os
import sys

from typing import TYPE_CHECKING, Collection, Optional, Sequence

# replace this if qiling is located elsewhere
QLHOME = os.path.realpath(r'../../..')

sys.path.append(QLHOME)
from qiling.extensions import pipe
from qiling.extensions.afl.qlfuzzer import QlFuzzer


if TYPE_CHECKING:
from qiling import Qiling


class MyFuzzer(QlFuzzer):
"""Custom fuzzer.
"""

def setup(self, infilename: str, entry: int, exits: Collection[int], crashes: Optional[Collection[int]] = None) -> None:
super().setup(infilename, entry, exits, crashes)

# redirect stdin to our mock to feed it with incoming fuzzed keystrokes
self.ql.os.stdin = pipe.SimpleInStream(sys.stdin.fileno())

def feed_input(self, ql: Qiling, stimuli: bytes, pround: int) -> bool:
# feed fuzzed input as-is to our mock stdin
ql.os.stdin.write(stimuli)

# signal afl to proceed with this input
return True


def main(argv: Sequence[str], rootfs: str, infilename: str):
# initialize our custom fuzzer
fuzzer = MyFuzzer(argv, rootfs)

# calculate fuzzing scope effective addresses
main_begins = fuzzer.ea(0x1275)
main_ends = fuzzer.ea(0x1293)

# make the process crash whenever __stack_chk_fail@plt is about to be called.
# this way afl will count stack protection violations as fuzzing crashes
stack_chk_fail = fuzzer.ea(0x126e)

# set up fuzzing parameters
fuzzer.setup(infilename, main_begins, [main_ends], [stack_chk_fail])

# start fuzzing.
#
# note that although the main function is being fuzzed, we start emulating the program from its
# default starting point to make sure 'main' has all the necessary data initialized and ready.
fuzzer.run()


if __name__ == '__main__':
main(
rf'{QLHOME}/examples/fuzzing/linux_x8664/x8664_fuzz'.split(),
rf'{QLHOME}/examples/rootfs/x8664_linux',
rf'{QLHOME}/examples/fuzzing/linux_x8664/afl_inputs/a'
)
191 changes: 111 additions & 80 deletions qiling/extensions/afl/afl.py
Original file line number Diff line number Diff line change
@@ -1,99 +1,130 @@
from typing import List, Callable
from qiling.arch.arm import QlArchARM
from qiling.core import Qiling
from unicornafl import *
from unicorn import UcError

from __future__ import annotations
from typing import TYPE_CHECKING, Any, Collection, Optional, Callable

from unicornafl import UcAflError, UC_AFL_RET_CALLED_TWICE, uc_afl_fuzz_custom
from unicorn import UcError, UC_ERR_OK

from qiling import Qiling
from qiling.exception import QlErrorNotImplemented


if TYPE_CHECKING:
from unicorn import Uc
from ctypes import c_char, Array

InputFeedingCallback = Callable[[Qiling, bytes, int], bool]
FuzzingCallback = Callable[[Qiling], int]
CrashValidationCallback = Callable[[Qiling, int, bytes, int], bool]


def ql_afl_fuzz(ql: Qiling,
input_file: str,
place_input_callback: Callable[["Qiling", bytes, int], bool],
exits: List[int],
validate_crash_callback: Callable[["Qiling", int, bytes, int], bool] = None,
place_input_callback: InputFeedingCallback,
exits: Collection[int],
validate_crash_callback: Optional[CrashValidationCallback] = None,
always_validate: bool = False,
persistent_iters: int = 1):
""" Fuzz a range of code with afl++.
This function wraps some common logic with unicornafl.uc_afl_fuzz.
NOTE: If no afl-fuzz instance is found, this function is almost identical to ql.run.
:param Qiling ql: The Qiling instance.
:param str input_file: This usually is the input file name provided by the command argument.
:param Callable place_input_callback: This callback is triggered every time a new child is
generated. It returns True if the input is accepted, or the input would be skipped.
:param list exits: All possible exits.
:param Callable validate_crash_callback: This callback is triggered every time to check if we are crashed.
:param bool always_validate: If this is set to False, validate_crash_callback will be only triggered if
uc_emu_start (which is called internally by afl_fuzz) returns an error. Or the validate_crash_callback will
be triggered every time.
:param int persistent_iters: Fuzz how many times before forking a new child.
:raises UcAflError: If something wrong happens with the fuzzer.
persistent_iters: int = 1) -> None:
"""Fuzz a range of code with afl++.
This function wraps some common logic with unicornafl.uc_afl_fuzz.

Args:
ql: qiling instance

filename: path to a file that contains an initial input data. this is usually
the filename provided as the fuzzer command line argument

place_input_callback: a callback that is triggered whenever a new child process is created
and about to be fed with a new fuzzing input. the callback is responsible
to place the newly generated stimuli (as is, or manipulated to the users'
need) where the fuzzed program expects to find its input: e.g. stdin,
memory buffer, file, etc. based on the stimuli, the callback can decide
whether afl should proceed with this round (returns `True`) or discard
it (returns `False`)

exits: addresses that mark a graceful completion of the fuzzed flow

validate_crash_callback: a callback that is triggered to check whether the emulation has crashed

always_validate: indicate whether the crash validating callback should be called on every
iteration (`True`) or only when emluation raises an exception (`False`, default)

persistent_iters: Reuse the same process for this many fuzzing iterations before forking
a new child process (default: 1)

Raises:
UcAflError: If something wrong happens with the fuzzer.
"""

def __fuzzing_wrapper(ql: Qiling) -> int:
"""Emulation wrapper.
"""

def _dummy_fuzz_callback(_ql: "Qiling"):
if isinstance(_ql.arch, QlArchARM):
pc = _ql.arch.effective_pc
else:
pc = _ql.arch.regs.arch_pc
try:
_ql.uc.emu_start(pc, 0, 0, 0)
except UcError as e:
return e.errno

return UC_ERR_OK

return ql_afl_fuzz_custom(ql, input_file, place_input_callback, _dummy_fuzz_callback, exits,
validate_crash_callback, always_validate, persistent_iters)
# if we are fuzzin an arm code, make sure to take the effective pc
pc = getattr(ql.arch, 'effective_pc', ql.arch.regs.arch_pc)

try:
ql.arch.uc.emu_start(pc, 0)
except UcError as err:
return err.errno

return UC_ERR_OK

def __null_crash_validation(ql: Qiling, result: int, input_bytes: bytes, round: int) -> bool:
return False

ql_afl_fuzz_custom(
ql,
input_file,
place_input_callback,
__fuzzing_wrapper,
exits,
validate_crash_callback or __null_crash_validation,
always_validate,
persistent_iters)


def ql_afl_fuzz_custom(ql: Qiling,
input_file: str,
place_input_callback: Callable[["Qiling", bytes, int], bool],
fuzzing_callback: Callable[["Qiling"], int],
exits: List[int] = [],
validate_crash_callback: Callable[["Qiling", bytes, int], bool] = None,
place_input_callback: InputFeedingCallback,
fuzzing_callback: FuzzingCallback,
exits: Collection[int],
validate_crash_callback: CrashValidationCallback,
always_validate: bool = False,
persistent_iters: int = 1):

ql.uc.ctl_exits_enabled(True)
ql.uc.ctl_set_exits(exits)

def _ql_afl_place_input_wrapper(uc, input_bytes, iters, data):
(ql, cb, _, _) = data
def __place_input_wrapper(uc: Uc, input_bytes: Array[c_char], iters: int, context: Any) -> bool:
return place_input_callback(ql, input_bytes.value, iters)

if cb:
return cb(ql, input_bytes, iters)
else:
return False
def __validate_crash_wrapper(uc: Uc, result: int, input_bytes: bytes, iters: int, context: Any) -> bool:
return validate_crash_callback(ql, result, input_bytes, iters)

def _ql_afl_validate_wrapper(uc, result, input_bytes, iters, data):
(ql, _, cb, _) = data
def __fuzzing_wrapper(uc: Uc, context: Any) -> int:
return fuzzing_callback(ql)

if cb:
return cb(ql, result, input_bytes, iters)
else:
return False
uc = ql.arch.uc
uc.ctl_exits_enabled(True)
uc.ctl_set_exits(exits)

def _ql_afl_fuzzing_callback_wrapper(uc, data):
(ql, _, _, cb) = data
try:
uc_afl_fuzz_custom(
uc,
input_file,
__place_input_wrapper,
__fuzzing_wrapper,
__validate_crash_wrapper,
always_validate,
persistent_iters,
None)

return cb(ql)
except NameError as ex:
raise QlErrorNotImplemented('unicornafl is not installed or AFL++ is not supported on this platform') from ex

data = (ql, place_input_callback, validate_crash_callback, fuzzing_callback)
try:
# uc_afl_fuzz will never return non-zero value.
uc_afl_fuzz_custom(ql.uc,
input_file=input_file,
place_input_callback=_ql_afl_place_input_wrapper,
fuzzing_callback=_ql_afl_fuzzing_callback_wrapper,
validate_crash_callback=_ql_afl_validate_wrapper,
always_validate=always_validate,
persistent_iters=persistent_iters,
data=data)
except NameError as ex:
raise QlErrorNotImplemented("unicornafl is not installed or AFL++ is not supported on this platform") from ex
except UcAflError as ex:
if ex.errno != UC_AFL_RET_CALLED_TWICE:
# This one is special. Many fuzzing scripts start fuzzing in a Unicorn UC_HOOK_CODE callback and
# starts execution on the current address, which results in a duplicate UC_HOOK_CODE callback. To
# make unicornafl easy to use, we handle this siliently.
#
# For other exceptions, we raise them.
raise
except UcAflError as ex:
if ex.errno != UC_AFL_RET_CALLED_TWICE:
# many fuzzing scripts start fuzzing with a Unicorn UC_HOOK_CODE callback and while
# starting execution at the current address. that results in a duplicated UC_HOOK_CODE
# callback. we handle this case siliently for simplicity
#
# For other exceptions, we raise them.
raise
Loading
Loading