Skip to content

Commit

Permalink
file: support exporting files as a symlink (#819)
Browse files Browse the repository at this point in the history
  • Loading branch information
skshetry authored Jan 16, 2025
1 parent 88737b6 commit aad99e2
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 3 deletions.
18 changes: 15 additions & 3 deletions src/datachain/lib/dc.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
BinaryIO,
Callable,
ClassVar,
Literal,
Optional,
TypeVar,
Union,
Expand Down Expand Up @@ -2415,19 +2416,30 @@ def setup(self, **kwargs) -> "Self":
def export_files(
    self,
    output: str,
    signal: str = "file",
    placement: FileExportPlacement = "fullpath",
    use_cache: bool = True,
    link_type: Literal["copy", "symlink"] = "copy",
) -> None:
    """Export files from a specified signal to a directory.

    Args:
        output: Path to the target directory for exporting files.
        signal: Name of the signal to export files from.
        placement: The method to use for naming exported files.
            The possible values are: "filename", "etag", "fullpath", and "checksum".
        use_cache: If `True`, cache the files before exporting.
        link_type: Method to use for exporting files, either `"copy"` or
            `"symlink"`. Falls back to `'copy'` if symlinking fails.

    Raises:
        ValueError: If `placement` is "filename" and the number of distinct
            file names differs from the number of rows in the chain.
    """
    # With "filename" placement every file lands directly under its bare name,
    # so two rows sharing a name would overwrite each other; refuse up front.
    if placement == "filename" and (
        self._query.distinct(pathfunc.name(C(f"{signal}__path"))).count()
        != self._query.count()
    ):
        raise ValueError("Files with the same name found")

    for file in self.collect(signal):
        file.export(output, placement, use_cache, link_type=link_type)  # type: ignore[union-attr]

def shuffle(self) -> "Self":
"""Shuffle the rows of the chain deterministically."""
Expand Down
23 changes: 23 additions & 0 deletions src/datachain/lib/file.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import errno
import hashlib
import io
import json
Expand Down Expand Up @@ -236,11 +237,26 @@ def save(self, destination: str):
with open(destination, mode="wb") as f:
f.write(self.read())

def _symlink_to(self, destination: str):
if self.location:
raise OSError(errno.ENOTSUP, "Symlinking virtual file is not supported")

if self._caching_enabled:
self.ensure_cached()
source = self.get_local_path()
assert source, "File was not cached"
elif self.source.startswith("file://"):
source = self.get_path()
else:
raise OSError(errno.EXDEV, "can't link across filesystems")
return os.symlink(source, destination)

def export(
self,
output: str,
placement: ExportPlacement = "fullpath",
use_cache: bool = True,
link_type: Literal["copy", "symlink"] = "copy",
) -> None:
"""Export file to new location."""
if use_cache:
Expand All @@ -249,6 +265,13 @@ def export(
dst_dir = os.path.dirname(dst)
os.makedirs(dst_dir, exist_ok=True)

if link_type == "symlink":
try:
return self._symlink_to(dst)
except OSError as exc:
if exc.errno not in (errno.ENOTSUP, errno.EXDEV, errno.ENOSYS):
raise

self.save(dst)

def _set_stream(
Expand Down
16 changes: 16 additions & 0 deletions tests/unit/lib/test_file.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
from pathlib import Path
from unittest.mock import Mock

import pytest
Expand Down Expand Up @@ -379,3 +380,18 @@ def test_get_local_path(tmp_path, catalog):
assert file.get_local_path() is None
file.ensure_cached()
assert file.get_local_path() is not None


@pytest.mark.parametrize("use_cache", (True, False))
def test_export_with_symlink(tmp_path, catalog, use_cache):
    """Exporting with `link_type="symlink"` creates a symlink to the file.

    The link is expected to resolve to the cached copy when `use_cache` is
    true, and to the original file otherwise.
    """
    original = tmp_path / "myfile.txt"
    original.write_text("some text")

    file = File(path=original.name, source=tmp_path.as_uri())
    file._set_stream(catalog, use_cache)

    export_dir = tmp_path / "dir"
    file.export(export_dir, link_type="symlink", use_cache=use_cache)

    exported = export_dir / "myfile.txt"
    assert exported.is_symlink()

    if use_cache:
        expected_target = Path(file.get_local_path())
    else:
        expected_target = original
    assert exported.resolve() == expected_target

0 comments on commit aad99e2

Please sign in to comment.