
AWS_S3_ALLOW_UNSAFE_RENAME="true" in maybe_set_aws_credentials incorrectly passed to pyarrow #88

Open
rzig opened this issue Jan 9, 2025 · 1 comment

rzig commented Jan 9, 2025

Hi! In maybe_set_aws_credentials, AWS_S3_ALLOW_UNSAFE_RENAME is set to "true". This causes the following call, made with no explicit credentials, to fail:

import dask_deltatable as ddt
ddf = ddt.read_deltalake("s3://my/deltatable", storage_options={})
ddf.head(n=10)

This call currently fails with the following error (full traceback at end):

File /opt/tljh/user/lib/python3.12/site-packages/s3fs/core.py:526, in S3FileSystem.set_session(self, refresh, kwargs)
    524 conf = AioConfig(**config_kwargs)
    525 if self.session is None:
--> 526     self.session = aiobotocore.session.AioSession(**self.kwargs)
    528 for parameters in (config_kwargs, self.kwargs, init_kwargs, client_kwargs):
    529     for option in ("region_name", "endpoint_url"):

TypeError: AioSession.__init__() got an unexpected keyword argument 'AWS_S3_ALLOW_UNSAFE_RENAME'
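
For what it's worth, I think the same error can be triggered directly with s3fs, without dask_deltatable in the loop, by passing the capitalized key as a keyword argument. A minimal sketch of my reading of the traceback, not something I've verified against every s3fs version:

import s3fs

# My understanding: any keyword S3FileSystem does not recognize is kept on
# self.kwargs and later forwarded to aiobotocore.session.AioSession(**self.kwargs),
# which rejects it with the TypeError above.
fs = s3fs.S3FileSystem(AWS_S3_ALLOW_UNSAFE_RENAME="true")
fs.info("s3://my/deltatable")  # fails when the session is created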

The read_deltalake call should succeed, since I'm executing it on an EC2 instance that has an IAM role configured with permissions to my bucket. I've confirmed I can read this same table with polars:

import polars as pl
df = pl.read_delta("s3://my/deltatable")
df.head()

I'm able to work around this temporarily by using the path s3a://my/deltatable, since maybe_set_aws_credentials explicitly checks for the s3:// prefix (is that also a bug? It should probably check for s3a:// and s3n:// too). Still, I'm fairly sure the underlying bug is in how these credentials eventually make their way to pyarrow. I've included my package versions and the full traceback below. If this is indeed a bug, I'm happy to try writing a fix.
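
For reference, the kind of fix I had in mind looks roughly like this; the helper name and the key set are mine for illustration, not existing dask_deltatable internals:

# Rough sketch only: _split_storage_options and DELTA_ONLY_KEYS are illustrative
# names, not part of the current dask_deltatable code.
S3_PREFIXES = ("s3://", "s3a://", "s3n://")
DELTA_ONLY_KEYS = {"AWS_S3_ALLOW_UNSAFE_RENAME"}

def _split_storage_options(path, storage_options):
    """Return (options for deltalake, options for the fsspec/pyarrow filesystem)."""
    if not path.startswith(S3_PREFIXES):
        return dict(storage_options), dict(storage_options)
    # Keep delta-only keys away from S3FileSystem so they never reach AioSession.
    fs_options = {k: v for k, v in storage_options.items() if k not in DELTA_ONLY_KEYS}
    return dict(storage_options), fs_options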

Traceback

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[21], line 1
----> 1 ddf.head(n=10)

File /opt/tljh/user/lib/python3.12/site-packages/dask_expr/_collection.py:706, in FrameBase.head(self, n, npartitions, compute)
704 out = new_collection(expr.Head(self, n=n, npartitions=npartitions))
705 if compute:
--> 706 out = out.compute()
707 return out

File /opt/tljh/user/lib/python3.12/site-packages/dask_expr/_collection.py:480, in FrameBase.compute(self, fuse, concatenate, **kwargs)
478 out = out.repartition(npartitions=1)
479 out = out.optimize(fuse=fuse)
--> 480 return DaskMethodsMixin.compute(out, **kwargs)

File /opt/tljh/user/lib/python3.12/site-packages/dask/base.py:372, in DaskMethodsMixin.compute(self, **kwargs)
348 def compute(self, **kwargs):
349 """Compute this dask collection
350
351 This turns a lazy Dask collection into its in-memory equivalent.
(...)
370 dask.compute
371 """
--> 372 (result,) = compute(self, traverse=False, **kwargs)
373 return result

File /opt/tljh/user/lib/python3.12/site-packages/dask/base.py:660, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
657 postcomputes.append(x.dask_postcompute())
659 with shorten_traceback():
--> 660 results = schedule(dsk, keys, **kwargs)
662 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /opt/tljh/user/lib/python3.12/site-packages/dask_deltatable/core.py:76, in _read_delta_partition(filename, schema, fs, columns, filter, pyarrow_to_pandas, **_kwargs)
70 pyarrow_to_pandas["types_mapper"] = _get_type_mapper(
71 pyarrow_to_pandas.get("types_mapper")
72 )
73 pyarrow_to_pandas["ignore_metadata"] = pyarrow_to_pandas.get(
74 "ignore_metadata", False
75 )
---> 76 table = pa_ds.dataset(
77 source=filename,
78 schema=schema,
79 filesystem=fs,
80 format="parquet",
81 partitioning="hive",
82 ).to_table(filter=filter_expression, columns=columns)
83 return table.to_pandas(**pyarrow_to_pandas)

File /opt/tljh/user/lib/python3.12/site-packages/pyarrow/dataset.py:794, in dataset(source, schema, format, filesystem, partitioning, partition_base_dir, exclude_invalid_files, ignore_prefixes)
783 kwargs = dict(
784 schema=schema,
785 filesystem=filesystem,
(...)
790 selector_ignore_prefixes=ignore_prefixes
791 )
793 if _is_path_like(source):
--> 794 return _filesystem_dataset(source, **kwargs)
795 elif isinstance(source, (tuple, list)):
796 if all(_is_path_like(elem) or isinstance(elem, FileInfo) for elem in source):

File /opt/tljh/user/lib/python3.12/site-packages/pyarrow/dataset.py:476, in _filesystem_dataset(source, schema, filesystem, partitioning, format, partition_base_dir, exclude_invalid_files, selector_ignore_prefixes)
474 fs, paths_or_selector = _ensure_multiple_sources(source, filesystem)
475 else:
--> 476 fs, paths_or_selector = _ensure_single_source(source, filesystem)
478 options = FileSystemFactoryOptions(
479 partitioning=partitioning,
480 partition_base_dir=partition_base_dir,
481 exclude_invalid_files=exclude_invalid_files,
482 selector_ignore_prefixes=selector_ignore_prefixes
483 )
484 factory = FileSystemDatasetFactory(fs, paths_or_selector, format, options)

File /opt/tljh/user/lib/python3.12/site-packages/pyarrow/dataset.py:432, in _ensure_single_source(path, filesystem)
429 path = filesystem.normalize_path(path)
431 # retrieve the file descriptor
--> 432 file_info = filesystem.get_file_info(path)
434 # depending on the path type either return with a recursive
435 # directory selector or as a list containing a single file
436 if file_info.type == FileType.Directory:

File /opt/tljh/user/lib/python3.12/site-packages/pyarrow/_fs.pyx:590, in pyarrow._fs.FileSystem.get_file_info()

File /opt/tljh/user/lib/python3.12/site-packages/pyarrow/error.pxi:155, in pyarrow.lib.pyarrow_internal_check_status()

File /opt/tljh/user/lib/python3.12/site-packages/pyarrow/error.pxi:89, in pyarrow.lib.check_status()

File /opt/tljh/user/lib/python3.12/site-packages/pyarrow/_fs.pyx:1498, in pyarrow._fs._cb_get_file_info()

File /opt/tljh/user/lib/python3.12/site-packages/pyarrow/fs.py:322, in FSSpecHandler.get_file_info(self, paths)
320 for path in paths:
321 try:
--> 322 info = self.fs.info(path)
323 except FileNotFoundError:
324 infos.append(FileInfo(path, FileType.NotFound))

File /opt/tljh/user/lib/python3.12/site-packages/fsspec/asyn.py:118, in sync_wrapper.<locals>.wrapper(*args, **kwargs)
115 @functools.wraps(func)
116 def wrapper(*args, **kwargs):
117 self = obj or args[0]
--> 118 return sync(self.loop, func, *args, **kwargs)

File /opt/tljh/user/lib/python3.12/site-packages/fsspec/asyn.py:103, in sync(loop, func, timeout, *args, **kwargs)
101 raise FSTimeoutError from return_result
102 elif isinstance(return_result, BaseException):
--> 103 raise return_result
104 else:
105 return return_result

File /opt/tljh/user/lib/python3.12/site-packages/fsspec/asyn.py:56, in _runner(event, coro, result, timeout)
54 coro = asyncio.wait_for(coro, timeout=timeout)
55 try:
---> 56 result[0] = await coro
57 except Exception as ex:
58 result[0] = ex

File /opt/tljh/user/lib/python3.12/site-packages/s3fs/core.py:1426, in S3FileSystem._info(self, path, bucket, key, refresh, version_id)
1424 if key:
1425 try:
-> 1426 out = await self._call_s3(
1427 "head_object",
1428 self.kwargs,
1429 Bucket=bucket,
1430 Key=key,
1431 **version_id_kw(version_id),
1432 **self.req_kw,
1433 )
1434 return {
1435 "ETag": out.get("ETag", ""),
1436 "LastModified": out.get("LastModified", ""),
(...)
1442 "ContentType": out.get("ContentType"),
1443 }
1444 except FileNotFoundError:

File /opt/tljh/user/lib/python3.12/site-packages/s3fs/core.py:364, in S3FileSystem._call_s3(self, method, *akwarglist, **kwargs)
363 async def _call_s3(self, method, *akwarglist, **kwargs):
--> 364 await self.set_session()
365 s3 = await self.get_s3(kwargs.get("Bucket"))
366 method = getattr(s3, method)

File /opt/tljh/user/lib/python3.12/site-packages/s3fs/core.py:526, in S3FileSystem.set_session(self, refresh, kwargs)
524 conf = AioConfig(**config_kwargs)
525 if self.session is None:
--> 526 self.session = aiobotocore.session.AioSession(**self.kwargs)
528 for parameters in (config_kwargs, self.kwargs, init_kwargs, client_kwargs):
529 for option in ("region_name", "endpoint_url"):

TypeError: AioSession.__init__() got an unexpected keyword argument 'AWS_S3_ALLOW_UNSAFE_RENAME'

Package Versions

deltalake: 0.23.2
dask-deltatable: 0.3.3
pyarrow: 18.1.0


jacobtomlinson commented Jan 10, 2025

There is a comment that suggests the argument is intended for a delta specific API, so perhaps something has broken upstream.

# Capitalized is used in delta specific API and lowercase is for S3FileSystem

I've reproduced the issue locally, and I've also downgraded pyarrow, deltalake, and s3fs to the versions that were available at the time #78 was merged, but I can still reproduce the issue. So I don't think it's a regression in any of those packages.

It looks like polars may have gone through a similar regression in pola-rs/polars#19878, although it's not immediately clear if it's the same problem.
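
In the meantime, splitting the storage options on that capitalized/lowercase convention before building the filesystem might be enough to keep the key away from s3fs. A sketch of the idea (not a tested patch, and the helper name is made up):

# Capitalized keys follow the delta-specific convention; lowercase keys are for
# S3FileSystem. Splitting on that rule would keep AWS_S3_ALLOW_UNSAFE_RENAME out
# of the kwargs that eventually reach aiobotocore.
def _partition_storage_options(storage_options):
    delta_options = {k: v for k, v in storage_options.items() if k.isupper()}
    fs_options = {k: v for k, v in storage_options.items() if not k.isupper()}
    return delta_options, fs_options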

jacobtomlinson added the bug and help wanted labels Jan 10, 2025