diff --git a/pyproject.toml b/pyproject.toml index 39fe6729f..8d8ad7952 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,10 +24,12 @@ dependencies = [ "tqdm", "numpy>=1,<3", "pandas>=2.0.0", + "packaging", "pyarrow", "typing-extensions", "python-dateutil>=2", "attrs>=21.3.0", + "fsspec>=2024.2.0", "s3fs>=2024.2.0", "gcsfs>=2024.2.0", "adlfs>=2024.2.0", diff --git a/src/datachain/fs/__init__.py b/src/datachain/fs/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/datachain/fs/reference.py b/src/datachain/fs/reference.py new file mode 100644 index 000000000..7577855c7 --- /dev/null +++ b/src/datachain/fs/reference.py @@ -0,0 +1,21 @@ +import fsspec +from packaging.version import Version, parse + +# fsspec==2025.2.0 added support for a proper `open()` in `ReferenceFileSystem`. +# Remove this module when `fsspec` minimum version requirement can be bumped. +if parse(fsspec.__version__) < Version("2025.2.0"): + from fsspec.core import split_protocol + from fsspec.implementations import reference + + class ReferenceFileSystem(reference.ReferenceFileSystem): + def _open(self, path, mode="rb", *args, **kwargs): + # overriding because `fsspec`'s `ReferenceFileSystem._open` + # reads the whole file in-memory. + (uri,) = self.references[path] + protocol, _ = split_protocol(uri) + return self.fss[protocol].open(uri, mode, *args, **kwargs) +else: + from fsspec.implementations.reference import ReferenceFileSystem # type: ignore[no-redef] # noqa: I001 + + +__all__ = ["ReferenceFileSystem"] diff --git a/src/datachain/lib/arrow.py b/src/datachain/lib/arrow.py index 2c3aebb2f..25443b566 100644 --- a/src/datachain/lib/arrow.py +++ b/src/datachain/lib/arrow.py @@ -2,13 +2,12 @@ from itertools import islice from typing import TYPE_CHECKING, Any, Optional -import fsspec.implementations.reference import orjson import pyarrow as pa -from fsspec.core import split_protocol from pyarrow.dataset import CsvFileFormat, dataset from tqdm.auto import tqdm +from datachain.fs.reference import ReferenceFileSystem from datachain.lib.data_model import dict_to_data_model from datachain.lib.file import ArrowRow, File from datachain.lib.model_store import ModelStore @@ -27,15 +26,6 @@ DATACHAIN_SIGNAL_SCHEMA_PARQUET_KEY = b"DataChain SignalSchema" -class ReferenceFileSystem(fsspec.implementations.reference.ReferenceFileSystem): - def _open(self, path, mode="rb", *args, **kwargs): - # overriding because `fsspec`'s `ReferenceFileSystem._open` - # reads the whole file in-memory. - (uri,) = self.references[path] - protocol, _ = split_protocol(uri) - return self.fss[protocol].open(uri, mode, *args, **kwargs) - - class ArrowGenerator(Generator): DEFAULT_BATCH_SIZE = 2**17 # same as `pyarrow._dataset._DEFAULT_BATCH_SIZE`