Skip to content

Commit

Permalink
feat(streaming): async file / file-slice streaming function
Browse files Browse the repository at this point in the history
  • Loading branch information
davidlougheed committed Aug 20, 2024
1 parent b4b4adf commit c67fbec
Show file tree
Hide file tree
Showing 5 changed files with 690 additions and 29 deletions.
61 changes: 61 additions & 0 deletions bento_lib/streaming/file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import aiofiles
import aiofiles.os
import pathlib

from .range import validate_interval

# Public API of this module: stream_file is the only exported name.
__all__ = ["stream_file"]


# TODO: support multipart/byterange responses somehow (generator of generators?)


async def stream_file(
    path: pathlib.Path,
    interval: tuple[int, int] | None,
    chunk_size: int,
    yield_content_length_as_first_8: bool = False,
    file_size: int | None = None,
    refget_mode: bool = False,
):
    """
    Stream the contents of a file, optionally yielding the content length as the first 8 bytes of the stream.
    Coordinate parameters are 0-based and inclusive, e.g., 0-10 yields the first 11 bytes. This matches the format of
    HTTP range headers.
    :param path: The path to the file to stream from.
    :param interval: Inclusive, 0-based byte interval to stream. If None, the whole file is streamed instead.
    :param chunk_size: The maximum number of bytes to read/yield at a time while streaming the file. Must be >= 1.
    :param yield_content_length_as_first_8: Whether to yield the response size as the first byte chunk (8 bytes,
                                            big-endian encoded) of the stream.
    :param file_size: The whole file's size, if already known. If this has already been calculated/stored, this saves a
                      stat() call.
    :param refget_mode: Passed through to validate_interval(). When set, an out-of-bounds interval raises
                        StreamingBadRange instead of StreamingRangeNotSatisfiable, and inverted intervals
                        (start > end) are not rejected.
    :raises ValueError: If chunk_size is less than 1.
    :raises StreamingRangeNotSatisfiable: Raised by validate_interval() for an invalid interval (non-refget mode).
    :raises StreamingBadRange: Raised by validate_interval() for an out-of-bounds interval in refget mode.
    """

    if chunk_size < 1:
        # A chunk size of 0 would end the stream without any data, and a negative size would make read() run to EOF,
        # past the end of the requested interval.
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")

    # Compare against None explicitly so that an already-known size of 0 does not trigger a redundant stat() call.
    final_file_size: int = file_size if file_size is not None else (await aiofiles.os.stat(path)).st_size
    final_interval = interval if interval is not None else (0, final_file_size - 1)

    validate_interval(final_interval, final_file_size, refget_mode=refget_mode)

    start, end = final_interval
    response_size: int = end - start + 1  # Inclusive interval - need to add 1

    # NOTE(review): in refget mode validate_interval() lets start > end through, which makes response_size <= 0 and
    # the read size below negative - confirm whether such intervals are expected to reach this function.

    if yield_content_length_as_first_8:
        yield response_size.to_bytes(8, "big")

    async with aiofiles.open(path, "rb") as ff:
        # First, skip over <start> bytes to get to the beginning of the range
        await ff.seek(start)

        byte_offset: int = start
        while True:
            # Read at most up to and including <end>: the last coordinate is inclusive, hence the + 1.
            data = await ff.read(min(chunk_size, end + 1 - byte_offset))
            if not data:
                # Hit the end of the file earlier than expected (e.g., it was truncated while streaming); stop
                # without emitting an empty chunk.
                break

            byte_offset += len(data)
            yield data

            # We've reached the end of our range (inclusive), so escape the loop.
            # Together with the EOF check above, this is guaranteed to terminate with a finite-sized file.
            if byte_offset > end:
                break
45 changes: 25 additions & 20 deletions bento_lib/streaming/range.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from .exceptions import StreamingRangeNotSatisfiable, StreamingBadRange

__all__ = ["parse_range_header"]
__all__ = ["validate_interval", "parse_range_header"]


BYTE_RANGE_INTERVAL_SPLIT = re.compile(r",\s*")
Expand All @@ -11,6 +11,28 @@
BYTE_RANGE_SUFFIX = re.compile(r"^-(\d+)$")


