From 8f2e9e9a572ffedd43bc3938d76b14c802034687 Mon Sep 17 00:00:00 2001
From: Martin Durant
Date: Wed, 16 Oct 2024 10:43:50 -0400
Subject: [PATCH] Concurrency in pipe()

---
 s3fs/core.py                        | 48 +++++++++++++++++------------
 s3fs/tests/derived/s3fs_fixtures.py |  5 ++-
 s3fs/tests/test_s3fs.py             | 11 ++++---
 3 files changed, 39 insertions(+), 25 deletions(-)

diff --git a/s3fs/core.py b/s3fs/core.py
index 2da6f0bd..2eb0d777 100644
--- a/s3fs/core.py
+++ b/s3fs/core.py
@@ -269,7 +269,7 @@ class S3FileSystem(AsyncFileSystem):
     connect_timeout = 5
     retries = 5
     read_timeout = 15
-    default_block_size = 5 * 2**20
+    default_block_size = 50 * 2**20
     protocol = ("s3", "s3a")
     _extra_tokenize_attributes = ("default_block_size",)
 
@@ -295,7 +295,7 @@ def __init__(
         cache_regions=False,
         asynchronous=False,
         loop=None,
-        max_concurrency=1,
+        max_concurrency=10,
         **kwargs,
     ):
         if key and username:
@@ -1127,8 +1127,11 @@ async def _call_and_read():
 
         return await _error_wrapper(_call_and_read, retries=self.retries)
 
-    async def _pipe_file(self, path, data, chunksize=50 * 2**20, **kwargs):
+    async def _pipe_file(
+        self, path, data, chunksize=50 * 2**20, max_concurrency=None, **kwargs
+    ):
         bucket, key, _ = self.split_path(path)
+        concurrency = max_concurrency or self.max_concurrency
         size = len(data)
         # 5 GB is the limit for an S3 PUT
         if size < min(5 * 2**30, 2 * chunksize):
@@ -1140,23 +1143,27 @@ async def _pipe_file(self, path, data, chunksize=50 * 2**20, **kwargs):
             mpu = await self._call_s3(
                 "create_multipart_upload", Bucket=bucket, Key=key, **kwargs
             )
-
-            # TODO: cancel MPU if the following fails
-            out = [
-                await self._call_s3(
-                    "upload_part",
-                    Bucket=bucket,
-                    PartNumber=i + 1,
-                    UploadId=mpu["UploadId"],
-                    Body=data[off : off + chunksize],
-                    Key=key,
+            ranges = list(range(0, len(data), chunksize))
+            inds = list(range(0, len(ranges), concurrency)) + [len(ranges)]
+            parts = []
+            for start, stop in zip(inds[:-1], inds[1:]):
+                out = await asyncio.gather(
+                    *[
+                        self._call_s3(
+                            "upload_part",
+                            Bucket=bucket,
+                            PartNumber=i + 1,
+                            UploadId=mpu["UploadId"],
+                            Body=data[ranges[i] : ranges[i] + chunksize],
+                            Key=key,
+                        )
+                        for i in range(start, stop)
+                    ]
+                )
+                parts.extend(
+                    {"PartNumber": i + 1, "ETag": o["ETag"]}
+                    for i, o in zip(range(start, stop), out)
                 )
-                for i, off in enumerate(range(0, len(data), chunksize))
-            ]
-
-            parts = [
-                {"PartNumber": i + 1, "ETag": o["ETag"]} for i, o in enumerate(out)
-            ]
             await self._call_s3(
                 "complete_multipart_upload",
                 Bucket=bucket,
@@ -2139,7 +2146,7 @@ def __init__(
         s3,
         path,
         mode="rb",
-        block_size=5 * 2**20,
+        block_size=50 * 2**20,
         acl=False,
         version_id=None,
         fill_cache=True,
@@ -2337,6 +2344,7 @@ def _upload_chunk(self, final=False):
         (data0, data1) = (None, self.buffer.read(self.blocksize))
 
         while data1:
+            # concurrency here??
             (data0, data1) = (data1, self.buffer.read(self.blocksize))
             data1_size = len(data1)
 
diff --git a/s3fs/tests/derived/s3fs_fixtures.py b/s3fs/tests/derived/s3fs_fixtures.py
index eae000cf..a1ca7fb9 100644
--- a/s3fs/tests/derived/s3fs_fixtures.py
+++ b/s3fs/tests/derived/s3fs_fixtures.py
@@ -11,7 +11,7 @@
 test_bucket_name = "test"
 secure_bucket_name = "test-secure"
 versioned_bucket_name = "test-versioned"
-port = 5555
+port = 5556
 endpoint_uri = "http://127.0.0.1:%s/" % port
 
 
@@ -109,6 +109,9 @@ def _s3_base(self):
                 pass
             timeout -= 0.1
             time.sleep(0.1)
+        if proc.poll() is not None:
+            proc.terminate()
+            raise RuntimeError("Starting moto server failed")
         print("server up")
         yield
         print("moto done")
diff --git a/s3fs/tests/test_s3fs.py b/s3fs/tests/test_s3fs.py
index d3d90899..fba55db6 100644
--- a/s3fs/tests/test_s3fs.py
+++ b/s3fs/tests/test_s3fs.py
@@ -90,7 +90,10 @@ def s3_base():
 def reset_s3_fixture():
     # We reuse the MotoServer for all tests
     # But we do want a clean state for every test
-    requests.post(f"{endpoint_uri}/moto-api/reset")
+    try:
+        requests.post(f"{endpoint_uri}/moto-api/reset")
+    except:
+        pass
 
 
 def get_boto3_client():
@@ -1250,7 +1253,7 @@ def test_write_fails(s3):
 
 
 def test_write_blocks(s3):
-    with s3.open(test_bucket_name + "/temp", "wb") as f:
+    with s3.open(test_bucket_name + "/temp", "wb", block_size=5 * 2**20) as f:
         f.write(b"a" * 2 * 2**20)
         assert f.buffer.tell() == 2 * 2**20
         assert not (f.parts)
@@ -1784,7 +1787,7 @@ def test_change_defaults_only_subsequent():
         S3FileSystem.cachable = False  # don't reuse instances with same pars
 
         fs_default = S3FileSystem(client_kwargs={"endpoint_url": endpoint_uri})
-        assert fs_default.default_block_size == 5 * (1024**2)
+        assert fs_default.default_block_size == 50 * (1024**2)
 
         fs_overridden = S3FileSystem(
             default_block_size=64 * (1024**2),
@@ -1801,7 +1804,7 @@
 
         # Test the other file systems created to see if their block sizes changed
         assert fs_overridden.default_block_size == 64 * (1024**2)
-        assert fs_default.default_block_size == 5 * (1024**2)
+        assert fs_default.default_block_size == 50 * (1024**2)
     finally:
         S3FileSystem.default_block_size = 5 * (1024**2)
         S3FileSystem.cachable = True
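
For readers following the `_pipe_file` hunk above, here is a minimal, self-contained sketch of the batched `asyncio.gather` pattern the patch introduces: part offsets are computed up front, then uploaded in groups of at most `max_concurrency` parts, with part numbers and ETags collected in order. The names `pipe_concurrent` and the stub `upload_part` are hypothetical stand-ins introduced only for this illustration (the real code calls `self._call_s3("upload_part", ...)` against S3); the slicing and batching mirror the `ranges`/`inds` logic in the hunk.

import asyncio
import hashlib


async def upload_part(part_number: int, body: bytes) -> dict:
    # Hypothetical stand-in for S3FileSystem._call_s3("upload_part", ...);
    # hashing the body lets the example run without any S3 endpoint.
    await asyncio.sleep(0)  # yield control, as a real network call would
    return {"ETag": hashlib.md5(body).hexdigest()}


async def pipe_concurrent(data: bytes, chunksize: int, max_concurrency: int) -> list:
    # Offsets of each part, as the patch computes `ranges`.
    offsets = list(range(0, len(data), chunksize))
    # Batch boundaries: at most `max_concurrency` parts are in flight at once,
    # as the patch computes `inds`.
    bounds = list(range(0, len(offsets), max_concurrency)) + [len(offsets)]
    parts = []
    for start, stop in zip(bounds[:-1], bounds[1:]):
        out = await asyncio.gather(
            *[
                upload_part(i + 1, data[offsets[i] : offsets[i] + chunksize])
                for i in range(start, stop)
            ]
        )
        parts.extend(
            {"PartNumber": i + 1, "ETag": o["ETag"]}
            for i, o in zip(range(start, stop), out)
        )
    return parts


if __name__ == "__main__":
    payload = b"x" * (12 * 2**20)  # 12 MiB of dummy data
    result = asyncio.run(
        pipe_concurrent(payload, chunksize=5 * 2**20, max_concurrency=2)
    )
    print(result)  # three parts: two full 5 MiB chunks and a 2 MiB tail

In the patch itself, `max_concurrency=None` falls back to the filesystem's `max_concurrency` attribute (whose default moves from 1 to 10), so existing callers of `pipe()`/`_pipe_file` get concurrent part uploads without any API change.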