diff --git a/acquire/crypt.py b/acquire/crypt.py index 3575270..3e7ba7e 100644 --- a/acquire/crypt.py +++ b/acquire/crypt.py @@ -113,7 +113,7 @@ def tell(self): return self.fh.tell() def seek(self, pos, whence=io.SEEK_CUR): - raise TypeError("seeking is not allowed") + raise io.UnsupportedOperation("seeking is not allowed") def close(self): self.finalize() diff --git a/acquire/outputs/zip.py b/acquire/outputs/zip.py index f114abc..3b9740d 100644 --- a/acquire/outputs/zip.py +++ b/acquire/outputs/zip.py @@ -48,7 +48,7 @@ def __init__( if encrypt: self._fh = EncryptedStream(self.path.open("wb"), public_key) - self.archive = zipfile.ZipFile(fileobj=self._fh, mode="w", compression=self.compression, allowZip64=True) + self.archive = zipfile.ZipFile(self._fh, mode="w", compression=self.compression, allowZip64=True) else: self.archive = zipfile.ZipFile(self.path, mode="w", compression=self.compression, allowZip64=True) diff --git a/acquire/tools/decrypter.py b/acquire/tools/decrypter.py index efeb50d..bd3bcae 100644 --- a/acquire/tools/decrypter.py +++ b/acquire/tools/decrypter.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import argparse import base64 import contextlib @@ -14,6 +16,9 @@ from datetime import datetime, timezone from pathlib import Path from queue import Empty as QueueEmptyError +from queue import Queue +from threading import Event +from typing import BinaryIO, Iterator from urllib import request from urllib.error import HTTPError from urllib.parse import urljoin @@ -73,7 +78,7 @@ class VerifyError(Exception): class EncryptedFile(AlignedStream): - def __init__(self, fh, key_file=None, key_server=None): + def __init__(self, fh: BinaryIO, key_file: Path | None = None, key_server: str | None = None) -> None: self.fh = fh self.key_file = key_file self.key_server = key_server @@ -116,10 +121,10 @@ def __init__(self, fh, key_file=None, key_server=None): def seekable(self): return False - def seek(self, pos, whence=io.SEEK_CUR): + def seek(self, pos: int, whence: int = io.SEEK_CUR) -> int: raise io.UnsupportedOperation("seeking is not allowed") - def _read(self, offset, length): + def _read(self, offset: int, length: int) -> bytes: if not self.size: result = [] @@ -162,25 +167,25 @@ def _read(self, offset, length): read_size = max(0, min(length, self.size - offset)) return self.cipher.decrypt(self.fh.read(read_size)) - def chunks(self, chunk_size=CHUNK_SIZE): + def chunks(self, chunk_size: int = CHUNK_SIZE) -> Iterator[bytes]: while True: chunk = self.read(chunk_size) if not chunk: break yield chunk - def verify(self): + def verify(self) -> None: try: self.cipher.verify(self.digest) except ValueError: raise VerifyError("Digest check failed") @property - def file_header(self): + def file_header(self) -> c_acquire.file: return self._file_header @file_header.setter - def file_header(self, file_header): + def file_header(self, file_header: c_acquire.file) -> None: if file_header.magic != FILE_MAGIC: raise ValueError(f"Invalid file magic: {file_header.magic}") @@ -193,31 +198,31 @@ def file_header(self, file_header): self._file_header = file_header @property - def header(self): + def header(self) -> c_acquire.header: return self._header @header.setter - def header(self, header): + def header(self, header: c_acquire.header) -> None: if header.magic != HEADER_MAGIC: raise ValueError(f"Invalid header magic: {header.magic}") self._header = header @property - def footer(self): + def footer(self) -> c_acquire.footer: return self._footer @footer.setter - def footer(self, footer): + def footer(self, footer: c_acquire.footer) -> None: if footer.magic != FOOTER_MAGIC: raise ValueError(f"Invalid footer magic: {footer}") self._footer = footer @property - def timestamp(self): + def timestamp(self) -> datetime: return datetime.fromtimestamp(self.file_header.timestamp, timezone.utc) -def decrypt_header(header, fingerprint, key_file=None, key_server=None): +def decrypt_header(header, fingerprint: bytes, key_file: Path | None = None, key_server: str | None = None) -> bytes: if not key_file and not key_server: raise ValueError("Need either key file or key server") @@ -264,7 +269,16 @@ def check_existing(in_path: Path, out_path: Path, status_queue: multiprocessing. return False -def worker(task_id, stop_event, status_queue, in_path, out_path, key_file=None, key_server=None, clobber=False): +def worker( + task_id: int, + stop_event: Event, + status_queue: Queue, + in_path: Path, + out_path: Path, + key_file: Path | None = None, + key_server: str | None = None, + clobber: bool = False, +) -> None: success = False message = "An unknown error occurred" @@ -325,23 +339,23 @@ def worker(task_id, stop_event, status_queue, in_path, out_path, key_file=None, _exit(status_queue, task_id, str(in_path), message, success) -def _start(queue, task_id): +def _start(queue: Queue, task_id: int) -> None: queue.put_nowait((STATUS_START, task_id)) -def _update(queue, task_id, *args, **kwargs): +def _update(queue: Queue, task_id: int, *args, **kwargs) -> None: queue.put_nowait((STATUS_UPDATE, (task_id, args, kwargs))) -def _info(queue, msg): +def _info(queue: Queue, msg: str) -> None: queue.put_nowait((STATUS_INFO, msg)) -def _exit(queue: multiprocessing.Queue, task_id: int, in_path: str, message: str, success: bool): +def _exit(queue: multiprocessing.Queue, task_id: int, in_path: str, message: str, success: bool) -> None: queue.put_nowait((STATUS_EXIT, (task_id, in_path, message, success))) -def setup_logging(logger, verbosity): +def setup_logging(logger: logging.Logger, verbosity: int) -> None: if verbosity == 1: level = logging.ERROR elif verbosity == 2: @@ -360,7 +374,7 @@ def setup_logging(logger, verbosity): logger.setLevel(level) -def main(): +def main() -> None: parser = argparse.ArgumentParser() parser.add_argument("files", nargs="+", type=Path, help="paths to encrypted files") parser.add_argument("-o", "--output", type=Path, help="optional path to output file") @@ -476,12 +490,12 @@ def main(): # If no successful results, return 1 if not any(successes): exit_code = 1 - # Else, if some results were successful return 2 + # Else, if some results but not all were successful return 2 elif not all(successes): exit_code = 2 - # Else, if all were successful but there were still tasks to handle, return 2 - elif all(success) and tasks: - exit_code = 2 + # Else, if all were successful but there were still tasks to handle, return 3 + elif tasks: + exit_code = 3 exit(exit_code) diff --git a/tests/conftest.py b/tests/conftest.py index b9d11a8..7600d62 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -33,3 +33,9 @@ def mock_target(mock_fs: VirtualFilesystem) -> Target: target.filesystems.add(mock_fs) target.os = "mock" return target + + +@pytest.fixture +def public_key() -> bytes: + with open("tests/data/public_key.pem", "r") as f: + return f.read() diff --git a/tests/data/private_key.pem b/tests/data/private_key.pem new file mode 100644 index 0000000..7e45927 --- /dev/null +++ b/tests/data/private_key.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDDd7qXEe7cR4b0 +DFmszQm5MNFPkf/uJd5i6Y4ya0prts9s4mDdS90oDUt7bvLl42i/eB+S1fw9UpmK +r8OBV51VoFPswLVSwHho7LjbGvBSgKCW31skalgOmCAiCl6BqqjI7eIqxJx3V2+G +tW19v0EvbPJNs4gWlLIzn0WZJ65HWNZxToD9NcM2uZeIFxziUFvQYv1Nzkih/HWA +8Fw8Sdrq3JN3RG9jikJSH/MisU25IP5ehP5g66akPCufBCEc6Y2OE4wAQ7sbBMpf +/6PLZHGCVMnE8TXGBUl7m9UVwdyhvfIFBNhGilpqtSQhlqymrko7R7uZ6x5gQIz/ +YzUW/Ho9AgMBAAECggEABqDTvKBeDNgxalXU4KLfS96s6lmAEo+e1+nQPu4byu/w +XrfaeFvvNuwkfXNeBzpL6K+RGoXpFMdCmk0AKy2mZytATUyc7skaDCzNeUM+QlNG +9CFvfMT3vB71JVJcBrebxkcoHofQ6ncWOrzXkVEKoSoSRAeXe3SKtRdsi8H9teuX +uXzs8fyk+Xrp9qBE3y541HcZCh8oLypQgTFoV3cZJgcsrnRaLQUooU2n1lvl+EZx +xoZnL1LBMmX/teVICE20NJOlJN25Z+Q26tNM6ADMFmmN0hDaHUEv/tlV+MSB01Mq +nBvC1/q6pHODHW1AsLfxwT2f+VeEz4Hpbxpu1fCkXQKBgQDyhzJgVkkKctCcMlcB +4fck+mxvQlRuWHs94RdMd28bZay5BQhd/rqicDzqIQ2NIhaPDDbUR5ElqLXQEDZD +6s+FeHbT+iXTOtWA6qQJL0/alcEL22Nxxv098nFrjUBeinaw+PbOZq5DOT4NVL/Q +i0lGzQs+6jEH9aYc24Tu9Gi5EwKBgQDOU1KQhAea1rztLkAbEI0giKM/6vf0g6im +1/UlUy8TjyIaE4Cwgsy/H6LuvY1KOiV/6boO3jBl5OyZZBFqIEbmEd3MH75XC0XP +bLtI00EVHU6jCf/dLE5wNhxhEuAw0KB12ecR7fZv1Wg9ltj/IR6dFBJ+Q7uuxufk +yq9R9QU5bwKBgB/Qdl5G01wIha8Ht3wqvTXfl9vccqDrAHe0kE7al/ubEdZPf7J8 +2NS4LnV0EogCAb2QF50vKi4rfHYnukachc53Z/cUqGOWIy2/GfeOekYtQN6iT+A7 +/zpiFFjMdbYxKbK7ZfzbYV62IpqzFFpx+xHLkf8Vz4rAwaKldUG3VAl7AoGASlef +gk7wZoxFWri1hIr8LuLM37UMTuA5npRl0mMcrVF/miG41uDqYVtG2/sUs9ArvuE6 +lyzcB3rq/YIe/DxRD4kUf/5YGQkIyGqHOQBVjQQYV4q81LaoNKpqo1enzC8AAjbX +mZBCoZ0liDuYSKVoYHThDPne4GTvHXMipMdCcKUCgYEA5h0686KZxIHgxPx7A4Br +zHKigjFGda4C9xPWBdpjvLcbFgyc+ULzP61q+h2LnE+HEuFzVqMj56dlIJl73ooI +FyjJR9ceNDyZ37XzBF3IM5AaxdvPfB/OIjMOGS5yV+1hijcGdy+RgBGVADv4lgUd +soFdGXF8UUBPdYczZR2R7jQ= +-----END PRIVATE KEY----- diff --git a/tests/data/public_key.pem b/tests/data/public_key.pem new file mode 100644 index 0000000..5989f52 --- /dev/null +++ b/tests/data/public_key.pem @@ -0,0 +1,9 @@ +-----BEGIN PUBLIC KEY----- +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAw3e6lxHu3EeG9AxZrM0J +uTDRT5H/7iXeYumOMmtKa7bPbOJg3UvdKA1Le27y5eNov3gfktX8PVKZiq/DgVed +VaBT7MC1UsB4aOy42xrwUoCglt9bJGpYDpggIgpegaqoyO3iKsScd1dvhrVtfb9B +L2zyTbOIFpSyM59FmSeuR1jWcU6A/TXDNrmXiBcc4lBb0GL9Tc5Iofx1gPBcPEna +6tyTd0RvY4pCUh/zIrFNuSD+XoT+YOumpDwrnwQhHOmNjhOMAEO7GwTKX/+jy2Rx +glTJxPE1xgVJe5vVFcHcob3yBQTYRopaarUkIZaspq5KO0e7meseYECM/2M1Fvx6 +PQIDAQAB +-----END PUBLIC KEY----- diff --git a/tests/test_outputs_tar.py b/tests/test_outputs_tar.py index 0c6e36f..caf9b54 100644 --- a/tests/test_outputs_tar.py +++ b/tests/test_outputs_tar.py @@ -5,6 +5,7 @@ from dissect.target.filesystem import VirtualFilesystem from acquire.outputs import TarOutput +from acquire.tools.decrypter import EncryptedFile @pytest.fixture(params=[(True, "gzip"), (True, "bzip2"), (True, "xz"), (False, None)]) @@ -41,3 +42,20 @@ def test_tar_output_write_entry(mock_fs: VirtualFilesystem, tar_output: TarOutpu assert file.issym() elif entry.is_file(): assert file.isfile() + + +def test_tar_output_encrypt(mock_fs: VirtualFilesystem, public_key: bytes, tmp_path: Path) -> None: + entry_name = "/foo/bar/some-file" + entry = mock_fs.get(entry_name) + tar_output = TarOutput(tmp_path, compress=True, compression_method="gzip", encrypt=True, public_key=public_key) + tar_output.write_entry(entry_name, entry) + tar_output.close() + + encrypted_stream = EncryptedFile(tar_output.path.open("rb"), Path("tests/data/private_key.pem")) + decrypted_path = tmp_path / "decrypted.tar" + # Direct streaming is not an option because tarfile needs seek when reading from encrypted files directly + with open(decrypted_path, "wb") as f: + f.write(encrypted_stream.read()) + + tar_file = tarfile.open(name=decrypted_path, mode="r") + assert entry.open().read() == tar_file.extractfile(entry_name).read() diff --git a/tests/test_outputs_zip.py b/tests/test_outputs_zip.py index 65d4aea..6368e5b 100644 --- a/tests/test_outputs_zip.py +++ b/tests/test_outputs_zip.py @@ -6,6 +6,7 @@ from dissect.target.filesystem import VirtualFilesystem from acquire.outputs import ZipOutput +from acquire.tools.decrypter import EncryptedFile @pytest.fixture(params=[(True, "deflate"), (True, "bzip2"), (True, "lzma"), (False, None)]) @@ -45,3 +46,20 @@ def test_zip_output_write_entry(mock_fs: VirtualFilesystem, zip_output: ZipOutpu assert stat.S_ISLNK(file_type) elif entry.is_file(): assert stat.S_ISREG(file_type) + + +def test_zip_output_encrypt(mock_fs: VirtualFilesystem, public_key: bytes, tmp_path: Path) -> None: + entry_name = "/foo/bar/some-file" + entry = mock_fs.get(entry_name) + zip_output = ZipOutput(tmp_path, compress=True, compression_method="bzip2", encrypt=True, public_key=public_key) + zip_output.write_entry(entry_name, entry) + zip_output.close() + + encrypted_stream = EncryptedFile(zip_output.path.open("rb"), Path("tests/data/private_key.pem")) + decrypted_path = tmp_path / "decrypted.zip" + # Direct streaming is not an option because zipfile needs seek when reading from encrypted files directly + with open(decrypted_path, "wb") as f: + f.write(encrypted_stream.read()) + + zip_file = zipfile.ZipFile(decrypted_path, mode="r") + assert entry.open().read() == zip_file.open(entry_name).read()