def validate_interval(interval: tuple[int, int], content_length: int, refget_mode: bool = False) -> None:
    """
    Check a single inclusive, 0-based byte interval against the length of the content it refers to.
    Returns None when the interval is acceptable; otherwise raises.
    :param interval: (start, end) pair; valid coordinates run from 0 to content_length - 1.
    :param content_length: Total size of the content the interval indexes into.
    :param refget_mode: If set, out-of-bounds coordinates raise StreamingBadRange rather than
                        StreamingRangeNotSatisfiable, and inverted intervals are not rejected.
    :raises StreamingRangeNotSatisfiable: Start or end is past the content (non-refget mode), or start > end
                                          (non-refget mode).
    :raises StreamingBadRange: Start or end is past the content (refget mode).
    """

    first, last = interval

    # The bounds checks deliberately come before the inversion check: an out-of-bounds coordinate (which can also
    # make the interval inverted) should be reported as a 416 / bad range rather than as an inverted interval.

    # Valid coordinates are 0 .. content_length - 1, so anything >= content_length is out of bounds.
    if first >= content_length:
        if not refget_mode:
            raise StreamingRangeNotSatisfiable(f"not satisfiable: {first} >= {content_length}", content_length)
        # refget (GA4GH) expects a bad-range error here instead
        raise StreamingBadRange(f"start is beyond content length: {first} >= {content_length}")

    if last >= content_length:
        if not refget_mode:
            raise StreamingRangeNotSatisfiable(f"not satisfiable: {last} >= {content_length}", content_length)
        # refget (GA4GH) expects a bad-range error here instead
        raise StreamingBadRange(f"end is beyond content length: {last} >= {content_length}")

    if refget_mode:
        # Inverted intervals are not rejected in refget mode.
        return

    if first > last:
        raise StreamingRangeNotSatisfiable(f"inverted interval: {interval}", content_length)


def parse_range_header(
range_header: str | None, content_length: int, refget_mode: bool = False
) -> tuple[tuple[int, int], ...]:
Expand Down Expand Up @@ -50,26 +72,9 @@ def parse_range_header(

# validate intervals are not inverted and do not overlap each other:
for i, int1 in enumerate(intervals):
int1_start, int1_end = int1

# Order of these checks is important - we want to give a 416 if start/end is beyond content length (which also
# results in an inverted interval)

if int1_start >= content_length:
# both ends of the range are 0-indexed, inclusive - so it starts at 0 and ends at content_length - 1
if refget_mode: # sigh... GA4GH moment
raise StreamingBadRange(f"start is beyond content length: {int1_start} >= {content_length}")
raise StreamingRangeNotSatisfiable(f"not satisfiable: {int1_start} >= {content_length}", content_length)

if int1_end >= content_length:
# both ends of the range are 0-indexed, inclusive - so it starts at 0 and ends at content_length - 1
if refget_mode: # sigh... GA4GH moment
raise StreamingBadRange(f"end is beyond content length: {int1_end} >= {content_length}")
raise StreamingRangeNotSatisfiable(f"not satisfiable: {int1_end} >= {content_length}", content_length)

if not refget_mode and int1_start > int1_end:
raise StreamingRangeNotSatisfiable(f"inverted interval: {int1}", content_length)
validate_interval(int1, content_length, refget_mode=refget_mode)

_, int1_end = int1
if i < n_intervals - 1:
int2 = intervals[i + 1]
int2_start, int2_end = int2
Expand Down
5 changes: 5 additions & 0 deletions tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
"authz_test_cases",
"TEST_AUTHZ_VALID_POST_BODY",
"TEST_AUTHZ_HEADERS",
"DATA_DIR",
"SARS_COV_2_FASTA_PATH",
"WDL_DIR",
"WORKFLOW_DEF",
]
Expand Down Expand Up @@ -38,6 +40,9 @@
TEST_AUTHZ_VALID_POST_BODY = {"test1": "a", "test2": "b"}
TEST_AUTHZ_HEADERS = {"Authorization": "Bearer test"}

DATA_DIR = Path(__file__).parent / "data"
SARS_COV_2_FASTA_PATH = DATA_DIR / "sars_cov_2.fa"

WDL_DIR = Path(__file__).parent / "wdls"

WORKFLOW_DEF = wd = workflows.models.WorkflowDefinition(
Expand Down
Loading

0 comments on commit c67fbec

Please sign in to comment.