Skip to content

Commit

Permalink
Yield MFT segments in specified range (#672)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zawadidone authored Aug 7, 2024
1 parent fabbdf9 commit 22d1ca8
Showing 1 changed file with 74 additions and 52 deletions.
126 changes: 74 additions & 52 deletions dissect/target/plugins/filesystem/ntfs/mft.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import logging
from typing import Callable, Iterator

Expand All @@ -8,6 +10,7 @@
from flow.record.fieldtypes import windows_path

from dissect.target.exceptions import UnsupportedPluginError
from dissect.target.filesystems.ntfs import NtfsFilesystem
from dissect.target.helpers.record import TargetRecordDescriptor
from dissect.target.plugin import Plugin, arg, export
from dissect.target.plugins.filesystem.ntfs.utils import (
Expand Down Expand Up @@ -119,9 +122,12 @@


class MftPlugin(Plugin):
def __init__(self, target):
super().__init__(target)
self.ntfs_filesystems = {index: fs for index, fs in enumerate(self.target.filesystems) if fs.__type__ == "ntfs"}

def check_compatible(self) -> None:
ntfs_filesystems = [fs for fs in self.target.filesystems if fs.__type__ == "ntfs"]
if not len(ntfs_filesystems):
if not len(self.ntfs_filesystems):
raise UnsupportedPluginError("No NTFS filesystems found")

@export(
Expand All @@ -133,12 +139,17 @@ def check_compatible(self) -> None:
]
)
@arg("--compact", action="store_true", help="compacts the MFT entry timestamps into a single record")
@arg("--fs", type=int, default=None, help="optional filesystem index, zero indexed")
@arg("--start", type=int, default=0, help="the first MFT segment number")
@arg("--end", type=int, default=-1, help="the last MFT segment number")
@arg(
"--macb",
action="store_true",
help="compacts the MFT entry timestamps into aggregated records with MACB bitfield",
)
def mft(self, compact: bool = False, macb: bool = False):
def mft(
self, compact: bool = False, fs: int | None = None, start: int = 0, end: int = -1, macb: bool = False
) -> Iterator[Record]:
"""Return the MFT records of all NTFS filesystems.
The Master File Table (MFT) contains primarily metadata about every file and folder on a NFTS filesystem.
Expand Down Expand Up @@ -167,55 +178,66 @@ def noaggr(records: list[Record]) -> Iterator[Record]:
elif macb:
aggr = macb_aggr

for fs in self.target.filesystems:
if fs.__type__ != "ntfs":
continue

# If this filesystem is a "fake" NTFS filesystem, used to enhance a
# VirtualFilesystem, The driveletter (more accurate mount point)
# returned will be that of the VirtualFilesystem. This makes sure
# the paths returned in the records are actually reachable.
drive_letter = get_drive_letter(self.target, fs)
volume_uuid = get_volume_identifier(fs)

if fs is not None:
try:
for record in fs.ntfs.mft.segments():
try:
inuse = bool(record.header.Flags & FILE_RECORD_SEGMENT_IN_USE)
owner, _ = get_owner_and_group(record, fs)
resident = False
size = None

if not record.is_dir():
for data_attribute in record.attributes.DATA:
if data_attribute.name == "":
resident = data_attribute.resident
break

size = get_record_size(record)

for path in record.full_paths():
path = f"{drive_letter}{path}"
yield from aggr(
self.mft_records(
drive_letter=drive_letter,
record=record,
segment=record.segment,
path=path,
owner=owner,
size=size,
resident=resident,
inuse=inuse,
volume_uuid=volume_uuid,
record_formatter=record_formatter,
)
filesystem = self.ntfs_filesystems[fs]
except KeyError:
self.target.log.error("NTFS filesystem with index number %s does not exist", fs)
return

yield from self.segments(filesystem, record_formatter, aggr, start, end)
else:
for filesystem in self.ntfs_filesystems.values():
yield from self.segments(filesystem, record_formatter, aggr, start, end)

def segments(
self, fs: NtfsFilesystem, record_formatter: Callable, aggr: Callable, start: int, end: int
) -> Iterator[Record]:
# If this filesystem is a "fake" NTFS filesystem, used to enhance a
# VirtualFilesystem, The driveletter (more accurate mount point)
# returned will be that of the VirtualFilesystem. This makes sure
# the paths returned in the records are actually reachable.
drive_letter = get_drive_letter(self.target, fs)
volume_uuid = get_volume_identifier(fs)

try:
for record in fs.ntfs.mft.segments(start, end):
try:
inuse = bool(record.header.Flags & FILE_RECORD_SEGMENT_IN_USE)
owner, _ = get_owner_and_group(record, fs)
resident = None
size = None

if not record.is_dir():
for data_attribute in record.attributes.DATA:
if data_attribute.name == "":
resident = data_attribute.resident
break

size = get_record_size(record)

for path in record.full_paths():
path = f"{drive_letter}{path}"
yield from aggr(
self.mft_records(
drive_letter=drive_letter,
record=record,
segment=record.segment,
path=path,
owner=owner,
size=size,
resident=resident,
inuse=inuse,
volume_uuid=volume_uuid,
record_formatter=record_formatter,
)
except Exception as e:
self.target.log.warning("An error occured parsing MFT segment %d: %s", record.segment, str(e))
self.target.log.debug("", exc_info=e)
)
except Exception as e:
self.target.log.warning("An error occured parsing MFT segment %d: %s", record.segment, str(e))
self.target.log.debug("", exc_info=e)

except Exception:
log.exception("An error occured constructing FilesystemRecords")
except Exception:
log.exception("An error occured constructing FilesystemRecords")

def mft_records(
self,
Expand All @@ -229,7 +251,7 @@ def mft_records(
inuse: bool,
volume_uuid: str,
record_formatter: Callable,
):
) -> Iterator[Record]:
for attr in record.attributes.STANDARD_INFORMATION:
yield from record_formatter(
attr=attr,
Expand Down Expand Up @@ -286,7 +308,7 @@ def mft_records(
)


def compacted_formatter(attr: Attribute, record_type: InformationType, **kwargs):
def compacted_formatter(attr: Attribute, record_type: InformationType, **kwargs) -> Iterator[Record]:
record_desc = COMPACT_RECORD_TYPES.get(record_type)
yield record_desc(
creation_time=attr.creation_time,
Expand All @@ -297,7 +319,7 @@ def compacted_formatter(attr: Attribute, record_type: InformationType, **kwargs)
)


def formatter(attr: Attribute, record_type: InformationType, **kwargs):
def formatter(attr: Attribute, record_type: InformationType, **kwargs) -> Iterator[Record]:
record_desc = RECORD_TYPES.get(record_type)
for type, timestamp in [
("B", attr.creation_time),
Expand Down

0 comments on commit 22d1ca8

Please sign in to comment.