Merge pull request #2966 from activeloopai/build_backend
change build backend
activesoull authored Sep 27, 2024
2 parents 5ecc63c + e3f11a8 commit 47e9f45
Showing 14 changed files with 137 additions and 142 deletions.
66 changes: 30 additions & 36 deletions deeplake/api/dataset.py
@@ -72,6 +72,7 @@
LockedException,
BadRequestException,
RenameError,
InvalidPandasDataframeError,
)
from deeplake.util.storage import (
get_storage_and_cache_chain,
@@ -82,6 +83,17 @@
from deeplake.util.cache_chain import generate_chain
from deeplake.core.storage.deeplake_memory_object import DeepLakeMemoryObject

allow_delete_error_message = "Dataset overwrite failed. The dataset is marked as allow_delete=false. To allow overwrite, you must first run `allow_delete = True` on the dataset."
see_traceback_error_message = (
"Dataset overwrite failed. See traceback for more information."
)

dataset_corrupted_error_message = (
"The source dataset is corrupted."
"You can try to fix this by loading the dataset with `reset=True` "
"which will attempt to reset uncommitted HEAD changes and load the previous version."
)


def _check_indra_and_read_only_flags(indra: bool, read_only: Optional[bool]):
if indra == False:
@@ -258,7 +270,7 @@ def init(
db_engine = parse_runtime_parameters(path, runtime)["tensor_db"]

try:
storage, cache_chain = get_storage_and_cache_chain(
_, cache_chain = get_storage_and_cache_chain(
path=path,
db_engine=db_engine,
read_only=read_only,
@@ -279,16 +291,12 @@
if ds_exists:
if overwrite:
if not dataset._allow_delete(cache_chain):
raise DatasetHandlerError(
"Dataset overwrite failed. The dataset is marked as allow_delete=false. To allow overwrite, you must first run `allow_delete = True` on the dataset."
)
raise DatasetHandlerError(allow_delete_error_message)

try:
cache_chain.clear()
except Exception as e:
raise DatasetHandlerError(
"Dataset overwrite failed. See traceback for more information."
) from e
raise DatasetHandlerError(see_traceback_error_message) from e
create = True
else:
create = False
@@ -403,7 +411,7 @@ def exists(
_fetch_creds_from_key(creds, org_id, token)

try:
storage, cache_chain = get_storage_and_cache_chain(
storage, _ = get_storage_and_cache_chain(
path=path,
read_only=True,
creds=creds,
@@ -523,16 +531,12 @@ def empty(

if overwrite and dataset_exists(cache_chain):
if not dataset._allow_delete(cache_chain):
raise DatasetHandlerError(
"Dataset overwrite failed. The dataset is marked as allow_delete=false. To allow overwrite, you must first run `allow_delete = True` on the dataset."
)
raise DatasetHandlerError(allow_delete_error_message)

try:
cache_chain.clear()
except Exception as e:
raise DatasetHandlerError(
"Dataset overwrite failed. See traceback for more information."
) from e
raise DatasetHandlerError(see_traceback_error_message) from e
elif dataset_exists(cache_chain):
raise DatasetHandlerError(
f"A dataset already exists at the given path ({path}). If you want to create"
@@ -1143,11 +1147,11 @@ def _like( # (No reporting)
if dest_path == src_path:
# load tensor data to memory before deleting
# in case of in-place deeplake.like
meta = source_tensor.meta
info = source_tensor.info
sample_shape_tensor = source_tensor._sample_shape_tensor
sample_id_tensor = source_tensor._sample_id_tensor
sample_info_tensor = source_tensor._sample_info_tensor
_ = source_tensor.meta
_ = source_tensor.info
_ = source_tensor._sample_shape_tensor
_ = source_tensor._sample_id_tensor
_ = source_tensor._sample_info_tensor
destination_ds.delete_tensor(tensor_name)
destination_ds.create_tensor_like(tensor_name, source_tensor, unlink=tensor_name in unlink) # type: ignore

@@ -1218,9 +1222,7 @@ def copy(
)
except DatasetCorruptError as e:
raise DatasetCorruptError(
"The source dataset is corrupted.",
"You can try to fix this by loading the dataset with `reset=True` "
"which will attempt to reset uncommitted HEAD changes and load the previous version.",
dataset_corrupted_error_message,
e.__cause__,
)
else:
@@ -1323,9 +1325,7 @@ def deepcopy(
)
except DatasetCorruptError as e:
raise DatasetCorruptError(
"The source dataset is corrupted.",
"You can try to fix this by loading the dataset with `reset=True` "
"which will attempt to reset uncommitted HEAD changes and load the previous version.",
dataset_corrupted_error_message,
e.__cause__,
)
else:
@@ -1364,16 +1364,12 @@ def deepcopy(
if dataset_exists(cache_chain):
if overwrite:
if not dataset._allow_delete(cache_chain):
raise DatasetHandlerError(
"Dataset overwrite failed. The dataset is marked as allow_delete=false. To allow overwrite, you must first run `allow_delete = True` on the dataset."
)
raise DatasetHandlerError(allow_delete_error_message)

try:
cache_chain.clear()
except Exception as e:
raise DatasetHandlerError(
"Dataset overwrite failed. See traceback for more information."
) from e
raise DatasetHandlerError(see_traceback_error_message) from e
else:
raise DatasetHandlerError(
f"A dataset already exists at the given path ({dest}). If you want to copy to a new dataset, either specify another path or use overwrite=True."
@@ -2125,7 +2121,7 @@ def ingest_dataframe(
Dataset: New dataset created from the dataframe.
Raises:
Exception: If ``src`` is not a valid pandas dataframe object.
InvalidPandasDataframeError: If ``src`` is not a valid pandas dataframe object.
"""
import pandas as pd
from deeplake.auto.structured.dataframe import DataFrame
@@ -2138,7 +2134,7 @@
)

if not isinstance(src, pd.DataFrame):
raise Exception("Source provided is not a valid pandas dataframe object")
raise InvalidPandasDataframeError()

structured = DataFrame(src, column_params, src_creds, creds_key)

@@ -2244,9 +2240,7 @@ def export_yolo(
)
except DatasetCorruptError as e:
raise DatasetCorruptError(
"The source dataset is corrupted.",
"You can try to fix this by loading the dataset with `reset=True` "
"which will attempt to reset uncommitted HEAD changes and load the previous version.",
dataset_corrupted_error_message,
e.__cause__,
)
else:
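The dataset.py changes above are mostly deduplication: the repeated overwrite/corruption error strings become module-level constants, unused bindings are replaced with `_`, and the generic `Exception` raised for a bad dataframe source becomes `InvalidPandasDataframeError`. A minimal standalone sketch of the resulting pattern (the exception bodies and the `_overwrite` helper here are illustrative stand-ins, not the actual deeplake internals):

```python
# Error messages defined once at module level and reused at every raise site.
allow_delete_error_message = (
    "Dataset overwrite failed. The dataset is marked as allow_delete=false. "
    "To allow overwrite, you must first run `allow_delete = True` on the dataset."
)
see_traceback_error_message = (
    "Dataset overwrite failed. See traceback for more information."
)


class DatasetHandlerError(Exception):
    """Stand-in for deeplake.util.exceptions.DatasetHandlerError."""


class InvalidPandasDataframeError(Exception):
    """Stand-in for the new exception raised when the source is not a DataFrame."""

    def __init__(self):
        super().__init__("Source provided is not a valid pandas dataframe object")


def _overwrite(cache_chain, allow_delete: bool) -> None:
    # Same guard as init/empty/deepcopy in the diff: refuse to clear a dataset
    # that has allow_delete=False, and wrap any clear() failure.
    if not allow_delete:
        raise DatasetHandlerError(allow_delete_error_message)
    try:
        cache_chain.clear()
    except Exception as e:
        raise DatasetHandlerError(see_traceback_error_message) from e
```

Centralizing the strings keeps the three raise sites (`init`, `empty`, `deepcopy`) from drifting apart if the wording changes later.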
6 changes: 3 additions & 3 deletions deeplake/auto/structured/dataframe.py
@@ -3,7 +3,7 @@
from deeplake import Dataset
from deeplake import read, link
from deeplake.htype import HTYPE_SUPPORTED_COMPRESSIONS
from deeplake.util.exceptions import IngestionError
from deeplake.util.exceptions import IngestionError, InvalidPandasDataframeError
from deeplake.util.dataset import sanitize_tensor_name

from collections import defaultdict
@@ -26,14 +26,14 @@ def __init__(self, source, column_params=None, creds=None, creds_key=None):
Raises:
Exception: If source is not a pandas dataframe object.
InvalidPandasDataframeError: If source is not a pandas dataframe object.
"""

import pandas as pd # type: ignore

super().__init__(source)
if not isinstance(self.source, pd.DataFrame):
raise Exception("Source is not a pandas dataframe object.")
raise InvalidPandasDataframeError()

self.creds = creds
self.creds_key = creds_key
1 change: 1 addition & 0 deletions deeplake/auto/tests/test_ingestion.py
@@ -6,6 +6,7 @@
SamePathException,
DatasetHandlerError,
IngestionError,
InvalidPandasDataframeError,
)
import numpy as np
import pytest
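The test module only gains the new import here. A hypothetical test in the same spirit (the test name and the `deeplake.ingest_dataframe(src, dest)` call shape are assumptions, not lines from this PR) would assert the more specific exception instead of a bare `Exception`:

```python
import pytest
import deeplake
from deeplake.util.exceptions import InvalidPandasDataframeError


def test_ingest_dataframe_rejects_non_dataframe(tmp_path):
    # Passing a plain list instead of a pandas DataFrame should now raise the
    # dedicated error rather than a generic Exception.
    with pytest.raises(InvalidPandasDataframeError):
        deeplake.ingest_dataframe([1, 2, 3], str(tmp_path / "ds"))
```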
12 changes: 0 additions & 12 deletions deeplake/client/utils.py
@@ -29,18 +29,6 @@
BEST_RECALL = "best_recall@10"


def remove_username_from_config():
try:
config = {}
with open(REPORTING_CONFIG_FILE_PATH, "r") as f:
config = json.load(f)
config["username"] = "public"
with open(REPORTING_CONFIG_FILE_PATH, "w") as f:
json.dump(config, f)
except (FileNotFoundError, KeyError):
return


def check_response_status(response: requests.Response):
"""Check response status and throw corresponding exception on failure."""
code = response.status_code
13 changes: 6 additions & 7 deletions deeplake/core/chunk/chunk_compressed_chunk.py
@@ -160,13 +160,12 @@ def extend_if_has_space_byte_compression_numpy(
cast = False
sample_nbytes = sample.nbytes
else:
if sample.size:
if not np.can_cast(sample_dtype, chunk_dtype):
raise TensorDtypeMismatchError(
chunk_dtype,
sample_dtype,
self.htype,
)
if sample.size and not np.can_cast(sample_dtype, chunk_dtype):
raise TensorDtypeMismatchError(
chunk_dtype,
sample_dtype,
self.htype,
)
cast = True
sample_nbytes = np.dtype(chunk_dtype).itemsize * sample.size
min_chunk_size = self.min_chunk_size
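The change in this file is purely structural: two nested `if` statements collapse into one `sample.size and not np.can_cast(...)` condition, with no behavioral difference. A small self-contained illustration of that guard (the function name and error type are simplified stand-ins for the chunk code):

```python
import numpy as np


def ensure_castable(sample: np.ndarray, chunk_dtype: np.dtype) -> None:
    # Empty samples are always accepted; non-empty samples must be safely
    # castable to the chunk's dtype, otherwise the write is rejected.
    if sample.size and not np.can_cast(sample.dtype, chunk_dtype):
        raise TypeError(f"cannot safely cast {sample.dtype} to {chunk_dtype}")


ensure_castable(np.array([1, 2, 3], dtype=np.int32), np.dtype(np.int64))  # ok: safe widening
ensure_castable(np.empty((0,), dtype=np.float64), np.dtype(np.int32))     # ok: empty sample
# ensure_castable(np.array([1.5]), np.dtype(np.int32))                    # would raise TypeError
```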
20 changes: 9 additions & 11 deletions deeplake/core/chunk/uncompressed_chunk.py
@@ -48,9 +48,8 @@ def _extend_if_has_space_text(
num_data_bytes = self.num_data_bytes
space_left = min_chunk_size - num_data_bytes
idx = np.searchsorted(csum, space_left)
if not idx and csum[0] > space_left:
if self._data_bytes:
return 0
if not idx and csum[0] > space_left and self._data_bytes:
return 0
num_samples = int(min(len(incoming_samples), idx + 1)) # type: ignore
bts = list(
map(self._text_sample_to_byte_string, incoming_samples[:num_samples])
@@ -131,13 +130,12 @@ def _extend_if_has_space_numpy(
chunk_dtype = self.dtype
samples_dtype = incoming_samples.dtype
if samples_dtype != chunk_dtype:
if size:
if not np.can_cast(samples_dtype, chunk_dtype):
raise TensorDtypeMismatchError(
chunk_dtype,
samples_dtype,
self.htype,
)
if size and not np.can_cast(samples_dtype, chunk_dtype):
raise TensorDtypeMismatchError(
chunk_dtype,
samples_dtype,
self.htype,
)
samples = samples.astype(chunk_dtype)
self._data_bytes += samples.tobytes() # type: ignore
self.register_in_meta_and_headers(
@@ -163,7 +161,7 @@ def _extend_if_has_space_list(
if shape is not None and not self.tensor_meta.is_link:
self.num_dims = self.num_dims or len(shape)
check_sample_shape(shape, self.num_dims)
except Exception as e:
except Exception:
if ignore_errors:
skipped.append(i)
continue
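Two small cleanups in this file: the early return for a full chunk becomes a single `not idx and csum[0] > space_left and self._data_bytes` condition, and the unused `as e` binding is dropped. The `searchsorted` pattern used just below that guard can be illustrated in isolation (the byte counts here are made up; only the indexing logic mirrors the diff):

```python
import numpy as np

# Byte lengths of the incoming text samples and their cumulative totals.
lengths = np.array([120, 300, 250, 500], dtype=np.uint32)
csum = np.cumsum(lengths)            # [120, 420, 670, 1170]

space_left = 600                     # bytes still available in the chunk

# First index whose cumulative size reaches the remaining space; taking
# idx + 1 samples lets the chunk overshoot by at most one sample.
idx = int(np.searchsorted(csum, space_left))
num_samples = min(len(lengths), idx + 1)

print(idx, num_samples)              # 2 3
```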
72 changes: 35 additions & 37 deletions deeplake/core/chunk_engine.py
@@ -838,26 +838,27 @@ def _samples_to_chunks(
incoming_num_samples = len(samples)
enc_ids: List[Optional[str]] = []
enc_count = [0]
if extending:
if self.tensor_meta.htype == "text" and (
self.chunk_class != SampleCompressedChunk
):
lengths = np.zeros(len(samples), dtype=np.uint32)
for i, s in enumerate(samples):
try:
s = s.numpy()
except AttributeError:
pass
if (
extending
and self.tensor_meta.htype == "text"
and (self.chunk_class != SampleCompressedChunk)
):
lengths = np.zeros(len(samples), dtype=np.uint32)
for i, s in enumerate(samples):
try:
s = s.numpy()
except AttributeError:
pass
try:
if s.dtype.name[:3] == "str":
lengths[i] = len(str(s.reshape(())))
except AttributeError:
try:
if s.dtype.name[:3] == "str":
lengths[i] = len(str(s.reshape(())))
except AttributeError:
try:
lengths[i] = s.__len__()
except AttributeError: # None
lengths[i] = 0
except TypeError: # Numpy scalar str
lengths[i] = str(s).__len__()
lengths[i] = s.__len__()
except AttributeError: # None
lengths[i] = 0
except TypeError: # Numpy scalar str
lengths[i] = str(s).__len__()
extra_args = {"lengths": lengths}
current_chunk = start_chunk
updated_chunks: List[Optional[str]] = []
@@ -1039,12 +1040,11 @@ def _handle_tiled_sample(
lengths,
):
sample = samples[0]
if sample.is_first_write:
if register:
if start_chunk_row is not None:
enc.register_samples(1)
else:
enc_count[-1] += 1
if sample.is_first_write and register:
if start_chunk_row is not None:
enc.register_samples(1)
else:
enc_count[-1] += 1
if sample.is_last_write:
tiles[
incoming_num_samples - len(samples) + bool(register) * orig_meta_length
@@ -1775,7 +1775,7 @@ def _update_with_operator(
samples: Union[np.ndarray, Sequence[InputSample], InputSample],
operator: str,
):
"""Update data at `index` with the output of elem-wise operatorion with samples"""
"""Update data at `index` with the output of elem-wise operation with samples"""
try:
if isinstance(samples, deeplake.core.tensor.Tensor):
samples = samples.numpy()
@@ -1891,7 +1891,7 @@ def _get_full_chunk(self, index) -> bool:
"""
threshold = 10

if type(index.values[0].value) == slice:
if isinstance(index.values[0].value, slice):
start = index.values[0].value.start or 0
stop = index.values[0].value.stop or self.num_samples
step = index.values[0].value.step or 1
@@ -2218,11 +2218,10 @@ def load_chunks(
if exception:
raise exception
chunk, chunk_info = future.result()
if chunk:
if _get_nbytes(chunk) <= self.cache.cache_size:
self.cache._insert_in_cache(
self.get_chunk_key_for_id(chunk_info[0]), chunk
)
if chunk and _get_nbytes(chunk) <= self.cache.cache_size:
self.cache._insert_in_cache(
self.get_chunk_key_for_id(chunk_info[0]), chunk
)
yield chunk_info
else:
with ThreadPoolExecutor() as executor:
Expand All @@ -2232,11 +2231,10 @@ def load_chunks(
repeat(storages),
):
chunk, chunk_info = result
if chunk:
if _get_nbytes(chunk) <= self.cache.cache_size:
self.cache._insert_in_cache(
self.get_chunk_key_for_id(chunk_info[0]), chunk
)
if chunk and _get_nbytes(chunk) <= self.cache.cache_size:
self.cache._insert_in_cache(
self.get_chunk_key_for_id(chunk_info[0]), chunk
)
yield chunk_info

def _get_samples(
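Most of the chunk_engine.py diff follows the same flattening pattern (merging nested `if`s, fixing the docstring typo "operatorion", combining the cache-insert condition), plus replacing `type(x) == slice` with the idiomatic `isinstance(x, slice)`. A compact sketch of the slice-handling pattern from `_get_full_chunk` (names simplified; defaults filled in the same way as the diff):

```python
def resolve_slice(value, num_samples: int):
    # isinstance() is preferred over `type(x) == slice`: it reads better and
    # plays nicely with linters, while behaving identically for this check.
    if isinstance(value, slice):
        start = value.start or 0
        stop = value.stop or num_samples
        step = value.step or 1
        return start, stop, step
    return value, value + 1, 1


print(resolve_slice(slice(None, 50, 2), 100))  # (0, 50, 2)
print(resolve_slice(7, 100))                   # (7, 8, 1)
```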

0 comments on commit 47e9f45
