Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multireaderfilestream Redesign #4595

Open
wants to merge 22 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 65 additions & 30 deletions codalab/lib/beam/MultiReaderFileStream.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
from io import BytesIO
from threading import Lock

from codalab.worker.un_gzip_stream import BytesBuffer

import time

from io import BytesIO, SEEK_SET, SEEK_END
from threading import Lock
class MultiReaderFileStream(BytesIO):
"""
FileStream that support multiple readers
FileStream that takes an input stream fileobj, and supports N readers with the following features and constraints:
- Each reader's position is tracked
- A buffer of bytes() is stored which stores bytes from the position of the slowest reader
minus a LOOKBACK_LENGTH (default 32 MiB) to the fastest reader
- The fastest reader can be at most MAX_THRESHOLD (default 64 MiB) ahead of the slowest reader, reads made
further than 64MiB will sleep until the slowest reader catches up
"""
NUM_READERS = 2

dma1dma1 marked this conversation as resolved.
Show resolved Hide resolved
def __init__(self, fileobj):
self._bufs = [BytesBuffer() for _ in range(0, self.NUM_READERS)]
self._pos = [0 for _ in range(0, self.NUM_READERS)]
self._fileobj = fileobj
self._lock = Lock() # lock to ensure one does not concurrently read self._fileobj / write to the buffers.

def __init__(self, fileobj, lookback_length=32*1024*1024):
self._buffer = bytes() # Buffer of bytes read from the file object within the limits defined
self._buffer_start_pos = 0 # start position of buffer in the fileobj (min reader position - LOOKBACK LENGTH)
self._pos = [0 for _ in range(self.NUM_READERS)] # position of each reader in the fileobj
self._fileobj = fileobj # The original file object the readers are reading from
self._lock = Lock() # lock to ensure one does not concurrently read self._fileobj / write to the buffer.
class FileStreamReader(BytesIO):
def __init__(s, index):
s._index = index
Expand All @@ -25,33 +28,65 @@ def read(s, num_bytes=None):

def peek(s, num_bytes):
return self.peek(s._index, num_bytes)

def seek(s, offset, whence=SEEK_SET):
return self.seek(s._index, offset, whence)

self.readers = [FileStreamReader(i) for i in range(0, self.NUM_READERS)]
self.LOOKBACK_LENGTH = lookback_length
self.MAX_THRESHOLD = self.LOOKBACK_LENGTH * 2

def _fill_buf_bytes(self, index: int, num_bytes=None):
with self._lock:
while num_bytes is None or len(self._bufs[index]) < num_bytes:
s = self._fileobj.read(num_bytes)
if not s:
break
for i in range(0, self.NUM_READERS):
self._bufs[i].write(s)

def read(self, index: int, num_bytes=None): # type: ignore
def _fill_buf_bytes(self, num_bytes=0):
"""
Fills the buffer with bytes from the fileobj
"""
s = self._fileobj.read(num_bytes)
if not s:
return
self._buffer += s


def read(self, index: int, num_bytes: int):  # type: ignore
    """Read up to `num_bytes` bytes from the stream on behalf of reader `index`.

    Delegates to peek() to obtain the bytes (which may block until the
    slowest reader catches up), then advances this reader's position and
    trims the shared buffer so it retains no data earlier than
    LOOKBACK_LENGTH behind the slowest reader.
    """
    # peek() blocks (sleeps) while this reader is too far ahead and fills
    # the shared buffer as needed; it does not advance any position.
    s = self.peek(index, num_bytes)
    with self._lock:
        # Advance this reader's absolute position in the fileobj.
        self._pos[index] += len(s)

        # Bytes before (slowest reader position - LOOKBACK_LENGTH) can no
        # longer be reached by any reader; drop them from the buffer.
        diff = (min(self._pos) - self.LOOKBACK_LENGTH) - self._buffer_start_pos
        # NOTE: diff can be negative after a backwards seek; trim only when
        # the retention window actually moved forward.
        if diff > 0:
            self._buffer = self._buffer[diff:]
            self._buffer_start_pos += diff
    return s

