Skip to content

Commit

Permalink
Changes to make timeliner support date-less log formats log2timeline#…
Browse files Browse the repository at this point in the history
  • Loading branch information
joachimmetz committed Mar 29, 2024
1 parent 1dbec7a commit 2bbf16c
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 77 deletions.
10 changes: 10 additions & 0 deletions plaso/containers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class DateLessLogHelper(interface.AttributeContainer):
Attributes:
earliest_date (list[int, int, int]): earliest possible date the event data
stream was created. The date is a tuple of year, month and day of month.
granularity (str): granularity of the date-less log format.
last_relative_date (list[int, int, int]): last relative date determined by
the date-less log helper. The date is a tuple of year, month and day of
month.
Expand All @@ -100,17 +101,25 @@ class DateLessLogHelper(interface.AttributeContainer):
SCHEMA = {
'_event_data_stream_identifier': 'AttributeContainerIdentifier',
'earliest_date': 'List[int]',
'granularity': 'str',
'last_relative_date': 'List[int]',
'latest_date': 'List[int]'}

_SERIALIZABLE_PROTECTED_ATTRIBUTES = [
'_event_data_stream_identifier']

# The date-less log format only supports time.
GRANULARITY_NO_DATE = 'd'

# The date-less log format only supports month and day of month.
GRANULARITY_NO_YEARS = 'y'

def __init__(self):
"""Initializes a date-less log helper attribute container."""
super(DateLessLogHelper, self).__init__()
self._event_data_stream_identifier = None
self.earliest_date = None
self.granularity = self.GRANULARITY_NO_YEARS
self.last_relative_date = None
self.latest_date = None

Expand All @@ -123,6 +132,7 @@ def CopyFromYearLessLogHelper(self, year_less_log_helper):
year_less_log_helper (YearLessLogHelper): year-less log helper.
"""
self.earliest_date = (year_less_log_helper.earliest_year, 1, 1)
self.granularity = self.GRANULARITY_NO_YEARS
self.last_relative_date = (year_less_log_helper.last_relative_year, 0, 0)
self.latest_date = (year_less_log_helper.latest_year, 1, 1)

Expand Down
106 changes: 67 additions & 39 deletions plaso/engine/timeliner.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ def __init__(
"""
super(EventDataTimeliner, self).__init__()
self._attribute_mappings = {}
self._base_years = {}
self._current_year = self._GetCurrentYear()
self._base_dates = {}
self._current_date = self._GetCurrentDate()
self._data_location = data_location
self._place_holder_event = set()
self._preferred_time_zone = None
Expand Down Expand Up @@ -75,92 +75,119 @@ def _CreateTimeZonePerPathSpec(self, system_configurations):
self._time_zone_per_path_spec[path_spec.parent] = (
system_configuration.time_zone)

