Skip to content

Commit

Permalink
Pass CLI arguments to _run and get_spec_additions
Browse files Browse the repository at this point in the history
  • Loading branch information
Schamper committed Aug 15, 2023
1 parent 17d7501 commit 7e8d22c
Showing 1 changed file with 40 additions and 43 deletions.
83 changes: 40 additions & 43 deletions acquire/acquire.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from typing import Iterator, Optional, Union

from dissect.target import Target, exceptions
from dissect.target.filesystem import Filesystem
from dissect.target.filesystems import dir, ntfs
from dissect.target.helpers import fsutil
from dissect.target.loaders.remote import RemoteStreamConnection
Expand Down Expand Up @@ -141,7 +142,7 @@ def misc_osx_user_homes(target: Target) -> Iterator[fsutil.TargetPath]:
}


def from_user_home(target: Target, path: str):
def from_user_home(target: Target, path: str) -> Iterator[str]:
# Until getting the users from osx is implemented in dissect.target
if target.os == "osx":
for misc_dir in misc_osx_user_homes(target):
Expand All @@ -155,7 +156,7 @@ def from_user_home(target: Target, path: str):
yield str(user_dir.joinpath(path))


def iter_ntfs_filesystems(target):
def iter_ntfs_filesystems(target: Target) -> Iterator[tuple[ntfs.NtfsFilesystem, str, str]]:
mount_lookup = defaultdict(list)
for mount, fs in target.fs.mounts.items():
mount_lookup[fs].append(mount)
Expand All @@ -181,13 +182,13 @@ def iter_ntfs_filesystems(target):
yield fs, name, mountpoints


def mount_all_ntfs_filesystems(target):
def mount_all_ntfs_filesystems(target: Target) -> None:
for fs, name, _ in iter_ntfs_filesystems(target):
if name not in target.fs.mounts:
target.fs.mount(name, fs)


def iter_esxi_filesystems(target):
def iter_esxi_filesystems(target: Target) -> Iterator[tuple[str, str, Filesystem]]:
for mount, fs in target.fs.mounts.items():
if not mount.startswith("/vmfs/volumes/"):
continue
Expand Down Expand Up @@ -255,25 +256,25 @@ class Module:
EXEC_ORDER = ExecutionOrder.DEFAULT

@classmethod
def run(cls, target: Target, cli_args: argparse.Namespace, collector: Collector):
def run(cls, target: Target, cli_args: argparse.Namespace, collector: Collector) -> None:
desc = cls.DESC or cls.__name__.lower()
log.info("*** Acquiring %s", desc)

with collector.bind_module(cls):
collector.collect(cls.SPEC)

spec_ext = cls.get_spec_additions(target)
spec_ext = cls.get_spec_additions(target, cli_args)

Check warning on line 266 in acquire/acquire.py

View check run for this annotation

Codecov / codecov/patch

acquire/acquire.py#L266

Added line #L266 was not covered by tests
if spec_ext:
collector.collect(list(spec_ext))

cls._run(target, collector)
cls._run(target, cli_args, collector)

Check warning on line 270 in acquire/acquire.py

View check run for this annotation

Codecov / codecov/patch

acquire/acquire.py#L270

Added line #L270 was not covered by tests

@classmethod
def get_spec_additions(cls, target):
def get_spec_additions(cls, target: Target, cli_args: argparse.Namespace) -> Iterator[tuple]:
pass

@classmethod
def _run(cls, target, collector):
def _run(cls, target: Target, cli_args: argparse.Namespace, collector: Collector) -> None:
pass


Expand All @@ -284,7 +285,7 @@ class Sys(Module):
EXEC_ORDER = ExecutionOrder.BOTTOM

@classmethod
def _run(cls, target: Target, collector: Collector):
def _run(cls, target: Target, cli_args: argparse.Namespace, collector: Collector) -> None:
if not Path("/sys").exists():
log.error("/sys is unavailable! Skipping...")
return
Expand All @@ -306,7 +307,7 @@ class Proc(Module):
EXEC_ORDER = ExecutionOrder.BOTTOM

