MultiReaderFileStream Redesign #4595

Open · wants to merge 22 commits into master · showing changes from 15 commits
84 changes: 56 additions & 28 deletions codalab/lib/beam/MultiReaderFileStream.py
@@ -1,21 +1,22 @@
from io import BytesIO
from io import BytesIO, SEEK_SET, SEEK_END
from threading import Lock

from codalab.worker.un_gzip_stream import BytesBuffer


import time
class MultiReaderFileStream(BytesIO):
"""
FileStream that support multiple readers
    FileStream that supports multiple readers and backwards seeks
"""
NUM_READERS = 2
    LOOKBACK_LENGTH = 33554432  # 32 MiB kept behind the slowest reader to allow backwards seeks
    MAX_THRESHOLD = LOOKBACK_LENGTH * 2  # 64 MiB cap on how far past the buffer start a read may reach

def __init__(self, fileobj):
self._bufs = [BytesBuffer() for _ in range(0, self.NUM_READERS)]
self._pos = [0 for _ in range(0, self.NUM_READERS)]
self._buffer = bytes()
        self._buffer_pos = 0  # start position of the buffer in the fileobj (min reader position - LOOKBACK_LENGTH)
        self._size = 0  # number of bytes currently held in the buffer (for convenience)
# self._pos = MinMaxHeap() # position of each reader
self._pos = [0 for _ in range(self.NUM_READERS)] # position of each reader in the fileobj
self._fileobj = fileobj
self._lock = Lock() # lock to ensure one does not concurrently read self._fileobj / write to the buffers.

self._lock = Lock() # lock to ensure one does not concurrently read self._fileobj / write to the buffer.
class FileStreamReader(BytesIO):
def __init__(s, index):
s._index = index
@@ -25,33 +26,60 @@ def read(s, num_bytes=None):

def peek(s, num_bytes):
return self.peek(s._index, num_bytes)

def seek(s, offset, whence=SEEK_SET):
return self.seek(s._index, offset, whence)

self.readers = [FileStreamReader(i) for i in range(0, self.NUM_READERS)]

def _fill_buf_bytes(self, index: int, num_bytes=None):
with self._lock:
while num_bytes is None or len(self._bufs[index]) < num_bytes:
s = self._fileobj.read(num_bytes)
if not s:
break
for i in range(0, self.NUM_READERS):
self._bufs[i].write(s)

def read(self, index: int, num_bytes=None): # type: ignore
def _fill_buf_bytes(self, num_bytes=None):
# with self._lock:
s = self._fileobj.read(num_bytes)
if not s:
return
self._buffer += s
self._size += len(s)

def read(self, index: int, num_bytes=0): # type: ignore
"""Read the specified number of bytes from the associated file.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why 0? If optional, use None, and add type hint Optional[int]

index: index that specifies which reader is reading.
"""
self._fill_buf_bytes(index, num_bytes)
if num_bytes is None:
num_bytes = len(self._bufs[index])
s = self._bufs[index].read(num_bytes)
self._pos[index] += len(s)
# Calculate how many new bytes need to be read
with self._lock:
new_bytes_needed = num_bytes - (max(self._pos) - self._pos[index])
if new_bytes_needed > 0:
self._fill_buf_bytes(new_bytes_needed)
while (self._pos[index] + num_bytes) - self._buffer_pos > self.MAX_THRESHOLD:
            time.sleep(0.1)  # 100 ms; wait for the slower reader to catch up

with self._lock:
old_position = self._pos[index] - self._buffer_pos
s = self._buffer[old_position:old_position + num_bytes]

# Modify position
self._pos[index] += len(s)

# Update buffer if this reader is the minimum reader
            diff = (min(self._pos) - self.LOOKBACK_LENGTH) - self._buffer_pos  # desired buffer start (min reader position - LOOKBACK_LENGTH) minus current buffer start
            # NOTE: diff can be negative if a backwards seek occurred
if diff > 0:
self._buffer = self._buffer[diff:]
self._buffer_pos += diff
self._size -= diff
return s

def peek(self, index: int, num_bytes): # type: ignore
self._fill_buf_bytes(index, num_bytes)
s = self._bufs[index].peek(num_bytes)
return s
pass
# self._fill_buf_bytes(index, num_bytes)
# s = self._bufs[index].peek(num_bytes)
# return s

def seek(self, index: int, offset: int, whence=SEEK_SET):
if whence == SEEK_END:
super().seek(offset, whence)
else:
assert offset >= self._buffer_pos
self._pos[index] = offset

def close(self):
        self._fileobj.close()
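
For orientation, a minimal usage sketch of the redesigned stream (illustrative driver code, not part of this diff; the file name and chunk size are invented): two readers share one underlying file object, the slowest reader's position pins the buffer start, and a reader that gets more than MAX_THRESHOLD ahead of that start sleeps inside read() until the other catches up.

from threading import Thread

from codalab.lib.beam.MultiReaderFileStream import MultiReaderFileStream

with open("bundle.tar.gz", "rb") as f:  # any binary file object
    stream = MultiReaderFileStream(f)
    index_reader, upload_reader = stream.readers  # NUM_READERS == 2

    def drain(reader, chunk_size=1024 * 1024):
        # read() returns b"" once the underlying file is exhausted
        while reader.read(chunk_size):
            pass

    threads = [Thread(target=drain, args=(r,)) for r in (index_reader, upload_reader)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()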
2 changes: 1 addition & 1 deletion codalab/lib/upload_manager.py
@@ -255,7 +255,7 @@ def write_fileobj(
conn_str = os.environ.get('AZURE_STORAGE_CONNECTION_STRING', '')
os.environ['AZURE_STORAGE_CONNECTION_STRING'] = bundle_conn_str
try:
CHUNK_SIZE = 16 * 1024
CHUNK_SIZE = 1024 * 1024
Review comment (Member): Why increase?

Reply (Contributor, Author): The upload speed with the smaller chunk size was too slow due to the sleep behavior that occurs on the faster reader, which is always the index reader.

Reply (Contributor, Author): It seems like there's no meaningful reason to keep the chunk size small, since the speed tradeoff is too large.
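To make the tradeoff concrete, a rough sketch of the chunked copy loop that write_fileobj drives (helper name and destination object are illustrative): with 16 KiB chunks a 128 MiB upload issues roughly 8192 read() calls, each of which can land in the fast reader's sleep-based throttle, while 1 MiB chunks cut that to roughly 128 calls.

CHUNK_SIZE = 1024 * 1024  # the new value

def copy_in_chunks(reader, destination):
    # reader: one of MultiReaderFileStream.readers
    # destination: any object with a write() method, e.g. a blob-storage upload stream
    while True:
        chunk = reader.read(CHUNK_SIZE)  # may sleep if this reader is too far ahead
        if not chunk:
            break
        destination.write(chunk)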

def upload_file_content():
iteration = 0
1 change: 1 addition & 0 deletions requirements.txt
@@ -45,3 +45,4 @@ websockets==9.1
kubernetes==12.0.1
google-cloud-storage==2.0.0
httpio==0.3.0
memory_profiler==0.61.0
142 changes: 142 additions & 0 deletions tests/unit/beam/multireaderfilestream_test.py
@@ -0,0 +1,142 @@
import tempfile
import time
import unittest

from threading import Thread

from codalab.lib.beam.MultiReaderFileStream import MultiReaderFileStream

FILESIZE = 100000000
CHUNKSIZE = FILESIZE // 10  # integer division so read() gets an int byte count

class MultiReaderFileStreamTest(unittest.TestCase):
def test_reader_distance(self):
with tempfile.NamedTemporaryFile(delete=True) as f:
f.seek(FILESIZE - 1)
f.write(b"\0")

m_stream = MultiReaderFileStream(f)
reader_1 = m_stream.readers[0]
reader_2 = m_stream.readers[1]

def thread1():
while True:
status = reader_1.read(CHUNKSIZE)
if not status:
break

def thread2():
# This reader will only read 4/10 of the file
for _ in range(4):
status = reader_2.read(CHUNKSIZE)

t1 = Thread(target=thread1)
t2 = Thread(target=thread2)

t1.start()

# Sleep a little for thread 1 to start reading
time.sleep(3)

# Assert that the first reader has not read past the Maximum threshold
self.assertGreater(70000000, m_stream._pos[0])

t2.start()

# Sleep a little for thread 2 to start reading
time.sleep(3)

# Assert that the first reader is at 100000000, second reader is at 40000000
self.assertEqual(100000000, m_stream._pos[0])
self.assertEqual(40000000, m_stream._pos[1])

# Assert that the buffer is at 6445568 (40000000 - LOOKBACK_LENGTH)
self.assertEqual(6445568, m_stream._buffer_pos)

# Assert that the buffer is length 100000000 - 6445568
self.assertEqual(93554432, m_stream._size)

t1.join()
t2.join()
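
As a quick sanity check on the expected values above (arithmetic only, no extra test logic): the buffer start is the minimum reader position minus the lookback window, 40000000 - 33554432 = 6445568, and the buffered span is everything read beyond that start, 100000000 - 6445568 = 93554432 bytes.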

def test_seek(self):
with tempfile.NamedTemporaryFile(delete=True) as f:
f.seek(FILESIZE - 1)
f.write(b"\0")

m_stream = MultiReaderFileStream(f)
reader_1 = m_stream.readers[0]
reader_2 = m_stream.readers[1]

result = None

def thread1():
while True:
status = reader_1.read(CHUNKSIZE)
if not status:
break

            def thread2():
                nonlocal result  # needed so `result = e` below rebinds the outer variable
                # This reader will only read 4/10 of the file, then seek to
                # 10000000 and read another 4/10 of the file
                for _ in range(4):
                    reader_2.read(CHUNKSIZE)

                try:
                    reader_2.seek(10000000)
                except AssertionError as e:
                    result = e

for _ in range(4):
reader_2.read(CHUNKSIZE)

t1 = Thread(target=thread1)
t2 = Thread(target=thread2)
t1.start()
t2.start()

t1.join()
t2.join()

self.assertIsNone(result)

# Check that reader 2 is at 50000000 and buffer position is correct
self.assertEqual(50000000, m_stream._pos[1])
self.assertEqual(16445568, m_stream._buffer_pos)
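
Same arithmetic as in the previous test: after the seek and four more reads the second reader sits at 50000000, and 50000000 - 33554432 = 16445568 is the expected buffer start.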


def test_toofar_seek(self):
with tempfile.NamedTemporaryFile(delete=True) as f:
f.seek(FILESIZE - 1)
f.write(b"\0")

m_stream = MultiReaderFileStream(f)
reader_1 = m_stream.readers[0]
reader_2 = m_stream.readers[1]

result = None

def thread1():
while True:
status = reader_1.read(CHUNKSIZE)
if not status:
break

            def thread2():
                nonlocal result  # needed so `result = e` below rebinds the outer variable
                # This reader will only read 4/10 of the file, then seek to the beginning
                for _ in range(4):
                    reader_2.read(CHUNKSIZE)

                try:
                    reader_2.seek(0)
                except AssertionError as e:
                    result = e

t1 = Thread(target=thread1)
t2 = Thread(target=thread2)
t1.start()
t2.start()

t1.join()
t2.join()

self.assertIsInstance(result, AssertionError)
57 changes: 55 additions & 2 deletions tests/unit/lib/upload_manager_test.py
@@ -10,6 +10,7 @@
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystems import FileSystems
from io import BytesIO
from memory_profiler import memory_usage
from typing import IO, cast
from unittest.mock import MagicMock
from urllib.response import addinfourl
@@ -18,7 +19,8 @@
from tests.unit.server.bundle_manager import TestBase

urlopen_real = urllib.request.urlopen

LARGE_FILE_SIZE = 16777216  # 16 MiB
EXTRA_LARGE_FILE_SIZE = 134217728  # 128 MiB, for memory profiling only

class UploadManagerTestBase(TestBase):
"""A class that contains the base for an UploadManager test. Subclasses
@@ -42,6 +44,13 @@ def check_file_equals_string(self, file_subpath: str, expected_contents: str):
def listdir(self):
"""List the files in the current bundle location."""
raise NotImplementedError

def check_file_size(self):
"""Check the file sizes in the current bundle location"""
with FileSystems.open(
self.bundle_location, compression_type=CompressionTypes.UNCOMPRESSED
) as f, tarfile.open(fileobj=f, mode='r:gz') as tf:
return [tarinfo.size for tarinfo in tf.getmembers()]

@property
def bundle_location(self):
@@ -78,6 +87,37 @@ def test_fileobj_gz(self):
self.do_upload(('source.gz', BytesIO(gzip_bytestring(b'testing'))))
self.check_file_equals_string('', 'testing')

def test_fileobj_tar_gz(self):
source = os.path.join(self.temp_dir, 'source_dir')
os.mkdir(source)
self.write_file_of_size(10, os.path.join(source, 'file'))
self.do_upload(('source.tar.gz', tar_gzip_directory(source)))
self.assertEqual(['file'], sorted(self.listdir()))
self.assertEqual([0, 10], self.check_file_size())

def test_large_fileobj_tar_gz(self):
"""
Large bundles should not cause issues
"""
source = os.path.join(self.temp_dir, 'source_dir')
os.mkdir(source)
self.write_file_of_size(LARGE_FILE_SIZE, os.path.join(source, 'bigfile'))
self.write_string_to_file('testing', os.path.join(source, 'README'))
self.do_upload(('source.tar.gz', tar_gzip_directory(source)))
self.assertEqual(['README', 'bigfile'], sorted(self.listdir()))

def test_large_fileobj_tar_gz2(self):
"""
Large bundles should not cause issues
"""
source = os.path.join(self.temp_dir, 'source_dir')
os.mkdir(source)
self.write_file_of_size(LARGE_FILE_SIZE, os.path.join(source, 'bigfile'))
self.write_file_of_size(LARGE_FILE_SIZE, os.path.join(source, 'bigfile2'))
self.do_upload(('source.tar.gz', tar_gzip_directory(source)))
self.assertEqual(['bigfile', 'bigfile2'], sorted(self.listdir()))
self.assertEqual([0, LARGE_FILE_SIZE, LARGE_FILE_SIZE], self.check_file_size())

def test_fileobj_tar_gz_should_not_simplify_archives(self):
source = os.path.join(self.temp_dir, 'source_dir')
os.mkdir(source)
@@ -108,7 +148,7 @@ def test_fileobj_tar_gz_with_dsstore_should_not_simplify_archive_2(self):
self.write_string_to_file('testing', os.path.join(source, '.DS_Store'))
self.do_upload(('source.tar.gz', tar_gzip_directory(source)))
self.assertEqual(['.DS_Store', 'README', 'README2'], sorted(self.listdir()))

def mock_url_source(self, fileobj, ext=""):
"""Returns a URL that is mocked to return the contents of fileobj.
The URL will end in the extension "ext", if given.
@@ -149,10 +189,23 @@ def test_url_git(self):
# change, then update this test.
self.check_file_equals_string('testfile.md', '# test\nUsed for testing\n')

def test_upload_memory(self):
self.write_file_of_size(LARGE_FILE_SIZE, os.path.join(self.temp_dir, 'bigfile'))
        # Pass do_upload and its argument tuple to memory_usage as a callable;
        # calling it inline would run the upload before profiling begins.
        mem_usage = memory_usage(
            (self.do_upload, (('bigfile', os.path.join(self.temp_dir, 'bigfile')),)),
            interval=0.1,
            timeout=1,
        )
        self.assertLess(max(mem_usage), 100000000)
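
One caveat on units, going by memory_profiler's documented behavior: memory_usage returns samples in MiB rather than bytes, so a bound of 100000000 is effectively never exceeded; if the intent is a byte-scale budget, a MiB-scale threshold (for example max(mem_usage) < 100) would be the meaningful check.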

def write_string_to_file(self, string, file_path):
with open(file_path, 'w') as f:
f.write(string)

    def write_file_of_size(self, size: int, file_path: str):
        # Seek to the final byte and write a single NUL, creating a file of
        # the requested size without holding `size` bytes in memory (sparse
        # where the filesystem supports it).
        with open(file_path, "wb") as f:
            f.seek(size - 1)
            f.write(b"\0")

class UploadManagerDiskStorageTest(UploadManagerTestBase, unittest.TestCase):
"""Tests for UploadManager that upload files to disk storage."""