def _GetBaseYear(self, storage_writer, event_data):
"""Retrieves the base year.
def _GetBaseDate(self, storage_writer, event_data):
"""Retrieves the base date.
Args:
storage_writer (StorageWriter): storage writer.
event_data (EventData): event data.
Returns:
int: base year.
tuple[int, int, int]: base date, as a tuple of year, month, day of month.
"""
# If preferred year is set considered it a user override, otherwise try
# to determine the year based on the date-less log helper or fallback to
# the current year.

if self._preferred_year:
return self._preferred_year
return (self._preferred_year, 1, 1)

event_data_stream_identifier = event_data.GetEventDataStreamIdentifier()
if not event_data_stream_identifier:
return self._current_year
return self._current_date

lookup_key = event_data_stream_identifier.CopyToString()

base_year = self._base_years.get(lookup_key, None)
if base_year:
return base_year
base_date = self._base_dates.get(lookup_key, None)
if base_date:
return base_date

filter_expression = f'_event_data_stream_identifier == "{lookup_key:s}"'
date_less_log_helpers = list(storage_writer.GetAttributeContainers(
events.DateLessLogHelper.CONTAINER_TYPE,
filter_expression=filter_expression))
if not date_less_log_helpers:
message = (
f'missing date-less log helper, defaulting to current year: '
f'{self._current_year:d}')
f'missing date-less log helper, defaulting to current date: '
f'{self._current_date[0]:d}-{self._current_date[1]:d}-'
f'{self._current_date[2]:d}')
self._ProduceTimeliningWarning(storage_writer, event_data, message)

base_year = self._current_year
base_date = self._current_date

else:
earliest_date = date_less_log_helpers[0].earliest_date
last_relative_date = date_less_log_helpers[0].last_relative_date
latest_date = date_less_log_helpers[0].latest_date
date_less_log_helper = date_less_log_helpers[0]

earliest_date = date_less_log_helper.earliest_date
last_relative_date = date_less_log_helper.last_relative_date
latest_date = date_less_log_helper.latest_date
current_date = self._current_date

if date_less_log_helper.granularity == (
date_less_log_helper.GRANULARITY_NO_YEARS):
if earliest_date:
earliest_date = (earliest_date[0], 0, 0)
if last_relative_date:
last_relative_date = (last_relative_date[0], 0, 0)
if latest_date:
latest_date = (latest_date[0], 0, 0)

current_date = (current_date[0], 0, 0)

if earliest_date is None and latest_date is None:
message = (
f'missing earliest and latest year in date-less log helper, '
f'defaulting to current year: {self._current_year:d}')
f'missing earliest and latest date in date-less log helper, '
f'defaulting to current date: {current_date[0]:d}-'
f'{current_date[1]:d}-{current_date[2]:d}')
self._ProduceTimeliningWarning(storage_writer, event_data, message)

base_year = self._current_year
base_date = current_date

elif earliest_date[0] + last_relative_date[0] < self._current_year:
base_year = earliest_date[0]
elif earliest_date[0] + last_relative_date[0] < current_date[0]:
base_date = (earliest_date[0], 1, 1)

elif latest_date[0] < self._current_year:
elif latest_date[0] < current_date[0]:
message = (
f'earliest year: {earliest_date[0]:d} as base year would exceed '
f'current year: {self._current_year:d} + '
f'{last_relative_date[0]:d}, using latest year: {latest_date[0]:d}')
f'earliest date: {earliest_date[0]:d}-{earliest_date[1]:d}-'
f'{earliest_date[2]:d} as base date would exceed current date: '
f'{current_date[0]:d}-{current_date[1]:d}-{current_date[2]:d} + '
f'{last_relative_date[0]:d}-{last_relative_date[1]:d}-'
f'{last_relative_date[2]:d}, using latest date: {latest_date[0]:d}-'
f'{latest_date[1]:d}-{latest_date[2]:d}')
self._ProduceTimeliningWarning(storage_writer, event_data, message)

base_year = latest_date[0] - last_relative_date[0]
base_date = tuple(map(
lambda latest, last_relative: latest - last_relative,
latest_date, last_relative_date))

else:
message = (
f'earliest year: {earliest_date[0]:d} and latest: year: '
f'{latest_date[0]:d} as base year would exceed current year: '
f'{self._current_year:d} + {last_relative_date[0]:d}, using '
f'current year')
f'earliest date: {earliest_date[0]:d}-{earliest_date[1]:d}-'
f'{earliest_date[2]:d} and latest: date: {latest_date[0]:d}-'
f'{latest_date[1]:d}-{latest_date[2]:d} as base date would exceed '
f'current date: {current_date[0]:d}-{current_date[1]:d}-'
f'{current_date[2]:d} + {last_relative_date[0]:d}-'
f'{last_relative_date[1]:d}-{last_relative_date[2]:d}, using '
f'current date')
self._ProduceTimeliningWarning(storage_writer, event_data, message)

base_year = self._current_year - last_relative_date[0]
base_date = tuple(map(
lambda current, last_relative: current - last_relative,
current_date, last_relative_date))

self._base_years[lookup_key] = base_year
self._base_dates[lookup_key] = base_date

return base_year
return base_date

def _GetCurrentYear(self):
"""Retrieves current year.
def _GetCurrentDate(self):
"""Retrieves current date.
Returns:
int: the current year.
tuple[int, int, int]: current date, as a tuple of year, month, day of
month.
"""
datetime_object = datetime.datetime.now(pytz.UTC)
return datetime_object.year
return datetime_object.year, datetime_object.month, datetime_object.day