@classmethod
def _run(cls, target: Target, collector: Collector):
def _run(cls, target: Target, cli_args: argparse.Namespace, collector: Collector) -> None:
if not Path("/proc").exists():
log.error("/proc is unavailable! Skipping...")
return
Expand All @@ -325,7 +326,7 @@ class NTFS(Module):
DESC = "NTFS filesystem metadata"

@classmethod
def _run(cls, target, collector):
def _run(cls, target: Target, cli_args: argparse.Namespace, collector: Collector) -> None:
for fs, name, mountpoints in iter_ntfs_filesystems(target):
log.info("Acquiring %s (%s)", fs, mountpoints)

Expand All @@ -336,7 +337,7 @@ def _run(cls, target, collector):
cls.collect_ntfs_secure(collector, fs, name)

@classmethod
def collect_usnjrnl(cls, collector: Collector, fs, name: str) -> None:
def collect_usnjrnl(cls, collector: Collector, fs: Filesystem, name: str) -> None:
try:
usnjrnl_path = fs.path("$Extend/$Usnjrnl:$J")
entry = usnjrnl_path.get()
Expand Down Expand Up @@ -366,7 +367,7 @@ def collect_usnjrnl(cls, collector: Collector, fs, name: str) -> None:
log.info("- Collecting file $Extend/$Usnjrnl:$J: %s", result)

@classmethod
def collect_ntfs_secure(cls, collector: Collector, fs, name: str) -> None:
def collect_ntfs_secure(cls, collector: Collector, fs: Filesystem, name: str) -> None:
try:
secure_path = fs.path("$Secure:$SDS")
entry = secure_path.get()
Expand Down Expand Up @@ -404,7 +405,7 @@ class Registry(Module):
]

@classmethod
def get_spec_additions(cls, target):
def get_spec_additions(cls, target: Target, cli_args: argparse.Namespace) -> Iterator[tuple]:
# Glob all hives to include e.g. .LOG files and .regtrans-ms files.
files = []
for hive in cls.HIVES:
Expand Down Expand Up @@ -458,7 +459,7 @@ class WinArpCache(Module):
EXEC_ORDER = ExecutionOrder.BOTTOM

@classmethod
def get_spec_additions(cls, target):
def get_spec_additions(cls, target: Target, cli_args: argparse.Namespace) -> Iterator[tuple]:
if float(target.ntversion) < 6.2:
commands = [
# < Windows 10
Expand All @@ -479,7 +480,7 @@ class WinRDPSessions(Module):
EXEC_ORDER = ExecutionOrder.BOTTOM

@classmethod
def get_spec_additions(cls, target):
def get_spec_additions(cls, target: Target, cli_args: argparse.Namespace) -> Iterator[tuple]:
# where.exe instead of where, just in case the client runs in PS instead of CMD
# by default where hides qwinsta on 32-bit systems because qwinsta is only 64-bit, but with recursive /R search
# we can still manage to find it and by passing the exact path Windows will launch a 64-bit process
Expand All @@ -499,7 +500,7 @@ class WinMemDump(Module):
EXEC_ORDER = ExecutionOrder.BOTTOM

@classmethod
def _run(cls, target, collector):
def _run(cls, target: Target, cli_args: argparse.Namespace, collector: Collector) -> None:
winpmem_file_name = "winpmem.exe"
winpmem_exec = shutil.which(winpmem_file_name)

Expand Down Expand Up @@ -566,7 +567,7 @@ class EventLogs(Module):
DESC = "event logs"

@classmethod
def get_spec_additions(cls, target):
def get_spec_additions(cls, target: Target, cli_args: argparse.Namespace) -> Iterator[tuple]:
spec = set()
evt_log_paths = evt.EvtPlugin(target).get_logs(filename_glob="*.evt")
for path in evt_log_paths:
Expand Down Expand Up @@ -595,7 +596,7 @@ class NTDS(Module):
]

