Remove storage and partials tables and related codebase (#544)
* removing obsolete code

* removed storage class and codebase

* fixing tests

* removed find stale bucket
Authored by ilongin on Oct 31, 2024
Parent: 529c297 · Commit: a516c94
Showing 18 changed files with 81 additions and 959 deletions.

src/datachain/catalog/catalog.py (3 additions, 25 deletions)

@@ -42,6 +42,7 @@
     DatasetStats,
     DatasetStatus,
     RowDict,
+    StorageURI,
     create_dataset_uri,
     parse_dataset_uri,
 )
@@ -58,7 +59,6 @@
 from datachain.nodes_thread_pool import NodesThreadPool
 from datachain.remote.studio import StudioClient
 from datachain.sql.types import DateTime, SQLType, String
-from datachain.storage import StorageURI
 from datachain.utils import (
     DataChainDir,
     batched,
@@ -1702,31 +1702,9 @@ def index(
         *,
         client_config=None,
     ) -> None:
-        root_sources = [
-            src for src in sources if Client.get_implementation(src).is_root_url(src)
-        ]
-        non_root_sources = [
-            src
-            for src in sources
-            if not Client.get_implementation(src).is_root_url(src)
-        ]
-
-        client_config = client_config or self.client_config
-
-        # for root sources (e.g s3://) we are just getting all buckets and
-        # saving them as storages, without further indexing in each bucket
-        for source in root_sources:
-            for bucket in Client.get_implementation(source).ls_buckets(**client_config):
-                client = self.get_client(bucket.uri, **client_config)
-                print(f"Registering storage {client.uri}")
-                self.metastore.create_storage_if_not_registered(client.uri)
-
         self.enlist_sources(
-            non_root_sources,
+            sources,
             update,
-            client_config=client_config,
+            client_config=client_config or self.client_config,
             only_index=True,
         )
-
-    def find_stale_storages(self) -> None:
-        self.metastore.find_stale_storages()

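With the root-source special case and the per-bucket storage registration gone, index() reduces to a thin wrapper around enlist_sources(). A minimal sketch of the resulting method, reconstructed from the hunk above (the leading parameters self, sources, and update are not visible in the diff and are assumed here):

    def index(
        self,
        sources,        # assumed: not shown in the diff
        update=False,   # assumed default; only "update," appears as context
        *,
        client_config=None,
    ) -> None:
        # All sources, including root URLs such as s3://, now go through a
        # single enlist_sources() call; the bucket-registration loop and
        # find_stale_storages() were removed in this commit.
        self.enlist_sources(
            sources,
            update,
            client_config=client_config or self.client_config,
            only_index=True,
        )
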
src/datachain/cli.py (0 additions, 8 deletions)

@@ -568,12 +568,6 @@ def get_parser() -> ArgumentParser:  # noqa: PLR0915
     )
     add_sources_arg(parse_index)
 
-    subp.add_parser(
-        "find-stale-storages",
-        parents=[parent_parser],
-        description="Finds and marks stale storages",
-    )
-
     show_parser = subp.add_parser(
         "show",
         parents=[parent_parser],
@@ -1100,8 +1094,6 @@ def main(argv: Optional[list[str]] = None) -> int:  # noqa: C901, PLR0912, PLR09
         )
     elif args.command == "completion":
         print(completion(args.shell))
-    elif args.command == "find-stale-storages":
-        catalog.find_stale_storages()
     elif args.command == "query":
         query(
             catalog,

src/datachain/client/fsspec.py (10 additions, 5 deletions)

@@ -31,11 +31,12 @@
 from datachain.lib.file import File
 from datachain.nodes_fetcher import NodesFetcher
 from datachain.nodes_thread_pool import NodeChunk
-from datachain.storage import StorageURI
 
 if TYPE_CHECKING:
     from fsspec.spec import AbstractFileSystem
 
+    from datachain.dataset import StorageURI
+
 
 logger = logging.getLogger("datachain")
 
@@ -63,7 +64,7 @@ def _is_win_local_path(uri: str) -> bool:
 
 class Bucket(NamedTuple):
     name: str
-    uri: StorageURI
+    uri: "StorageURI"
     created: Optional[datetime]
 
 
@@ -115,7 +116,7 @@ def is_data_source_uri(name: str) -> bool:
         return DATA_SOURCE_URI_PATTERN.match(name) is not None
 
     @staticmethod
-    def parse_url(source: str) -> tuple[StorageURI, str]:
+    def parse_url(source: str) -> tuple["StorageURI", str]:
         cls = Client.get_implementation(source)
         storage_name, rel_path = cls.split_url(source)
         return cls.get_uri(storage_name), rel_path
@@ -148,14 +149,16 @@ def from_name(
     @classmethod
     def from_source(
         cls,
-        uri: StorageURI,
+        uri: "StorageURI",
         cache: DataChainCache,
         **kwargs,
    ) -> "Client":
         return cls(cls.FS_CLASS._strip_protocol(uri), kwargs, cache)
 
     @classmethod
     def ls_buckets(cls, **kwargs) -> Iterator[Bucket]:
+        from datachain.dataset import StorageURI
+
         for entry in cls.create_fs(**kwargs).ls(cls.PREFIX, detail=True):
             name = entry["name"].rstrip("/")
             yield Bucket(
@@ -169,7 +172,9 @@ def is_root_url(cls, url) -> bool:
         return url == cls.PREFIX
 
     @classmethod
-    def get_uri(cls, name) -> StorageURI:
+    def get_uri(cls, name) -> "StorageURI":
+        from datachain.dataset import StorageURI
+
         return StorageURI(f"{cls.PREFIX}{name}")
 
     @classmethod

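The same pattern repeats across the client modules: StorageURI is imported from datachain.dataset only under TYPE_CHECKING, annotations refer to it as the string "StorageURI", and any method that actually constructs one performs a local import at call time, presumably so that datachain.dataset is not pulled in at module import and no circular import is created (the diff itself does not state the motivation). A minimal self-contained sketch of that pattern; ExampleClient and its PREFIX value are hypothetical, while StorageURI and datachain.dataset come from the diff:

    from typing import TYPE_CHECKING

    if TYPE_CHECKING:
        # Evaluated by type checkers only; nothing is imported at runtime here.
        from datachain.dataset import StorageURI


    class ExampleClient:
        PREFIX = "s3://"  # hypothetical value, for illustration only

        @classmethod
        def get_uri(cls, name: str) -> "StorageURI":
            # Deferred import: datachain.dataset is loaded only when a
            # StorageURI value is actually constructed.
            from datachain.dataset import StorageURI

            return StorageURI(f"{cls.PREFIX}{name}")

Because the return annotation is a plain string, the name is never needed at import time, and the in-function import is the only point where the dependency on datachain.dataset is paid.
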
src/datachain/client/local.py (7 additions, 3 deletions)

@@ -2,16 +2,18 @@
 import posixpath
 from datetime import datetime, timezone
 from pathlib import Path
-from typing import Any
+from typing import TYPE_CHECKING, Any
 from urllib.parse import urlparse
 
 from fsspec.implementations.local import LocalFileSystem
 
 from datachain.lib.file import File
-from datachain.storage import StorageURI
 
 from .fsspec import Client
 
+if TYPE_CHECKING:
+    from datachain.dataset import StorageURI
+
 
 class FileClient(Client):
     FS_CLASS = LocalFileSystem
@@ -28,7 +30,9 @@ def url(self, path: str, expires: int = 3600, **kwargs) -> str:
         raise TypeError("Signed urls are not implemented for local file system")
 
     @classmethod
-    def get_uri(cls, name) -> StorageURI:
+    def get_uri(cls, name) -> "StorageURI":
+        from datachain.dataset import StorageURI
+
         return StorageURI(f'{cls.PREFIX}/{name.removeprefix("/")}')
 
     @classmethod

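For FileClient the same deferred import produces a path-style URI. A rough usage sketch, assuming FileClient.PREFIX is "file://" (the prefix is defined elsewhere and is not shown in this diff):

    from datachain.client.local import FileClient

    # With the assumed "file://" prefix, get_uri("/home/user/datasets")
    # evaluates f'{PREFIX}/{name.removeprefix("/")}' to
    # "file:///home/user/datasets", wrapped in StorageURI.
    uri = FileClient.get_uri("/home/user/datasets")
    print(uri)
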
Diffs for the remaining 14 changed files are not shown.