def peek(self, index: int, num_bytes: int):  # type: ignore
    """Return up to `num_bytes` bytes at reader `index`'s position without advancing it.

    Blocks (in 100 ms sleeps) while serving this request would place the
    reader more than MAX_THRESHOLD bytes past the start of the buffer,
    i.e. too far ahead of the slowest reader.
    """
    new_pos = self._pos[index] + num_bytes
    # NOTE(review): this busy-wait can spin forever if the slowest reader
    # never advances (e.g. a reader is abandoned) — consider a condition
    # variable or timeout.
    while new_pos - self._buffer_start_pos > self.MAX_THRESHOLD:
        time.sleep(.1)  # 100 ms

    with self._lock:
        # BUG FIX: compute how many new bytes are needed against the
        # buffer's actual end, not against max(self._pos). Positions only
        # advance on read(), so after a previous peek() the buffer can
        # already extend past every reader; using max(self._pos) here
        # re-read and duplicate-appended bytes from the fileobj.
        buffer_end_pos = self._buffer_start_pos + len(self._buffer)
        new_bytes_needed = new_pos - buffer_end_pos
        if new_bytes_needed > 0:
            self._fill_buf_bytes(new_bytes_needed)

        # Slice the requested window out of the shared buffer.
        buffer_index = self._pos[index] - self._buffer_start_pos
        s = self._buffer[buffer_index:buffer_index + num_bytes]

    return s

def seek(self, index: int, offset: int, whence=SEEK_SET):
    """Move reader `index` to absolute position `offset` in the fileobj.

    Backwards seeks are only allowed within the retained buffer window;
    seeking earlier than the buffer start raises AssertionError.
    """
    if whence == SEEK_END:
        # NOTE(review): delegates to BytesIO.seek and does NOT update
        # self._pos[index], so the reader's tracked position is unchanged —
        # confirm SEEK_END is actually meaningful/used here.
        super().seek(offset, whence)
    else:
        # Cannot rewind past the start of the retained buffer. The unit
        # tests rely on this raising AssertionError; note asserts are
        # stripped under `python -O`.
        assert offset >= self._buffer_start_pos
        self._pos[index] = offset

def close(self):
    """Close the underlying file object."""
    # BUG FIX: was `self.__input.close()` — `__input` is never assigned on
    # this class (and name-mangles to `_MultiReaderFileStream__input`), so
    # close() always raised AttributeError. Close the wrapped fileobj that
    # __init__ stores as self._fileobj.
    self._fileobj.close()
