Skip to content

Commit

Permalink
Add tests for dissect.cobaltstrike.beacon main function (#11)
Browse files Browse the repository at this point in the history
It also tests the different output options, xor keys and input files such as stdin.
  • Loading branch information
yunzheng authored Apr 11, 2022
1 parent 0e4719e commit 4d8f0ff
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 1 deletion.
29 changes: 28 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from contextlib import contextmanager
from pathlib import Path

from dissect.cobaltstrike import beacon

import pytest

beacons = {
Expand All @@ -17,7 +19,7 @@

# This automatically generates a fixture for each beacon file
def generate_beacon_file_fixture(filename):
@pytest.fixture(scope="function")
@pytest.fixture()
def my_fixture(request):
testpath = Path(request.fspath.dirname)
beacon_zip_path = testpath / "beacons" / filename
Expand All @@ -36,16 +38,41 @@ def my_fixture(request):
return my_fixture


def generate_beacon_path_fixture(filename):
@pytest.fixture()
def my_fixture(request, tmp_path):
testpath = Path(request.fspath.dirname)
beacon_zip_path = testpath / "beacons" / filename
if not beacon_zip_path.exists():
pytest.skip(f"Beacon {beacon_zip_path!r} not found")

with zipfile.ZipFile(beacon_zip_path) as zf:
zf.extract(beacon_zip_path.stem, path=tmp_path, pwd=b"dissect.cobaltstrike")
return tmp_path / beacon_zip_path.stem

return my_fixture


def inject_beacon_file_fixture(name, filename):
globals()[name] = generate_beacon_file_fixture(filename)


def inject_beacon_path_fixture(name, filename):
globals()[name] = generate_beacon_path_fixture(filename)


for name, filename in beacons.items():
inject_beacon_file_fixture(f"{name}_file", filename)
inject_beacon_path_fixture(f"{name}_path", filename)


@contextmanager
def unzip_beacon_as_fh(zip_file, pwd=b"dissect.cobaltstrike"):
"""Return file object of beacon from zipfile"""
with zipfile.ZipFile(zip_file) as zf:
yield zf.open(zip_file.stem, pwd=pwd)


@pytest.fixture()
def beacon_x64_config_block(beacon_x64_file):
return beacon.BeaconConfig.from_file(beacon_x64_file).config_block
108 changes: 108 additions & 0 deletions tests/test_beacon.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import io
import sys
import subprocess
from unittest.mock import patch

from dissect.cobaltstrike import beacon
Expand Down Expand Up @@ -176,3 +178,109 @@ def test_beacon_settings_readonly(beacon_x64_file):

with pytest.raises(TypeError):
bconfig.raw_settings["foo"] = "bar"


@pytest.mark.parametrize(
("fixture", "options", "ret", "stdout", "stderr"),
[
pytest.param(
"beacon_custom_xorkey_file",
["-x", "0xCC"],
0,
b"SETTING_PUBKEY = '36aff0b273cb7aa704e4219ad3be78defcc8c1d7ecb779d55f438e82c7138673'",
None,
id="beacon_custom_xorkey_file-stdin-0xcc",
),
pytest.param(
"beacon_x86_file",
[],
0,
b"SETTING_PUBKEY = '71fab2149cbdce552f00e6d75372494d3f7755d366fd6849a6d5c9e0f73bc40f'",
None,
id="beacon_x86_file-stdin-normal",
),
pytest.param(
"beacon_x86_file",
["-t", "c2profile"],
0,
b'prepend "wordpress_ed1f617bbd6c004cc09e046f3c1b7148="',
None,
id="beacon_x86_file-stdin-c2profile",
),
pytest.param(
"beacon_x86_file",
["-t", "raw"],
0,
b"<Setting index=<BeaconSetting.SETTING_WATERMARK: 37>, type=<SettingsType.TYPE_INT: 2>",
None,
id="beacon_x86_file-stdin-raw",
),
pytest.param(
"beacon_x86_file",
["-t", "dumpstruct"],
0,
b"BeaconSetting.SETTING_PUBKEY",
None,
id="beacon_x86_file-stdin-dumpstruct",
),
pytest.param(
"beacon_x64_path",
["-t", "normal"],
0,
b"SETTING_PUBKEY = ",
None,
id="beacon_x64_path-normal",
),
pytest.param(
"beacon_x64_config_block",
[],
0,
b"SETTING_PUBKEY = ",
None,
id="beacon_x64_config_block-stdin-unobfuscated",
),
pytest.param(
b"NO BACON",
[],
1,
None,
b"No beacon configuration found",
id="invalid-beacon-stdin",
),
],
)
def test_main(capfdbinary, request, fixture, options, ret, stdout, stderr):
if isinstance(fixture, bytes):
beacon_file = io.BytesIO(fixture)
else:
beacon_file = request.getfixturevalue(fixture)

if isinstance(beacon_file, bytes):
beacon_file = io.BytesIO(beacon_file)

stdin = subprocess.PIPE if hasattr(beacon_file, "read") else None
proc = subprocess.Popen(
[
sys.executable,
"-m",
"dissect.cobaltstrike.beacon",
*options,
"-" if stdin else beacon_file,
],
stdin=stdin,
)
if stdin:
while True:
data = beacon_file.read(1024)
if not data:
break
proc.stdin.write(data)
proc.stdin.flush()
proc.stdin.close()
proc.wait()
assert proc.returncode == ret
cap = capfdbinary.readouterr()
if stdout:
assert stdout in cap.out
if stderr:
assert stderr in cap.err

0 comments on commit 4d8f0ff

Please sign in to comment.