diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3408eece..e8eaa94f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -10,7 +10,6 @@ jobs: fail-fast: false matrix: python-version: - - "3.8" - "3.9" - "3.10" - "3.11" @@ -39,9 +38,10 @@ jobs: shell: bash -l {0} run: | pip install git+https://github.com/fsspec/filesystem_spec - pip install --upgrade "aiobotocore${{ matrix.aiobotocore-version }}" boto3 # boto3 to ensure compatibility + pip install --upgrade "aiobotocore${{ matrix.aiobotocore-version }}" + pip install --upgrade "botocore" --no-deps pip install . --no-deps - pip show aiobotocore boto3 botocore + pip list - name: Run Tests shell: bash -l {0} diff --git a/ci/env.yaml b/ci/env.yaml index c67c18b6..915f703a 100644 --- a/ci/env.yaml +++ b/ci/env.yaml @@ -14,6 +14,6 @@ dependencies: - black - httpretty - aiobotocore - - "moto>=4,<5" + - moto - flask - fsspec diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index e09e51ae..65ffe261 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -7,6 +7,7 @@ Changelog - invalidate cache in one-shot pipe file (#904) - make pipe() concurrent (#901) - add py3.13 (#898) +- support R2 multi-part uploads (#888) 2024.9.0 -------- diff --git a/pytest.ini b/pytest.ini index 653d5ad9..10cd3a8a 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,6 +1,2 @@ [pytest] testpaths = s3fs -env = - BOTO_PATH=/dev/null - AWS_ACCESS_KEY_ID=dummy_key - AWS_SECRET_ACCESS_KEY=dummy_secret diff --git a/s3fs/core.py b/s3fs/core.py index f3dc411d..dcfb7c91 100644 --- a/s3fs/core.py +++ b/s3fs/core.py @@ -641,6 +641,10 @@ def _open( mode: string One of 'r', 'w', 'a', 'rb', 'wb', or 'ab'. These have the same meaning as they do for the built-in `open` function. + "x" mode, exclusive write, is only known to work on AWS S3, and + requires botocore>1.35.20.
If the file is multi-part (i.e., has more + than one block), the condition is only checked on commit; if this fails, + the MPU is aborted. block_size: int Size of data-node blocks if reading fill_cache: bool @@ -1134,15 +1138,30 @@ async def _call_and_read(): return await _error_wrapper(_call_and_read, retries=self.retries) async def _pipe_file( - self, path, data, chunksize=50 * 2**20, max_concurrency=None, **kwargs + self, + path, + data, + chunksize=50 * 2**20, + max_concurrency=None, + mode="overwrite", + **kwargs, ): + """ + mode=="create", exclusive write, is only known to work on AWS S3, and + requires botocore>1.35.20 + """ bucket, key, _ = self.split_path(path) concurrency = max_concurrency or self.max_concurrency size = len(data) + if mode == "create": + match = {"IfNoneMatch": "*"} + else: + match = {} + # 5 GB is the limit for an S3 PUT if size < min(5 * 2**30, 2 * chunksize): out = await self._call_s3( - "put_object", Bucket=bucket, Key=key, Body=data, **kwargs + "put_object", Bucket=bucket, Key=key, Body=data, **kwargs, **match ) self.invalidate_cache(path) return out @@ -1154,32 +1173,37 @@ async def _pipe_file( ranges = list(range(0, len(data), chunksize)) inds = list(range(0, len(ranges), concurrency)) + [len(ranges)] parts = [] - for start, stop in zip(inds[:-1], inds[1:]): - out = await asyncio.gather( - *[ - self._call_s3( - "upload_part", - Bucket=bucket, - PartNumber=i + 1, - UploadId=mpu["UploadId"], - Body=data[ranges[i] : ranges[i] + chunksize], - Key=key, - ) - for i in range(start, stop) - ] - ) - parts.extend( - {"PartNumber": i + 1, "ETag": o["ETag"]} - for i, o in zip(range(start, stop), out) + try: + for start, stop in zip(inds[:-1], inds[1:]): + out = await asyncio.gather( + *[ + self._call_s3( + "upload_part", + Bucket=bucket, + PartNumber=i + 1, + UploadId=mpu["UploadId"], + Body=data[ranges[i] : ranges[i] + chunksize], + Key=key, + ) + for i in range(start, stop) + ] + ) + parts.extend( + {"PartNumber": i + 1, "ETag": o["ETag"]} + 
for i, o in zip(range(start, stop), out) + ) + await self._call_s3( + "complete_multipart_upload", + Bucket=bucket, + Key=key, + UploadId=mpu["UploadId"], + MultipartUpload={"Parts": parts}, + **match, ) - await self._call_s3( - "complete_multipart_upload", - Bucket=bucket, - Key=key, - UploadId=mpu["UploadId"], - MultipartUpload={"Parts": parts}, - ) - self.invalidate_cache(path) + self.invalidate_cache(path) + except Exception: + await self._abort_mpu(bucket, key, mpu["UploadId"]) + raise async def _put_file( self, @@ -1188,8 +1212,13 @@ async def _put_file( callback=_DEFAULT_CALLBACK, chunksize=50 * 2**20, max_concurrency=None, + mode="overwrite", **kwargs, ): + """ + mode=="create", exclusive write, is only known to work on AWS S3, and + requires botocore>1.35.20 + """ bucket, key, _ = self.split_path(rpath) if os.path.isdir(lpath): if key: @@ -1199,6 +1228,10 @@ async def _put_file( await self._mkdir(lpath) size = os.path.getsize(lpath) callback.set_size(size) + if mode == "create": + match = {"IfNoneMatch": "*"} + else: + match = {} if "ContentType" not in kwargs: content_type, _ = mimetypes.guess_type(lpath) @@ -1209,7 +1242,7 @@ async def _put_file( if size < min(5 * 2**30, 2 * chunksize): chunk = f0.read() await self._call_s3( - "put_object", Bucket=bucket, Key=key, Body=chunk, **kwargs + "put_object", Bucket=bucket, Key=key, Body=chunk, **kwargs, **match ) callback.relative_update(size) else: @@ -1217,25 +1250,31 @@ async def _put_file( mpu = await self._call_s3( "create_multipart_upload", Bucket=bucket, Key=key, **kwargs ) - out = await self._upload_file_part_concurrent( - bucket, - key, - mpu, - f0, - callback=callback, - chunksize=chunksize, - max_concurrency=max_concurrency, - ) - parts = [ - {"PartNumber": i + 1, "ETag": o["ETag"]} for i, o in enumerate(out) - ] - await self._call_s3( - "complete_multipart_upload", - Bucket=bucket, - Key=key, - UploadId=mpu["UploadId"], - MultipartUpload={"Parts": parts}, - ) + try: + out = await 
self._upload_file_part_concurrent( + bucket, + key, + mpu, + f0, + callback=callback, + chunksize=chunksize, + max_concurrency=max_concurrency, + ) + parts = [ + {"PartNumber": i + 1, "ETag": o["ETag"]} + for i, o in enumerate(out) + ] + await self._call_s3( + "complete_multipart_upload", + Bucket=bucket, + Key=key, + UploadId=mpu["UploadId"], + MultipartUpload={"Parts": parts}, + **match, + ) + except Exception: + await self._abort_mpu(bucket, key, mpu["UploadId"]) + raise while rpath: self.invalidate_cache(rpath) rpath = self._parent(rpath) @@ -1941,18 +1980,22 @@ async def _list_multipart_uploads(self, bucket): list_multipart_uploads = sync_wrapper(_list_multipart_uploads) + async def _abort_mpu(self, bucket, key, mpu): + await self._call_s3( + "abort_multipart_upload", + Bucket=bucket, + Key=key, + UploadId=mpu, + ) + + abort_mpu = sync_wrapper(_abort_mpu) + async def _clear_multipart_uploads(self, bucket): """Remove any partial uploads in the bucket""" - out = await self._list_multipart_uploads(bucket) await asyncio.gather( *[ - self._call_s3( - "abort_multipart_upload", - Bucket=bucket, - Key=upload["Key"], - UploadId=upload["UploadId"], - ) - for upload in out + self._abort_mpu(bucket, upload["Key"], upload["UploadId"]) + for upload in await self._list_multipart_uploads(bucket) ] ) @@ -2414,6 +2457,10 @@ def commit(self): raise RuntimeError else: logger.debug("Complete multi-part upload for %s " % self) + if "x" in self.mode: + match = {"IfNoneMatch": "*"} + else: + match = {} part_info = {"Parts": self.parts} write_result = self._call_s3( "complete_multipart_upload", @@ -2421,6 +2468,7 @@ def commit(self): Key=self.key, UploadId=self.mpu["UploadId"], MultipartUpload=part_info, + **match, ) if self.fs.version_aware: @@ -2443,12 +2491,7 @@ def discard(self): def _abort_mpu(self): if self.mpu: - self._call_s3( - "abort_multipart_upload", - Bucket=self.bucket, - Key=self.key, - UploadId=self.mpu["UploadId"], - ) + self.fs.abort_mpu(self.bucket, self.key, 
self.mpu["UploadId"]) self.mpu = None diff --git a/s3fs/errors.py b/s3fs/errors.py index f5350670..5abe9fa3 100644 --- a/s3fs/errors.py +++ b/s3fs/errors.py @@ -137,11 +137,18 @@ def translate_boto_error(error, message=None, set_cause=True, *args, **kwargs): recognized, an IOError with the original error message is returned. """ error_response = getattr(error, "response", None) + if error_response is None: # non-http error, or response is None: return error code = error_response["Error"].get("Code") - constructor = ERROR_CODE_TO_EXCEPTION.get(code) + if ( + code == "PreconditionFailed" + and error_response["Error"].get("Condition", "") == "If-None-Match" + ): + constructor = FileExistsError + else: + constructor = ERROR_CODE_TO_EXCEPTION.get(code) if constructor: if not message: message = error_response["Error"].get("Message", str(error)) diff --git a/s3fs/tests/derived/s3fs_fixtures.py b/s3fs/tests/derived/s3fs_fixtures.py index a1ca7fb9..ca267131 100644 --- a/s3fs/tests/derived/s3fs_fixtures.py +++ b/s3fs/tests/derived/s3fs_fixtures.py @@ -75,45 +75,17 @@ def _get_boto3_client(self): @pytest.fixture(scope="class") def _s3_base(self): - # writable local S3 system - import shlex - import subprocess + # copy of s3_base in test_s3fs + from moto.moto_server.threaded_moto_server import ThreadedMotoServer - try: - # should fail since we didn't start server yet - r = requests.get(endpoint_uri) - except: - pass - else: - if r.ok: - raise RuntimeError("moto server already up") + server = ThreadedMotoServer(ip_address="127.0.0.1", port=port) + server.start() if "AWS_SECRET_ACCESS_KEY" not in os.environ: os.environ["AWS_SECRET_ACCESS_KEY"] = "foo" if "AWS_ACCESS_KEY_ID" not in os.environ: os.environ["AWS_ACCESS_KEY_ID"] = "foo" - proc = subprocess.Popen( - shlex.split("moto_server s3 -p %s" % port), - stderr=subprocess.DEVNULL, - stdout=subprocess.DEVNULL, - stdin=subprocess.DEVNULL, - ) - timeout = 5 - while timeout > 0: - try: - print("polling for moto server") - r = 
requests.get(endpoint_uri) - if r.ok: - break - except: - pass - timeout -= 0.1 - time.sleep(0.1) - if proc.poll() is not None: - proc.terminate() - raise RuntimeError("Starting moto server failed") print("server up") yield print("moto done") - proc.terminate() - proc.wait() + server.stop() diff --git a/s3fs/tests/derived/s3fs_test.py b/s3fs/tests/derived/s3fs_test.py index 2430e0a8..4bf8bcdb 100644 --- a/s3fs/tests/derived/s3fs_test.py +++ b/s3fs/tests/derived/s3fs_test.py @@ -1,3 +1,5 @@ +import pytest + import fsspec.tests.abstract as abstract from s3fs.tests.derived.s3fs_fixtures import S3fsFixtures @@ -12,3 +14,14 @@ class TestS3fsGet(abstract.AbstractGetTests, S3fsFixtures): class TestS3fsPut(abstract.AbstractPutTests, S3fsFixtures): pass + + +class TestS3fsPipe(abstract.AbstractPipeTests, S3fsFixtures): + pass + + +class TestS3fsOpen(abstract.AbstractOpenTests, S3fsFixtures): + + test_open_exclusive = pytest.mark.xfail( + reason="complete_multipart_upload doesn't implement condition in moto" + )(abstract.AbstractOpenTests.test_open_exclusive) diff --git a/s3fs/tests/test_s3fs.py b/s3fs/tests/test_s3fs.py index 3379da52..922ecd6b 100644 --- a/s3fs/tests/test_s3fs.py +++ b/s3fs/tests/test_s3fs.py @@ -1216,15 +1216,15 @@ def test_write_limit(s3): def test_write_small_secure(s3): - # Unfortunately moto does not yet support enforcing SSE policies. It also - # does not return the correct objects that can be used to test the results - # effectively. - # This test is left as a placeholder in case moto eventually supports this. 
- sse_params = SSEParams(server_side_encryption="aws:kms") - with s3.open(secure_bucket_name + "/test", "wb", writer_kwargs=sse_params) as f: + s3 = S3FileSystem( + s3_additional_kwargs={"ServerSideEncryption": "aws:kms"}, + client_kwargs={"endpoint_url": endpoint_uri}, + ) + s3.mkdir("mybucket") + with s3.open("mybucket/test", "wb") as f: f.write(b"hello") - assert s3.cat(secure_bucket_name + "/test") == b"hello" - sync(s3.loop, s3.s3.head_object, Bucket=secure_bucket_name, Key="test") + assert s3.cat("mybucket/test") == b"hello" + sync(s3.loop, s3.s3.head_object, Bucket="mybucket", Key="test") def test_write_large_secure(s3): @@ -2812,3 +2812,30 @@ async def test_invalidate_cache(s3: s3fs.S3FileSystem) -> None: await s3._pipe_file(f"{test_bucket_name}/a/c.txt", data=b"abc") after = await s3._ls(f"{test_bucket_name}/a/") assert sorted(after) == ["test/a/b.txt", "test/a/c.txt"] + + +@pytest.mark.xfail(reason="moto doesn't support conditional MPU") +def test_pipe_exclusive_big(s3): + chunksize = 5 * 2**20 # minimum allowed + data = b"x" * chunksize * 3 + s3.pipe(f"{test_bucket_name}/afile", data, mode="overwrite", chunksize=chunksize) + s3.pipe(f"{test_bucket_name}/afile", data, mode="overwrite", chunksize=chunksize) + with pytest.raises(FileExistsError): + s3.pipe(f"{test_bucket_name}/afile", data, mode="create", chunksize=chunksize) + assert not s3.list_multipart_uploads(test_bucket_name) + + +@pytest.mark.xfail(reason="moto doesn't support conditional MPU") +def test_put_exclusive_big(s3, tempdir): + chunksize = 5 * 2**20 # minimum allowed + data = b"x" * chunksize * 3 + fn = f"{tempdir}/afile" + with open(fn, "wb") as f: + f.write(data) + s3.put(fn, f"{test_bucket_name}/afile", mode="overwrite", chunksize=chunksize) + s3.put(fn, f"{test_bucket_name}/afile", mode="overwrite", chunksize=chunksize) + with pytest.raises(FileExistsError): + s3.put( + fn, f"{test_bucket_name}/afile", mode="create", chunksize=chunksize + ) + assert not
s3.list_multipart_uploads(test_bucket_name) diff --git a/setup.py b/setup.py index 5b7dd5b1..95f4d881 100755 --- a/setup.py +++ b/setup.py @@ -20,7 +20,6 @@ "Intended Audience :: Developers", "License :: OSI Approved :: BSD License", "Operating System :: OS Independent", - "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", @@ -34,7 +33,7 @@ license="BSD", keywords="s3, boto", packages=["s3fs"], - python_requires=">= 3.8", + python_requires=">= 3.9", install_requires=[open("requirements.txt").read().strip().split("\n")], extras_require={ "awscli": [f"aiobotocore[awscli]{aiobotocore_version_suffix}"],