Skip to content

Commit

Permalink
Make acquire compression configurable (#185)
Browse files Browse the repository at this point in the history
(DIS-3291)
  • Loading branch information
Horofic authored Jul 19, 2024
1 parent c84dfdf commit d7dd277
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 32 deletions.
1 change: 1 addition & 0 deletions acquire/acquire.py
Original file line number Diff line number Diff line change
Expand Up @@ -1782,6 +1782,7 @@ def acquire_target(target: Target, args: argparse.Namespace, output_ts: Optional
output = OUTPUTS[args.output_type](
output_path,
compress=args.compress,
compression_method=args.compress_method,
encrypt=args.encrypt,
public_key=args.public_key,
)
Expand Down
6 changes: 4 additions & 2 deletions acquire/outputs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from acquire.outputs.dir import DirectoryOutput
from acquire.outputs.tar import TarOutput
from acquire.outputs.zip import ZipOutput
from acquire.outputs.tar import TAR_COMPRESSION_METHODS, TarOutput
from acquire.outputs.zip import ZIP_COMPRESSION_METHODS, ZipOutput

# Public names of the ``acquire.outputs`` package. The compression constants are listed as well
# because they are imported from this package elsewhere (argument parsing and validation), and
# without an ``__all__`` entry the re-exported imports look unused to linters.
__all__ = [
    "COMPRESSION_METHODS",
    "OUTPUTS",
    "TAR_COMPRESSION_METHODS",
    "ZIP_COMPRESSION_METHODS",
    "DirectoryOutput",
    "TarOutput",
    "ZipOutput",
]

# Maps the value of the ``--output-type`` argument to the output class that implements it.
OUTPUTS = {"tar": TarOutput, "dir": DirectoryOutput, "zip": ZipOutput}

# Every compression method name accepted by at least one output type. Whether a method is valid
# for the selected output type has to be checked separately against the per-type mapping.
COMPRESSION_METHODS = {*TAR_COMPRESSION_METHODS, *ZIP_COMPRESSION_METHODS}
11 changes: 9 additions & 2 deletions acquire/outputs/tar.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@
from acquire.crypt import EncryptedStream
from acquire.outputs.base import Output

# Maps the user-facing compression method name to the short name that is appended to both the
# tarfile mode string and the output file extension.
TAR_COMPRESSION_METHODS = {
    "gzip": "gz",
    "bzip2": "bz2",
    "xz": "xz",
}


class TarOutput(Output):
"""Tar archive acquire output format. Output can be compressed and/or encrypted.
Args:
path: The path to write the tar archive to.
compress: Whether to compress the tar archive.
compression_method: Compression method to use (Default: gzip). Supports "gzip", "bzip2", "xz".
encrypt: Whether to encrypt the tar archive.
public_key: The RSA public key to encrypt the header with.
"""
Expand All @@ -23,15 +26,19 @@ def __init__(
self,
path: Path,
compress: bool = False,
compression_method: str = "gzip",
encrypt: bool = False,
public_key: Optional[bytes] = None,
) -> None:
self.compression = None
ext = ".tar" if ".tar" not in path.suffixes else ""
mode = "w|" if encrypt else "w:"

if compress:
ext += ".gz" if ".gz" not in path.suffixes else ""
mode += "gz"
self.compression = TAR_COMPRESSION_METHODS.get(compression_method, "gz")

ext += f".{self.compression}" if f".{self.compression}" not in path.suffixes else ""
mode += self.compression

if encrypt:
ext += ".enc"
Expand Down
57 changes: 38 additions & 19 deletions acquire/outputs/zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@
from acquire.crypt import EncryptedStream
from acquire.outputs.base import Output

# Maps the user-facing compression method name to the ``zipfile`` compression constant that is
# used as the compression type of the archive entries.
ZIP_COMPRESSION_METHODS = {
    "deflate": zipfile.ZIP_DEFLATED,
    "bzip2": zipfile.ZIP_BZIP2,
    "lzma": zipfile.ZIP_LZMA,
}


class ZipOutput(Output):
"""Zip archive acquire output format. Output can be compressed and/or encrypted.
Args:
path: The path to write the zip archive to.
compress: Whether to compress the zip archive.
compression_method: Compression method to use (Default: Deflate). Supports "deflate", "bzip2", "lzma".
encrypt: Whether to encrypt the zip archive.
public_key: The RSA public key to encrypt the header with.
"""
Expand All @@ -26,6 +29,7 @@ def __init__(
self,
path: Path,
compress: bool = False,
compression_method: str = "deflate",
encrypt: bool = False,
public_key: Optional[bytes] = None,
) -> None:
Expand All @@ -38,7 +42,7 @@ def __init__(
self.path = path.with_suffix(path.suffix + ext)

if compress:
self.compression = zipfile.ZIP_DEFLATED
self.compression = ZIP_COMPRESSION_METHODS.get(compression_method, zipfile.ZIP_DEFLATED)
else:
self.compression = zipfile.ZIP_STORED

Expand Down Expand Up @@ -78,32 +82,19 @@ def write(
info.compress_type = self.compression

if entry:
info.external_attr = self._get_external_attr(entry)

if entry.is_symlink():
# System which created ZIP archive, 3 = Unix; 0 = Windows
# Windows does not have symlinks, so this must be a unixoid system
info.create_system = 3

# The Python zipfile module accepts the 16-bit "Mode" field (that stores st_mode field from
# struct stat, containing user/group/other permissions, setuid/setgid and symlink info, etc) of the
# ASi extra block for Unix as bits 16-31 of the external_attr
unix_st_mode = (
stat.S_IFLNK
| stat.S_IRUSR
| stat.S_IWUSR
| stat.S_IXUSR
| stat.S_IRGRP
| stat.S_IWGRP
| stat.S_IXGRP
| stat.S_IROTH
| stat.S_IWOTH
| stat.S_IXOTH
)
info.external_attr = unix_st_mode << 16

lstat = entry.lstat()
if lstat:
# Python zipfile module does not support timestamps before 1980
dt = datetime.fromtimestamp(lstat.st_mtime)
info.date_time = (dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second)
year = max(dt.year, 1980)
info.date_time = (year, dt.month, dt.day, dt.hour, dt.minute, dt.second)

with self.archive.open(info, "w") as zfh:
shutil.copyfileobj(fh, zfh)
Expand All @@ -113,3 +104,31 @@ def close(self) -> None:
self.archive.close()
if self._fh:
self._fh.close()

def _get_external_attr(self, entry: FilesystemEntry) -> int:
    """Build the ZIP ``external_attr`` value describing the file type of ``entry``.

    The Python zipfile module accepts the 16-bit Unix "Mode" field (the ``st_mode`` value of
    ``struct stat``) as bits 16-31 of ``external_attr``.

    Args:
        entry: The filesystem entry that is being written to the archive.

    Returns:
        The file type bits (symlink, directory or regular file) combined with read, write and
        execute permission for user, group and other, shifted into the upper 16 bits.
    """
    # A symlink is checked first, so it is recorded as a link regardless of what it points to.
    if entry.is_symlink():
        file_type = stat.S_IFLNK
    elif entry.is_dir():
        file_type = stat.S_IFDIR
    else:
        file_type = stat.S_IFREG

    # S_IRWXU | S_IRWXG | S_IRWXO is rwx for user, group and other (0o777).
    permissions = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO

    return (file_type | permissions) << 16
24 changes: 22 additions & 2 deletions acquire/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@

from dissect.target import Target

from acquire.outputs import OUTPUTS
from acquire.outputs import (
COMPRESSION_METHODS,
OUTPUTS,
TAR_COMPRESSION_METHODS,
ZIP_COMPRESSION_METHODS,
)
from acquire.uploaders.plugin_registry import UploaderRegistry


Expand Down Expand Up @@ -75,7 +80,7 @@ def create_argument_parser(profiles: dict, volatile: dict, modules: dict) -> arg
parser.add_argument(
"-ot",
"--output-type",
choices=OUTPUTS.keys(),
choices=OUTPUTS,
default="tar",
help="output type (default: tar)",
)
Expand All @@ -84,6 +89,11 @@ def create_argument_parser(profiles: dict, volatile: dict, modules: dict) -> arg
action=argparse.BooleanOptionalAction,
help="compress output (if supported by the output type)",
)
parser.add_argument(
"--compress-method",
choices=COMPRESSION_METHODS,
help="compression method (if supported by the output type)",
)
parser.add_argument(
"--encrypt",
action=argparse.BooleanOptionalAction,
Expand Down Expand Up @@ -320,6 +330,16 @@ def check_and_set_acquire_args(
if not args.children and args.skip_parent:
raise ValueError("--skip-parent can only be set with --children")

if args.compress:
if (args.output_type == "zip" and args.compress_method) and args.compress_method not in ZIP_COMPRESSION_METHODS:
raise ValueError(
f"Invalid compression method for zip, allowed are: {', '.join(ZIP_COMPRESSION_METHODS.keys())}"
)
if (args.output_type == "tar" and args.compress_method) and args.compress_method not in TAR_COMPRESSION_METHODS:
raise ValueError(
f"Invalid compression method for tar, allowed are: {', '.join(TAR_COMPRESSION_METHODS.keys())}"
)


def get_user_name() -> str:
try:
Expand Down
8 changes: 5 additions & 3 deletions tests/test_outputs_tar.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
from acquire.outputs import TarOutput


@pytest.fixture
def tar_output(tmp_path: Path) -> TarOutput:
return TarOutput(tmp_path)
@pytest.fixture(params=[(True, "gzip"), (True, "bzip2"), (True, "xz"), (False, None)])
def tar_output(tmp_path: Path, request: pytest.FixtureRequest) -> TarOutput:
    """Provide a ``TarOutput`` for the gzip, bzip2 and xz methods and for the uncompressed case."""
    use_compression, method_name = request.param
    return TarOutput(tmp_path, compress=use_compression, compression_method=method_name)


@pytest.mark.parametrize(
Expand All @@ -28,6 +29,7 @@ def test_tar_output_write_entry(mock_fs: VirtualFilesystem, tar_output: TarOutpu
tar_file = tarfile.open(tar_output.path)
files = tar_file.getmembers()

# A conditional expression binds looser than ``==``, so the expected suffix is computed first.
# Written inline, the uncompressed case would only assert the truthy literal ".tar" and could
# never fail.
expected_suffix = f".{tar_output.compression}" if tar_output.compression else ".tar"
assert tar_output.path.suffix == expected_suffix
assert len(files) == 1

file = files[0]
Expand Down
47 changes: 47 additions & 0 deletions tests/test_outputs_zip.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import stat
import zipfile
from pathlib import Path

import pytest
from dissect.target.filesystem import VirtualFilesystem

from acquire.outputs import ZipOutput


@pytest.fixture(params=[(True, "deflate"), (True, "bzip2"), (True, "lzma"), (False, None)])
def zip_output(tmp_path: Path, request: pytest.FixtureRequest) -> ZipOutput:
    """Provide a ``ZipOutput`` for the deflate, bzip2 and lzma methods and for the uncompressed case."""
    use_compression, method_name = request.param
    return ZipOutput(tmp_path, compress=use_compression, compression_method=method_name)


@pytest.mark.parametrize(
    "entry_name",
    [
        "/foo/bar/some-file",
        "/foo/bar/some-symlink",
        "/foo/bar/some-dir",
    ],
)
def test_zip_output_write_entry(mock_fs: VirtualFilesystem, zip_output: ZipOutput, entry_name: str) -> None:
    """Write a single entry to a zip output and verify its name and recorded file type.

    Runs for every combination of the ``zip_output`` fixture parameters and the entry names above.
    """
    entry = mock_fs.get(entry_name)

    assert zip_output.compression == zip_output.archive.compression
    zip_output.write_entry(entry_name, entry)
    zip_output.close()

    # Read the archive back through a context manager so the handle is released even when one of
    # the assertions below fails. The ZipInfo objects in ``filelist`` stay usable after closing.
    with zipfile.ZipFile(zip_output.path, mode="r") as zip_file:
        files = zip_file.filelist

    assert len(files) == 1

    file = files[0]
    assert file.filename == entry_name

    file_type = file.external_attr >> 16

    # zipfile only supports is_dir(). we have all the information we need to determine the file type in 'external_attr'
    if entry.is_dir():
        assert stat.S_ISDIR(file_type)
    elif entry.is_symlink():
        assert stat.S_ISLNK(file_type)
    elif entry.is_file():
        assert stat.S_ISREG(file_type)
    else:
        # Without this branch an unexpected entry type would pass without any file type check.
        pytest.fail(f"Unexpected entry type for {entry_name}")
9 changes: 5 additions & 4 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,10 +491,11 @@ def test_utils_normalize_path(
if os == "windows":
case_sensitive = False

with patch.object(mock_target, "os", new=os), patch.object(
mock_target.fs, "_case_sensitive", new=case_sensitive
), patch.object(mock_target.fs, "_alt_separator", new=("\\" if os == "windows" else "/")), patch.dict(
mock_target.props, {"sysvol_drive": sysvol}
with (
patch.object(mock_target, "os", new=os),
patch.object(mock_target.fs, "_case_sensitive", new=case_sensitive),
patch.object(mock_target.fs, "_alt_separator", new=("\\" if os == "windows" else "/")),
patch.dict(mock_target.props, {"sysvol_drive": sysvol}),
):
if as_path:
path = TargetPath(mock_target.fs, path)
Expand Down

0 comments on commit d7dd277

Please sign in to comment.