@classmethod
def get_spec_additions(cls, target):
def get_spec_additions(cls, target: Target, cli_args: argparse.Namespace) -> Iterator[tuple]:
spec = set()
key = "HKLM\\SYSTEM\\CurrentControlSet\\services\\NTDS\\Parameters"
values = [
Expand Down Expand Up @@ -637,7 +638,7 @@ class RecycleBin(Module):
DESC = "recycle bin metadata"

@classmethod
def _run(cls, target, collector):
def _run(cls, target: Target, cli_args: argparse.Namespace, collector: Collector) -> None:
for fs, name, mountpoints in iter_ntfs_filesystems(target):
log.info("Acquiring recycle bin metadata from %s (%s)", fs, mountpoints)

Expand All @@ -660,7 +661,7 @@ class Exchange(Module):
DESC = "interesting Exchange configuration files"

@classmethod
def get_spec_additions(cls, target):
def get_spec_additions(cls, target: Target, cli_args: argparse.Namespace) -> Iterator[tuple]:
spec = set()

key = "HKLM\\SOFTWARE\\Microsoft\\ExchangeServer"
Expand Down Expand Up @@ -699,7 +700,7 @@ class IIS(Module):
DESC = "IIS logs"

@classmethod
def get_spec_additions(cls, target):
def get_spec_additions(cls, target: Target, cli_args: argparse.Namespace) -> Iterator[tuple]:
spec = set(
[
("glob", "sysvol\\Windows\\System32\\LogFiles\\W3SVC*\\*.log"),
Expand Down Expand Up @@ -789,7 +790,7 @@ class DHCP(Module):
DESC = "Windows Server DHCP files"

@classmethod
def get_spec_additions(cls, target):
def get_spec_additions(cls, target: Target, cli_args: argparse.Namespace) -> Iterator[tuple]:
spec = set()
key = "HKLM\\SYSTEM\\CurrentControlSet\\Services\\DhcpServer\\Parameters"
for reg_key in target.registry.iterkeys(key):
Expand Down Expand Up @@ -1332,7 +1333,7 @@ class WER(Module):
DESC = "WER (Windows Error Reporting) related files"

@classmethod
def get_spec_additions(cls, target):
def get_spec_additions(cls, target: Target, cli_args: argparse.Namespace) -> Iterator[tuple]:
spec = set()

for wer_dir in itertools.chain(
Expand Down Expand Up @@ -1403,7 +1404,7 @@ class SSH(Module):
]

@classmethod
def run(cls, target: Target, cli_args: argparse.Namespace, collector: Collector):
def run(cls, target: Target, cli_args: argparse.Namespace, collector: Collector) -> None:
# Acquire SSH configuration in sshd directories

filter = None if cli_args.private_keys else private_key_filter
Expand Down Expand Up @@ -1481,7 +1482,7 @@ class Bootbanks(Module):
DESC = "ESXi bootbanks"

@classmethod
def _run(cls, target, collector):
def _run(cls, target: Target, cli_args: argparse.Namespace, collector: Collector) -> None:
# Both ESXi 6 and 7 compatible
boot_dirs = {
"boot": "BOOT",
Expand Down Expand Up @@ -1524,7 +1525,7 @@ class VMFS(Module):
DESC = "ESXi VMFS metadata files"

@classmethod
def _run(cls, target, collector):
def _run(cls, target: Target, cli_args: argparse.Namespace, collector: Collector) -> None:
for uuid, name, fs in iter_esxi_filesystems(target):
if not fs.__fstype__ == "vmfs":
continue
Expand Down Expand Up @@ -1586,7 +1587,7 @@ class FileHashes(Module):
)

@classmethod
def run(cls, target, cli_args, collector):
def run(cls, target: Target, cli_args: argparse.Namespace, collector: Collector) -> None:
log.info("*** Acquiring file hashes")

specs = cls.get_specs(cli_args)
Expand All @@ -1604,7 +1605,7 @@ def run(cls, target, cli_args, collector):
log.info("Hashing is done, %s files processed in %.2f secs", rows_count, (time.time() - start))

@classmethod
def get_specs(cls, cli_args):
def get_specs(cls, cli_args: argparse.Namespace) -> Iterator[tuple]:
path_selectors = []

if cli_args.ext_to_hash:
Expand Down Expand Up @@ -1644,7 +1645,7 @@ class OpenHandles(Module):
DESC = "Open handles"

@classmethod
def run(cls, target: Target, cli_args: dict[str, any], collector: Collector):
def run(cls, target: Target, cli_args: argparse.Namespace, collector: Collector) -> None:
if not sys.platform == "win32":
log.error("Open Handles plugin can only run on Windows systems! Skipping...")
return
Expand All @@ -1667,7 +1668,7 @@ def run(cls, target: Target, cli_args: dict[str, any], collector: Collector):
log.info("Collecting open handles is done.")


def print_disks_overview(target):
def print_disks_overview(target: Target) -> None:
log.info("// Disks")
try:
for disk in target.disks:
Expand All @@ -1682,7 +1683,7 @@ def print_disks_overview(target):
log.info("")


def print_volumes_overview(target):
def print_volumes_overview(target: Target) -> None:
log.info("// Volumes")
try:
for volume in target.volumes:
Expand Down Expand Up @@ -1714,15 +1715,15 @@ def modargs2json(args: argparse.Namespace) -> dict:
return json_opts


def acquire_target(target: Target, *args, **kwargs):
def acquire_target(target: Target, *args, **kwargs) -> list[str]:
if isinstance(target._loader, TargetdLoader):
files = acquire_target_targetd(target, *args, **kwargs)
else:
files = acquire_target_regular(target, *args, **kwargs)
return files


def acquire_target_targetd(target: Target, args: argparse.Namespace, output_ts: Optional[str] = None):
def acquire_target_targetd(target: Target, args: argparse.Namespace, output_ts: Optional[str] = None) -> list[str]:
files = []
if not len(target.hostname()):
log.error("Unable to initialize targetd.")
Expand All @@ -1742,7 +1743,7 @@ def acquire_target_targetd(target: Target, args: argparse.Namespace, output_ts:
return files


def acquire_target_regular(target: Target, args: argparse.Namespace, output_ts: Optional[str] = None):
def acquire_target_regular(target: Target, args: argparse.Namespace, output_ts: Optional[str] = None) -> list[str]:
files = []
output_ts = output_ts or get_utc_now_str()
if args.log_to_dir:
Expand Down Expand Up @@ -1939,11 +1940,7 @@ def acquire_target_regular(target: Target, args: argparse.Namespace, output_ts:
return files


def upload_files(
paths: list[Path],
upload_plugin: UploaderPlugin,
no_proxy: bool = False,
):
def upload_files(paths: list[Path], upload_plugin: UploaderPlugin, no_proxy: bool = False) -> None:
proxies = None if no_proxy else urllib.request.getproxies()
log.debug("Proxies: %s (no_proxy = %s)", proxies, no_proxy)

Expand Down Expand Up @@ -2111,7 +2108,7 @@ def upload_files(
}


def main():
def main() -> None:
parser = create_argument_parser(PROFILES, MODULES)
args = parse_acquire_args(parser, config=CONFIG)

Expand Down Expand Up @@ -2202,7 +2199,7 @@ def load_child(target: Target, child_path: Path) -> None:
return child


def acquire_children_and_targets(target: Target, args: argparse.Namespace):
def acquire_children_and_targets(target: Target, args: argparse.Namespace) -> None:
if args.child:
target = load_child(target, args.child)

Expand Down

0 comments on commit 7e8d22c

Please sign in to comment.