From c0072363410831da80ba90b83fa1b4f53e83a269 Mon Sep 17 00:00:00 2001 From: Daniel White Date: Sat, 22 Oct 2016 17:33:08 +0200 Subject: [PATCH] Code review: 313870043: Storage changes to be more resilient on Windows #1009 --- config/dpkg/changelog | 2 +- plaso/multi_processing/task_engine.py | 20 +++++++++-- plaso/storage/gzip_file.py | 49 ++++++++++++++++++++------- plaso/storage/zip_file.py | 24 ++++++------- 4 files changed, 67 insertions(+), 28 deletions(-) diff --git a/config/dpkg/changelog b/config/dpkg/changelog index 4165baaa6a..b119f7e22d 100644 --- a/config/dpkg/changelog +++ b/config/dpkg/changelog @@ -2,4 +2,4 @@ plaso (1.5.1-1) unstable; urgency=low * Auto-generated - -- Log2Timeline Sat, 22 Oct 2016 11:53:09 +0200 \ No newline at end of file + -- Log2Timeline Sat, 22 Oct 2016 17:33:06 +0200 \ No newline at end of file diff --git a/plaso/multi_processing/task_engine.py b/plaso/multi_processing/task_engine.py index 51040b7dd5..4ffd5e6c92 100644 --- a/plaso/multi_processing/task_engine.py +++ b/plaso/multi_processing/task_engine.py @@ -220,11 +220,25 @@ def _MergeTaskStorage(self, storage_writer): self._merge_task_on_hold = self._merge_task self._storage_merge_reader_on_hold = self._storage_merge_reader - self._storage_merge_reader = storage_writer.StartMergeTaskStorage(task) self._merge_task = task + try: + self._storage_merge_reader = storage_writer.StartMergeTaskStorage( + task) + except IOError as exception: + logging.error( + (u'Unable to merge results of task: {0:s} ' + u'with error: {1:s}').format( + task.identifier, exception)) + self._storage_merge_reader = None - fully_merged = self._storage_merge_reader.MergeAttributeContainers( - maximum_number_of_containers=self._MAXIMUM_NUMBER_OF_CONTAINERS) + if self._storage_merge_reader: + fully_merged = self._storage_merge_reader.MergeAttributeContainers( + maximum_number_of_containers=self._MAXIMUM_NUMBER_OF_CONTAINERS) + else: + # TODO: Do something more sensible when this happens, perhaps + # 
retrying the task once that is implemented. For now, we mark the task + # as fully merged because we can't continue with it. + fully_merged = True if self._processing_profiler: self._processing_profiler.StopTiming(u'merge') diff --git a/plaso/storage/gzip_file.py b/plaso/storage/gzip_file.py index cf173ad4e6..ae7ef5e079 100644 --- a/plaso/storage/gzip_file.py +++ b/plaso/storage/gzip_file.py @@ -6,6 +6,7 @@ import gzip import os +import time from plaso.lib import definitions from plaso.lib import platform_specific @@ -20,7 +21,7 @@ class GZIPStorageFile(interface.BaseFileStorage): _COMPRESSION_LEVEL = 9 - _DATA_BUFFER_SIZE = 16 * 1024 * 1024 + _DATA_BUFFER_SIZE = 1 * 1024 * 1024 def __init__(self, storage_type=definitions.STORAGE_TYPE_TASK): """Initializes a storage. @@ -68,15 +69,17 @@ def _OpenRead(self): # of memory. data_buffer = self._gzip_file.read(self._DATA_BUFFER_SIZE) while data_buffer: - while b'\n' in data_buffer: - line, _, data_buffer = data_buffer.partition(b'\n') - attribute_container = self._DeserializeAttributeContainer( - line, u'attribute_container') - - self._AddAttributeContainer(attribute_container) - - additional_data_buffer = self._gzip_file.read(self._DATA_BUFFER_SIZE) - data_buffer = b''.join([data_buffer, additional_data_buffer]) + lines = data_buffer.splitlines(True) + data_buffer = b'' + for index, line in enumerate(lines): + if line.endswith(b'\n'): + attribute_container = self._DeserializeAttributeContainer( + line, u'attribute_container') + self._AddAttributeContainer(attribute_container) + else: + data_buffer = b''.join(lines[index:]) + data_buffer = data_buffer + self._gzip_file.read( + self._DATA_BUFFER_SIZE) def _WriteAttributeContainer(self, attribute_container): """Writes an attribute container. 
@@ -292,6 +295,8 @@ class GZIPStorageMergeReader(interface.StorageMergeReader): """Class that implements a gzip-based storage file reader for merging.""" _DATA_BUFFER_SIZE = 1 * 1024 * 1024 + _MAXIMUM_NUMBER_OF_LOCKED_FILE_ATTEMPTS = 4 + _LOCKED_FILE_SLEEP_TIME = 0.5 def __init__(self, storage_writer, path): """Initializes a storage merge reader. @@ -299,10 +304,21 @@ def __init__(self, storage_writer, path): Args: storage_writer (StorageWriter): storage writer. path (str): path to the input file. + + Raises: + IOError: if the input file cannot be opened. """ super(GZIPStorageMergeReader, self).__init__(storage_writer) self._data_buffer = None - self._gzip_file = gzip.open(path, 'rb') + # On Windows the file can sometimes be in use and we have to wait. + for attempt in range(1, self._MAXIMUM_NUMBER_OF_LOCKED_FILE_ATTEMPTS + 1): + try: + self._gzip_file = gzip.open(path, 'rb') + break + except IOError: + if attempt == self._MAXIMUM_NUMBER_OF_LOCKED_FILE_ATTEMPTS: + raise + if platform_specific.PlatformIsWindows(): file_handle = self._gzip_file.fileno() platform_specific.DisableWindowsFileHandleInheritance(file_handle) @@ -344,6 +360,7 @@ def MergeAttributeContainers(self, maximum_number_of_containers=0): bool: True if the entire task storage file has been merged. Raises: + OSError: if the task storage file cannot be deleted. RuntimeError: if the attribute container type is not supported. """ if not self._data_buffer: @@ -393,7 +410,15 @@ def MergeAttributeContainers(self, maximum_number_of_containers=0): self._gzip_file.close() self._gzip_file = None - os.remove(self._path) + # On Windows the file can sometimes be in use and we have to wait. 
+ for attempt in range(1, self._MAXIMUM_NUMBER_OF_LOCKED_FILE_ATTEMPTS + 1): + try: + os.remove(self._path) + break + except OSError: + if attempt == self._MAXIMUM_NUMBER_OF_LOCKED_FILE_ATTEMPTS: + raise + time.sleep(self._LOCKED_FILE_SLEEP_TIME) return True diff --git a/plaso/storage/zip_file.py b/plaso/storage/zip_file.py index bfbe189060..739535c398 100644 --- a/plaso/storage/zip_file.py +++ b/plaso/storage/zip_file.py @@ -1009,7 +1009,7 @@ class ZIPStorageFile(interface.BaseFileStorage): # The maximum serialized report size (32 MiB). _MAXIMUM_SERIALIZED_REPORT_SIZE = 32 * 1024 * 1024 - _MAXIMUM_NUMBER_OF_LOCKED_FILE_RETRIES = 5 + _MAXIMUM_NUMBER_OF_LOCKED_FILE_ATTEMPTS = 5 _LOCKED_FILE_SLEEP_TIME = 0.5 def __init__( @@ -1741,18 +1741,15 @@ def _OpenZIPFile(self, path, read_only): zipfile_path = os.path.join(directory_name, basename) if os.path.exists(path): - attempts = 1 - # On Windows the file can sometimes be in use and we have to wait. - while attempts <= self._MAXIMUM_NUMBER_OF_LOCKED_FILE_RETRIES: + for attempt in range(1, self._MAXIMUM_NUMBER_OF_LOCKED_FILE_ATTEMPTS + 1): try: os.rename(path, zipfile_path) break except OSError: - if attempts == self._MAXIMUM_NUMBER_OF_LOCKED_FILE_RETRIES: + if attempt == self._MAXIMUM_NUMBER_OF_LOCKED_FILE_ATTEMPTS: raise time.sleep(self._LOCKED_FILE_SLEEP_TIME) - attempts += 1 try: self._zipfile = zipfile.ZipFile( @@ -2466,26 +2463,28 @@ def Close(self): self._zipfile = None self._is_open = False - attempts = 1 + file_renamed = False if self._path != self._zipfile_path and os.path.exists(self._zipfile_path): # On Windows the file can sometimes be still in use and we have to wait. 
- while attempts <= self._MAXIMUM_NUMBER_OF_LOCKED_FILE_RETRIES: + for attempt in range(1, self._MAXIMUM_NUMBER_OF_LOCKED_FILE_ATTEMPTS + 1): try: os.rename(self._zipfile_path, self._path) + file_renamed = True break except OSError: + if attempt == self._MAXIMUM_NUMBER_OF_LOCKED_FILE_ATTEMPTS: + raise time.sleep(self._LOCKED_FILE_SLEEP_TIME) - attempts += 1 - if attempts <= self._MAXIMUM_NUMBER_OF_LOCKED_FILE_RETRIES: + if file_renamed: directory_name = os.path.dirname(self._zipfile_path) os.rmdir(directory_name) self._path = None self._zipfile_path = None - if attempts > self._MAXIMUM_NUMBER_OF_LOCKED_FILE_RETRIES: + if self._path != self._zipfile_path and not file_renamed: raise IOError(u'Unable to close storage file.') def Flush(self): @@ -3273,7 +3272,8 @@ def StartMergeTaskStorage(self, task): StorageMergeReader: storage merge reader of the task storage. Raises: - IOError: if the storage type is not supported or + IOError: if the storage file cannot be opened or + if the storage type is not supported or if the temporary path for the task storage does not exist or if the temporary path for the task storage doe not refers to a file.