Skip to content

Commit

Permalink
Code review: 313870043: Storage changes to be more resilient on Windows
Browse files Browse the repository at this point in the history
  • Loading branch information
Onager committed Oct 22, 2016
1 parent 5deb73c commit c007236
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 28 deletions.
2 changes: 1 addition & 1 deletion config/dpkg/changelog
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ plaso (1.5.1-1) unstable; urgency=low

* Auto-generated

-- Log2Timeline <[email protected]> Sat, 22 Oct 2016 11:53:09 +0200
-- Log2Timeline <[email protected]> Sat, 22 Oct 2016 17:33:06 +0200
20 changes: 17 additions & 3 deletions plaso/multi_processing/task_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,25 @@ def _MergeTaskStorage(self, storage_writer):
self._merge_task_on_hold = self._merge_task
self._storage_merge_reader_on_hold = self._storage_merge_reader

self._storage_merge_reader = storage_writer.StartMergeTaskStorage(task)
self._merge_task = task
try:
self._storage_merge_reader = storage_writer.StartMergeTaskStorage(
task)
except IOError as exception:
logging.error(
(u'Unable to merge results of task: {0:s} '
u'with error: {1:s}').format(
task.identifier, exception))
self._storage_merge_reader = None

fully_merged = self._storage_merge_reader.MergeAttributeContainers(
maximum_number_of_containers=self._MAXIMUM_NUMBER_OF_CONTAINERS)
if self._storage_merge_reader:
fully_merged = self._storage_merge_reader.MergeAttributeContainers(
maximum_number_of_containers=self._MAXIMUM_NUMBER_OF_CONTAINERS)
else:
# TODO: Do something more sensible when this happens, perhaps
# retry the task once task retries are implemented. For now, we mark
# the task as fully merged because we cannot continue with it.
fully_merged = True

if self._processing_profiler:
self._processing_profiler.StopTiming(u'merge')
Expand Down
49 changes: 37 additions & 12 deletions plaso/storage/gzip_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import gzip
import os
import time

from plaso.lib import definitions
from plaso.lib import platform_specific
Expand All @@ -20,7 +21,7 @@ class GZIPStorageFile(interface.BaseFileStorage):

_COMPRESSION_LEVEL = 9

_DATA_BUFFER_SIZE = 16 * 1024 * 1024
_DATA_BUFFER_SIZE = 1 * 1024 * 1024

def __init__(self, storage_type=definitions.STORAGE_TYPE_TASK):
"""Initializes a storage.
Expand Down Expand Up @@ -68,15 +69,17 @@ def _OpenRead(self):
# of memory.
data_buffer = self._gzip_file.read(self._DATA_BUFFER_SIZE)
while data_buffer:
while b'\n' in data_buffer:
line, _, data_buffer = data_buffer.partition(b'\n')
attribute_container = self._DeserializeAttributeContainer(
line, u'attribute_container')

self._AddAttributeContainer(attribute_container)

additional_data_buffer = self._gzip_file.read(self._DATA_BUFFER_SIZE)
data_buffer = b''.join([data_buffer, additional_data_buffer])
lines = data_buffer.splitlines(True)
data_buffer = b''
for index, line in enumerate(lines):
if line.endswith(b'\n'):
attribute_container = self._DeserializeAttributeContainer(
line, u'attribute_container')
self._AddAttributeContainer(attribute_container)
else:
data_buffer = b''.join(lines[index:])
data_buffer = data_buffer + self._gzip_file.read(
self._DATA_BUFFER_SIZE)

def _WriteAttributeContainer(self, attribute_container):
"""Writes an attribute container.
Expand Down Expand Up @@ -292,17 +295,30 @@ class GZIPStorageMergeReader(interface.StorageMergeReader):
"""Class that implements a gzip-based storage file reader for merging."""

_DATA_BUFFER_SIZE = 1 * 1024 * 1024
_MAXIMUM_NUMBER_OF_LOCKED_FILE_ATTEMPTS = 4
_LOCKED_FILE_SLEEP_TIME = 0.5

def __init__(self, storage_writer, path):
"""Initializes a storage merge reader.
Args:
storage_writer (StorageWriter): storage writer.
path (str): path to the input file.
Raises:
IOError: if the input file cannot be opened.
"""
super(GZIPStorageMergeReader, self).__init__(storage_writer)
self._data_buffer = None
self._gzip_file = gzip.open(path, 'rb')
# On Windows the file can sometimes be in use and we have to wait.
for attempt in range(1, self._MAXIMUM_NUMBER_OF_LOCKED_FILE_ATTEMPTS):
try:
self._gzip_file = gzip.open(path, 'rb')
break
except IOError:
if attempt == self._MAXIMUM_NUMBER_OF_LOCKED_FILE_ATTEMPTS:
raise