def _GetEvent(
self, storage_writer, event_data, event_data_stream, date_time,
Expand All @@ -180,10 +207,11 @@ def _GetEvent(
"""
timestamp = None
if date_time.is_delta:
base_year = self._GetBaseYear(storage_writer, event_data)
base_date = self._GetBaseDate(storage_writer, event_data)

try:
date_time = date_time.NewFromDeltaAndYear(base_year)
# TODO: change dfDateTime to support NewFromDeltaAndDate.
date_time = date_time.NewFromDeltaAndYear(base_date[0])
except ValueError as exception:
self._ProduceTimeliningWarning(
storage_writer, event_data, str(exception))
Expand Down
50 changes: 42 additions & 8 deletions tests/containers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,38 @@
from tests import test_lib as shared_test_lib


class DateLessLogHelperTest(shared_test_lib.BaseTestCase):
"""Tests for the date-less log helper attribute container."""

def testGetAttributeNames(self):
"""Tests the GetAttributeNames function."""
attribute_container = events.DateLessLogHelper()

expected_attribute_names = [
'_event_data_stream_identifier',
'earliest_date',
'granularity',
'last_relative_date',
'latest_date']

attribute_names = sorted(attribute_container.GetAttributeNames())

self.assertEqual(attribute_names, expected_attribute_names)

def testGetEventDataStreamIdentifier(self):
"""Tests the GetEventDataStreamIdentifier function."""
attribute_container = events.DateLessLogHelper()

identifier = attribute_container.GetEventDataStreamIdentifier()
self.assertIsNone(identifier)

def testSetEventDataStreamIdentifier(self):
"""Tests the SetEventDataStreamIdentifier function."""
attribute_container = events.DateLessLogHelper()

attribute_container.SetEventDataStreamIdentifier(None)


class EventValuesHelperTest(shared_test_lib.BaseTestCase):
"""Tests for the event values helper functions."""

Expand Down Expand Up @@ -153,33 +185,35 @@ def testSetEventIdentifier(self):
attribute_container.SetEventIdentifier(None)


class DateLessLogHelperTest(shared_test_lib.BaseTestCase):
"""Tests for the date-less log helper attribute container."""
# TODO: the YearLessLogHelper attribute container is kept for backwards
# compatibility remove once storage format 20230327 is obsolete.
class YearLessLogHelperTest(shared_test_lib.BaseTestCase):
"""Tests for the year-less log helper attribute container."""

def testGetAttributeNames(self):
"""Tests the GetAttributeNames function."""
attribute_container = events.DateLessLogHelper()
attribute_container = events.YearLessLogHelper()

expected_attribute_names = [
'_event_data_stream_identifier',
'earliest_date',
'last_relative_date',
'latest_date']
'earliest_year',
'last_relative_year',
'latest_year']

attribute_names = sorted(attribute_container.GetAttributeNames())

self.assertEqual(attribute_names, expected_attribute_names)

def testGetEventDataStreamIdentifier(self):
"""Tests the GetEventDataStreamIdentifier function."""
attribute_container = events.DateLessLogHelper()
attribute_container = events.YearLessLogHelper()

identifier = attribute_container.GetEventDataStreamIdentifier()
self.assertIsNone(identifier)

def testSetEventDataStreamIdentifier(self):
"""Tests the SetEventDataStreamIdentifier function."""
attribute_container = events.DateLessLogHelper()
attribute_container = events.YearLessLogHelper()

attribute_container.SetEventDataStreamIdentifier(None)

Expand Down
Loading

0 comments on commit 2bbf16c

Please sign in to comment.