diff --git a/tests/conftest.py b/tests/conftest.py index 6730844..140befa 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,6 +3,8 @@ from contextlib import contextmanager from pathlib import Path +from dissect.cobaltstrike import beacon + import pytest beacons = { @@ -17,7 +19,7 @@ # This automatically generates a fixture for each beacon file def generate_beacon_file_fixture(filename): - @pytest.fixture(scope="function") + @pytest.fixture() def my_fixture(request): testpath = Path(request.fspath.dirname) beacon_zip_path = testpath / "beacons" / filename @@ -36,12 +38,32 @@ def my_fixture(request): return my_fixture +def generate_beacon_path_fixture(filename): + @pytest.fixture() + def my_fixture(request, tmp_path): + testpath = Path(request.fspath.dirname) + beacon_zip_path = testpath / "beacons" / filename + if not beacon_zip_path.exists(): + pytest.skip(f"Beacon {beacon_zip_path!r} not found") + + with zipfile.ZipFile(beacon_zip_path) as zf: + zf.extract(beacon_zip_path.stem, path=tmp_path, pwd=b"dissect.cobaltstrike") + return tmp_path / beacon_zip_path.stem + + return my_fixture + + def inject_beacon_file_fixture(name, filename): globals()[name] = generate_beacon_file_fixture(filename) +def inject_beacon_path_fixture(name, filename): + globals()[name] = generate_beacon_path_fixture(filename) + + for name, filename in beacons.items(): inject_beacon_file_fixture(f"{name}_file", filename) + inject_beacon_path_fixture(f"{name}_path", filename) @contextmanager @@ -49,3 +71,8 @@ def unzip_beacon_as_fh(zip_file, pwd=b"dissect.cobaltstrike"): """Return file object of beacon from zipfile""" with zipfile.ZipFile(zip_file) as zf: yield zf.open(zip_file.stem, pwd=pwd) + + +@pytest.fixture() +def beacon_x64_config_block(beacon_x64_file): + return beacon.BeaconConfig.from_file(beacon_x64_file).config_block diff --git a/tests/test_beacon.py b/tests/test_beacon.py index 2287024..46063d2 100644 --- a/tests/test_beacon.py +++ b/tests/test_beacon.py @@ -1,4 +1,6 @@ import io +import sys +import subprocess from unittest.mock import patch from dissect.cobaltstrike import beacon @@ -176,3 +178,109 @@ def test_beacon_settings_readonly(beacon_x64_file): with pytest.raises(TypeError): bconfig.raw_settings["foo"] = "bar" + + +@pytest.mark.parametrize( + ("fixture", "options", "ret", "stdout", "stderr"), + [ + pytest.param( + "beacon_custom_xorkey_file", + ["-x", "0xCC"], + 0, + b"SETTING_PUBKEY = '36aff0b273cb7aa704e4219ad3be78defcc8c1d7ecb779d55f438e82c7138673'", + None, + id="beacon_custom_xorkey_file-stdin-0xcc", + ), + pytest.param( + "beacon_x86_file", + [], + 0, + b"SETTING_PUBKEY = '71fab2149cbdce552f00e6d75372494d3f7755d366fd6849a6d5c9e0f73bc40f'", + None, + id="beacon_x86_file-stdin-normal", + ), + pytest.param( + "beacon_x86_file", + ["-t", "c2profile"], + 0, + b'prepend "wordpress_ed1f617bbd6c004cc09e046f3c1b7148="', + None, + id="beacon_x86_file-stdin-c2profile", + ), + pytest.param( + "beacon_x86_file", + ["-t", "raw"], + 0, + b", type=", + None, + id="beacon_x86_file-stdin-raw", + ), + pytest.param( + "beacon_x86_file", + ["-t", "dumpstruct"], + 0, + b"BeaconSetting.SETTING_PUBKEY", + None, + id="beacon_x86_file-stdin-dumpstruct", + ), + pytest.param( + "beacon_x64_path", + ["-t", "normal"], + 0, + b"SETTING_PUBKEY = ", + None, + id="beacon_x64_path-normal", + ), + pytest.param( + "beacon_x64_config_block", + [], + 0, + b"SETTING_PUBKEY = ", + None, + id="beacon_x64_config_block-stdin-unobfuscated", + ), + pytest.param( + b"NO BACON", + [], + 1, + None, + b"No beacon configuration found", + id="invalid-beacon-stdin", + ), + ], +) +def test_main(capfdbinary, request, fixture, options, ret, stdout, stderr): + if isinstance(fixture, bytes): + beacon_file = io.BytesIO(fixture) + else: + beacon_file = request.getfixturevalue(fixture) + + if isinstance(beacon_file, bytes): + beacon_file = io.BytesIO(beacon_file) + + stdin = subprocess.PIPE if hasattr(beacon_file, "read") else None + proc = subprocess.Popen( + [ + sys.executable, + "-m", + "dissect.cobaltstrike.beacon", + *options, + "-" if stdin else beacon_file, + ], + stdin=stdin, + ) + if stdin: + while True: + data = beacon_file.read(1024) + if not data: + break + proc.stdin.write(data) + proc.stdin.flush() + proc.stdin.close() + proc.wait() + assert proc.returncode == ret + cap = capfdbinary.readouterr() + if stdout: + assert stdout in cap.out + if stderr: + assert stderr in cap.err