Skip to content

Commit

Permalink
Add the option to acquire private openssh keys
Browse files Browse the repository at this point in the history
Additionally, it adds the options to filter out
certain paths if the filter function evaluates to `True`

(DIS-1869)
  • Loading branch information
Miauwkeru committed Jul 19, 2023
1 parent a78ba1d commit d23023e
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 22 deletions.
39 changes: 18 additions & 21 deletions acquire/acquire.py
Original file line number Diff line number Diff line change
Expand Up @@ -1354,6 +1354,11 @@ class Boot(Module):
]


def private_key_filter(path: fsutil.TargetPath) -> bool:
with path.open("rt") as file:
return "PRIVATE KEY" in file.readline()

Check warning on line 1359 in acquire/acquire.py

View check run for this annotation

Codecov / codecov/patch

acquire/acquire.py#L1358-L1359

Added lines #L1358 - L1359 were not covered by tests


@register_module("--home")
class Home(Module):
SPEC = [
Expand All @@ -1371,33 +1376,25 @@ class Home(Module):


@register_module("--ssh")
@module_arg("--private-keys", action="store_true", help="Add any private keys", default=False)
class SSH(Module):
@classmethod
def _run(cls, target: Target, collector):
user_pattern = ".ssh/*"

# Gather user paths
# TODO: Use from_user_home if supported for osx
if target._os.os == "osx":
iterator = [f"/Users/*/{user_pattern}"]
else:
iterator = list(from_user_home(target, user_pattern))
SPEC = [
("glob", ".ssh/*", from_user_home),
("glob", "/etc/ssh/*"),
("glob", "sysvol/ProgramData/ssh/*"),
]

@classmethod
def run(cls, target: Target, cli_args: argparse.Namespace, collector: Collector):
# Acquire SSH configuration in sshd directories
iterator += ["/etc/ssh/*", "sysvol/ProgramData/ssh/*"]

globbed_path = (path for pattern in iterator for path in target.fs.glob(pattern))
for path in globbed_path:
if target.fs.path(path).is_dir():
collector.collect_dir(path)
continue
filter = None if cli_args.private_keys else private_key_filter

Check warning on line 1391 in acquire/acquire.py

View check run for this annotation

Codecov / codecov/patch

acquire/acquire.py#L1391

Added line #L1391 was not covered by tests

with target.fs.path(path).open("rt") as file:
if "PRIVATE KEY" in file.readline():
# Detected a private key, skipping.
continue
if filter:
log.info("Executing SSH without --private-keys, skipping private keys.")

Check warning on line 1394 in acquire/acquire.py

View check run for this annotation

Codecov / codecov/patch

acquire/acquire.py#L1393-L1394

Added lines #L1393 - L1394 were not covered by tests

collector.collect_file(path, outpath=path)
with collector.file_filter(filter):
super().run(target, cli_args, collector)

Check warning on line 1397 in acquire/acquire.py

View check run for this annotation

Codecov / codecov/patch

acquire/acquire.py#L1396-L1397

Added lines #L1396 - L1397 were not covered by tests


@register_module("--var")
Expand Down
25 changes: 24 additions & 1 deletion acquire/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,16 @@
from dataclasses import dataclass
from itertools import groupby
from pathlib import Path
from typing import TYPE_CHECKING, Any, Iterable, Optional, Sequence, Type, Union
from typing import (
TYPE_CHECKING,
Any,
Callable,
Iterable,
Optional,
Sequence,
Type,
Union,
)

from dissect.target import Target
from dissect.target.exceptions import (
Expand Down Expand Up @@ -185,6 +194,7 @@ def __init__(self, target: Target, output: Output, base: str = "fs", skip_list:

self.report = CollectionReport()
self.bound_module_name = None
self.filter = lambda _: False

self.output.init(self.target)

Expand All @@ -202,6 +212,15 @@ def bind_module(self, module: Type) -> Collector:
finally:
self.unbind()

Check warning on line 213 in acquire/collector.py

View check run for this annotation

Codecov / codecov/patch

acquire/collector.py#L213

Added line #L213 was not covered by tests

@contextmanager
def file_filter(self, filter: Optional[Callable[[fsutil.TargetPath], bool]]) -> Collector:
try:
if filter:
self.filter = filter
yield self

Check warning on line 220 in acquire/collector.py

View check run for this annotation

Codecov / codecov/patch

acquire/collector.py#L217-L220

Added lines #L217 - L220 were not covered by tests
finally:
self.filter = lambda _: False

Check warning on line 222 in acquire/collector.py

View check run for this annotation

Codecov / codecov/patch

acquire/collector.py#L222

Added line #L222 was not covered by tests

def bind(self, module: Type) -> None:
self.bound_module_name = module.__name__

Expand Down Expand Up @@ -272,6 +291,10 @@ def collect_file(
if not isinstance(path, fsutil.TargetPath):
path = self.target.fs.path(path)

if self.filter(path) is True:
log.info("- Collecting file %s: Skipped (filtered out)", path)
return

Check warning on line 296 in acquire/collector.py

View check run for this annotation

Codecov / codecov/patch

acquire/collector.py#L295-L296

Added lines #L295 - L296 were not covered by tests

if self.report.was_path_seen(path):
log.info("- Collecting file %s: Skipped (DEDUP)", path)
return
Expand Down

0 comments on commit d23023e

Please sign in to comment.