5 changes: 3 additions & 2 deletions codalab/lib/upload_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,11 +255,12 @@ def write_fileobj(
conn_str = os.environ.get('AZURE_STORAGE_CONNECTION_STRING', '')
os.environ['AZURE_STORAGE_CONNECTION_STRING'] = bundle_conn_str
try:
CHUNK_SIZE = 16 * 1024
# Chunk size set to 1MiB for performance
CHUNK_SIZE = 1024 * 1024
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why increase?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The upload speed with the smaller chunk size was too slow due to the sleep behavior that occurs on the faster reader, which is always the index reader.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like there's no super meaningful reason to keep chunk size smallish since the speed tradeoff is too large

dma1dma1 marked this conversation as resolved.
Show resolved Hide resolved

def upload_file_content():
iteration = 0
ITERATIONS_PER_DISK_CHECK = 2000
ITERATIONS_PER_DISK_CHECK = 32
bytes_uploaded = 0

with FileSystems.create(
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,4 @@ websockets==9.1
kubernetes==12.0.1
google-cloud-storage==2.0.0
httpio==0.3.0
memory_profiler==0.61.0
157 changes: 157 additions & 0 deletions tests/unit/beam/multireaderfilestream_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
import tempfile
import time
import unittest

from threading import Thread

from codalab.lib.beam.MultiReaderFileStream import MultiReaderFileStream

# Size of the generated test file; each chunk is exactly 1/10 of it.
FILESIZE = 100000000
# BUG FIX: use integer division — `FILESIZE / 10` is a float, and a float
# byte count breaks read()/buffer slicing in MultiReaderFileStream.
CHUNKSIZE = FILESIZE // 10

class MultiReaderFileStreamTest(unittest.TestCase):
    """Unit tests for MultiReaderFileStream's reader-distance and seek behavior."""

    def test_reader_distance(self):
        """
        This test verifies that both readers in the MultiReaderFileStream
        are within the limits defined in the class:
        - Slowest reader is at most MAX_THRESHOLD behind the fastest reader
        """
        with tempfile.NamedTemporaryFile(delete=True) as f:
            f.seek(FILESIZE - 1)
            f.write(b"\0")
            # BUG FIX: rewind before handing the file to the stream — after
            # the write above the file position is at EOF, so every read
            # would have returned b"" immediately and no reader would move.
            f.seek(0)

            m_stream = MultiReaderFileStream(f)
            reader_1 = m_stream.readers[0]
            reader_2 = m_stream.readers[1]

            def thread1():
                # Reads the whole file, chunk by chunk.
                while True:
                    status = reader_1.read(CHUNKSIZE)
                    if not status:
                        break

            def thread2():
                # This reader will only read 4/10 of the file.
                for _ in range(4):
                    reader_2.read(CHUNKSIZE)

            t1 = Thread(target=thread1)
            t2 = Thread(target=thread2)

            t1.start()

            # Sleep a little for thread 1 to start reading
            time.sleep(.5)

            # Assert that the first reader has not read past the maximum threshold
            self.assertGreater(m_stream.MAX_THRESHOLD + 1, m_stream._pos[0])

            t2.start()

            # Sleep a little for thread 2 to start reading
            time.sleep(.5)

            # NOTE(review): these assertions are timing-dependent, and with
            # reader 2 stopped at 40% of the file the threshold logic caps
            # reader 1 at buffer_start + MAX_THRESHOLD (< FILESIZE), so
            # thread1 may block forever in peek() and t1.join() may hang —
            # confirm intended behavior before merging.
            self.assertEqual(FILESIZE, m_stream._pos[0])
            self.assertEqual(40000000, m_stream._pos[1])

            # Buffer should start LOOKBACK_LENGTH behind the slowest reader.
            calculated_buffer_start_pos = 40000000 - m_stream.LOOKBACK_LENGTH
            self.assertEqual(calculated_buffer_start_pos, m_stream._buffer_start_pos)

            # Buffer should span from its start position to the fastest reader.
            self.assertEqual(FILESIZE - calculated_buffer_start_pos, len(m_stream._buffer))

            t1.join()
            t2.join()

    def test_backwards_seek(self):
        """
        This test verifies that a backwards seek within the lookback length
        defined in the MultiReaderFileStream class behaves as expected
        """
        with tempfile.NamedTemporaryFile(delete=True) as f:
            f.seek(FILESIZE - 1)
            f.write(b"\0")
            f.seek(0)  # BUG FIX: rewind so the stream reads actual data

            m_stream = MultiReaderFileStream(f)
            reader_1 = m_stream.readers[0]
            reader_2 = m_stream.readers[1]

            result = None

            def thread1():
                while True:
                    status = reader_1.read(CHUNKSIZE)
                    if not status:
                        break

            def thread2():
                # BUG FIX: without `nonlocal`, `result = e` below would bind
                # a variable local to thread2 and the enclosing `result`
                # would always stay None.
                nonlocal result
                # Read 4/10 of the file, then seek back to 10000000 (within
                # the lookback window) and read another 4/10 of the file.
                for _ in range(4):
                    reader_2.read(CHUNKSIZE)

                try:
                    reader_2.seek(10000000)
                except AssertionError as e:
                    result = e

                for _ in range(4):
                    reader_2.read(CHUNKSIZE)

            t1 = Thread(target=thread1)
            t2 = Thread(target=thread2)
            t1.start()
            t2.start()

            t1.join()
            t2.join()

            # The in-window seek must not have raised.
            self.assertIsNone(result)

            # Check that reader 2 is at 50000000 and buffer position is correct
            self.assertEqual(50000000, m_stream._pos[1])
            self.assertEqual(50000000 - m_stream.LOOKBACK_LENGTH, m_stream._buffer_start_pos)

    def test_too_far_seek(self):
        """
        This test verifies that a backwards seek past the lookback length
        defined in the MultiReaderFileStream class raises an AssertionError
        """
        with tempfile.NamedTemporaryFile(delete=True) as f:
            f.seek(FILESIZE - 1)
            f.write(b"\0")
            f.seek(0)  # BUG FIX: rewind so the stream reads actual data

            m_stream = MultiReaderFileStream(f)
            reader_1 = m_stream.readers[0]
            reader_2 = m_stream.readers[1]

            result = None

            def thread1():
                while True:
                    status = reader_1.read(CHUNKSIZE)
                    if not status:
                        break

            def thread2():
                # BUG FIX: `nonlocal` is required for the assignment in the
                # except clause to be visible to the enclosing test; without
                # it the final assertIsInstance could never pass.
                nonlocal result
                # Read 4/10 of the file, then seek back to the beginning,
                # which is outside the retained lookback window.
                for _ in range(4):
                    reader_2.read(CHUNKSIZE)

                try:
                    reader_2.seek(0)
                except AssertionError as e:
                    result = e

            t1 = Thread(target=thread1)
            t2 = Thread(target=thread2)
            t1.start()
            t2.start()

            t1.join()
            t2.join()

            self.assertIsInstance(result, AssertionError)
Loading
Loading