From 22d1ca8bbb9750a6be4061b1bc578ff1bda35a81 Mon Sep 17 00:00:00 2001 From: Zawadi Done Date: Wed, 7 Aug 2024 19:12:22 +0200 Subject: [PATCH] Yield MFT segments in specified range (#672) --- dissect/target/plugins/filesystem/ntfs/mft.py | 126 ++++++++++-------- 1 file changed, 74 insertions(+), 52 deletions(-) diff --git a/dissect/target/plugins/filesystem/ntfs/mft.py b/dissect/target/plugins/filesystem/ntfs/mft.py index f62dd3ea5..e61a9c3ef 100644 --- a/dissect/target/plugins/filesystem/ntfs/mft.py +++ b/dissect/target/plugins/filesystem/ntfs/mft.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import logging from typing import Callable, Iterator @@ -8,6 +10,7 @@ from flow.record.fieldtypes import windows_path from dissect.target.exceptions import UnsupportedPluginError +from dissect.target.filesystems.ntfs import NtfsFilesystem from dissect.target.helpers.record import TargetRecordDescriptor from dissect.target.plugin import Plugin, arg, export from dissect.target.plugins.filesystem.ntfs.utils import ( @@ -119,9 +122,12 @@ class MftPlugin(Plugin): + def __init__(self, target): + super().__init__(target) + self.ntfs_filesystems = {index: fs for index, fs in enumerate(self.target.filesystems) if fs.__type__ == "ntfs"} + def check_compatible(self) -> None: - ntfs_filesystems = [fs for fs in self.target.filesystems if fs.__type__ == "ntfs"] - if not len(ntfs_filesystems): + if not len(self.ntfs_filesystems): raise UnsupportedPluginError("No NTFS filesystems found") @export( @@ -133,12 +139,17 @@ def check_compatible(self) -> None: ] ) @arg("--compact", action="store_true", help="compacts the MFT entry timestamps into a single record") + @arg("--fs", type=int, default=None, help="optional filesystem index, zero indexed") + @arg("--start", type=int, default=0, help="the first MFT segment number") + @arg("--end", type=int, default=-1, help="the last MFT segment number") @arg( "--macb", action="store_true", help="compacts the MFT entry timestamps into aggregated records with MACB bitfield", ) - def mft(self, compact: bool = False, macb: bool = False): + def mft( + self, compact: bool = False, fs: int | None = None, start: int = 0, end: int = -1, macb: bool = False + ) -> Iterator[Record]: """Return the MFT records of all NTFS filesystems. The Master File Table (MFT) contains primarily metadata about every file and folder on a NFTS filesystem. @@ -167,55 +178,66 @@ def noaggr(records: list[Record]) -> Iterator[Record]: elif macb: aggr = macb_aggr - for fs in self.target.filesystems: - if fs.__type__ != "ntfs": - continue - - # If this filesystem is a "fake" NTFS filesystem, used to enhance a - # VirtualFilesystem, The driveletter (more accurate mount point) - # returned will be that of the VirtualFilesystem. This makes sure - # the paths returned in the records are actually reachable. - drive_letter = get_drive_letter(self.target, fs) - volume_uuid = get_volume_identifier(fs) - + if fs is not None: try: - for record in fs.ntfs.mft.segments(): - try: - inuse = bool(record.header.Flags & FILE_RECORD_SEGMENT_IN_USE) - owner, _ = get_owner_and_group(record, fs) - resident = False - size = None - - if not record.is_dir(): - for data_attribute in record.attributes.DATA: - if data_attribute.name == "": - resident = data_attribute.resident - break - - size = get_record_size(record) - - for path in record.full_paths(): - path = f"{drive_letter}{path}" - yield from aggr( - self.mft_records( - drive_letter=drive_letter, - record=record, - segment=record.segment, - path=path, - owner=owner, - size=size, - resident=resident, - inuse=inuse, - volume_uuid=volume_uuid, - record_formatter=record_formatter, - ) + filesystem = self.ntfs_filesystems[fs] + except KeyError: + self.target.log.error("NTFS filesystem with index number %s does not exist", fs) + return + + yield from self.segments(filesystem, record_formatter, aggr, start, end) + else: + for filesystem in self.ntfs_filesystems.values(): + yield from self.segments(filesystem, record_formatter, aggr, start, end) + + def segments( + self, fs: NtfsFilesystem, record_formatter: Callable, aggr: Callable, start: int, end: int + ) -> Iterator[Record]: + # If this filesystem is a "fake" NTFS filesystem, used to enhance a + # VirtualFilesystem, The driveletter (more accurate mount point) + # returned will be that of the VirtualFilesystem. This makes sure + # the paths returned in the records are actually reachable. + drive_letter = get_drive_letter(self.target, fs) + volume_uuid = get_volume_identifier(fs) + + try: + for record in fs.ntfs.mft.segments(start, end): + try: + inuse = bool(record.header.Flags & FILE_RECORD_SEGMENT_IN_USE) + owner, _ = get_owner_and_group(record, fs) + resident = None + size = None + + if not record.is_dir(): + for data_attribute in record.attributes.DATA: + if data_attribute.name == "": + resident = data_attribute.resident + break + + size = get_record_size(record) + + for path in record.full_paths(): + path = f"{drive_letter}{path}" + yield from aggr( + self.mft_records( + drive_letter=drive_letter, + record=record, + segment=record.segment, + path=path, + owner=owner, + size=size, + resident=resident, + inuse=inuse, + volume_uuid=volume_uuid, + record_formatter=record_formatter, ) - except Exception as e: - self.target.log.warning("An error occured parsing MFT segment %d: %s", record.segment, str(e)) - self.target.log.debug("", exc_info=e) + ) + except Exception as e: + self.target.log.warning("An error occured parsing MFT segment %d: %s", record.segment, str(e)) + self.target.log.debug("", exc_info=e) - except Exception: - log.exception("An error occured constructing FilesystemRecords") + except Exception: + log.exception("An error occured constructing FilesystemRecords") def mft_records( self, @@ -229,7 +251,7 @@ def mft_records( inuse: bool, volume_uuid: str, record_formatter: Callable, - ): + ) -> Iterator[Record]: for attr in record.attributes.STANDARD_INFORMATION: yield from record_formatter( attr=attr, @@ -286,7 +308,7 @@ def mft_records( ) -def compacted_formatter(attr: Attribute, record_type: InformationType, **kwargs): +def compacted_formatter(attr: Attribute, record_type: InformationType, **kwargs) -> Iterator[Record]: record_desc = COMPACT_RECORD_TYPES.get(record_type) yield record_desc( creation_time=attr.creation_time, @@ -297,7 +319,7 @@ def compacted_formatter(attr: Attribute, record_type: InformationType, **kwargs) ) -def formatter(attr: Attribute, record_type: InformationType, **kwargs): +def formatter(attr: Attribute, record_type: InformationType, **kwargs) -> Iterator[Record]: record_desc = RECORD_TYPES.get(record_type) for type, timestamp in [ ("B", attr.creation_time),