if platform_specific.PlatformIsWindows():
file_handle = self._gzip_file.fileno()
platform_specific.DisableWindowsFileHandleInheritance(file_handle)
Expand Down Expand Up @@ -344,6 +360,7 @@ def MergeAttributeContainers(self, maximum_number_of_containers=0):
bool: True if the entire task storage file has been merged.
Raises:
OSError: if the task storage file cannot be deleted.
RuntimeError: if the attribute container type is not supported.
"""
if not self._data_buffer:
Expand Down Expand Up @@ -393,7 +410,15 @@ def MergeAttributeContainers(self, maximum_number_of_containers=0):
self._gzip_file.close()
self._gzip_file = None

os.remove(self._path)
# On Windows the file can sometimes be in use and we have to wait.
for attempt in range(1, self._MAXIMUM_NUMBER_OF_LOCKED_FILE_ATTEMPTS):
try:
os.remove(self._path)
break
except OSError:
if attempt == self._MAXIMUM_NUMBER_OF_LOCKED_FILE_ATTEMPTS:
raise
time.sleep(self._LOCKED_FILE_SLEEP_TIME)

return True

Expand Down
24 changes: 12 additions & 12 deletions plaso/storage/zip_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -1009,7 +1009,7 @@ class ZIPStorageFile(interface.BaseFileStorage):
# The maximum serialized report size (32 MiB).
_MAXIMUM_SERIALIZED_REPORT_SIZE = 32 * 1024 * 1024

_MAXIMUM_NUMBER_OF_LOCKED_FILE_RETRIES = 5
_MAXIMUM_NUMBER_OF_LOCKED_FILE_ATTEMPTS = 5
_LOCKED_FILE_SLEEP_TIME = 0.5

def __init__(
Expand Down Expand Up @@ -1741,18 +1741,15 @@ def _OpenZIPFile(self, path, read_only):
zipfile_path = os.path.join(directory_name, basename)

if os.path.exists(path):
attempts = 1
# On Windows the file can sometimes be in use and we have to wait.
while attempts <= self._MAXIMUM_NUMBER_OF_LOCKED_FILE_RETRIES:
for attempt in range(1, self._MAXIMUM_NUMBER_OF_LOCKED_FILE_ATTEMPTS):
try:
os.rename(path, zipfile_path)
break

except OSError:
if attempts == self._MAXIMUM_NUMBER_OF_LOCKED_FILE_RETRIES:
if attempt == self._MAXIMUM_NUMBER_OF_LOCKED_FILE_ATTEMPTS:
raise
time.sleep(self._LOCKED_FILE_SLEEP_TIME)
attempts += 1

try:
self._zipfile = zipfile.ZipFile(
Expand Down Expand Up @@ -2466,26 +2463,28 @@ def Close(self):
self._zipfile = None
self._is_open = False

attempts = 1
file_renamed = False
if self._path != self._zipfile_path and os.path.exists(self._zipfile_path):
# On Windows the file can sometimes be still in use and we have to wait.
while attempts <= self._MAXIMUM_NUMBER_OF_LOCKED_FILE_RETRIES:
for attempt in range(1, self._MAXIMUM_NUMBER_OF_LOCKED_FILE_ATTEMPTS):
try:
os.rename(self._zipfile_path, self._path)
file_renamed = True
break

except OSError:
if attempt == self._MAXIMUM_NUMBER_OF_LOCKED_FILE_ATTEMPTS:
raise
time.sleep(self._LOCKED_FILE_SLEEP_TIME)
attempts += 1

if attempts <= self._MAXIMUM_NUMBER_OF_LOCKED_FILE_RETRIES:
if file_renamed:
directory_name = os.path.dirname(self._zipfile_path)
os.rmdir(directory_name)

self._path = None
self._zipfile_path = None

if attempts > self._MAXIMUM_NUMBER_OF_LOCKED_FILE_RETRIES:
if self._path != self._zipfile_path and not file_renamed:
raise IOError(u'Unable to close storage file.')

def Flush(self):
Expand Down Expand Up @@ -3273,7 +3272,8 @@ def StartMergeTaskStorage(self, task):
StorageMergeReader: storage merge reader of the task storage.
Raises:
IOError: if the storage type is not supported or
IOError: if the storage file cannot be opened or
if the storage type is not supported or
if the temporary path for the task storage does not exist or
if the temporary path for the task storage does not refer to
a file.
Expand Down

0 comments on commit c007236

Please sign in to comment.