Skip to content

Commit

Permalink
Add test for cleaning up object store between rounds
Browse files Browse the repository at this point in the history
  • Loading branch information
Kevin Yan committed Oct 3, 2024
1 parent cc59747 commit 9a43890
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 13 deletions.
15 changes: 11 additions & 4 deletions deltacat/compute/compactor_v2/private/compaction_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ def _run_hash_and_merge(
previous_compacted_delta_manifest: Optional[Manifest],
compacted_partition: Partition,
) -> List[MergeResult]:
created_object_refs = set()
created_obj_ids = set()
telemetry_time_hb = 0
total_input_records_count = np.int64(0)
total_hb_record_count = np.int64(0)
Expand Down Expand Up @@ -289,7 +289,7 @@ def _run_hash_and_merge(
hb_result.hash_bucket_group_to_obj_id_tuple
):
if object_id_size_tuple:
created_object_refs.add(object_id_size_tuple[0])
created_obj_ids.add(object_id_size_tuple[0])
all_hash_group_idx_to_obj_id[hash_group_index].append(
object_id_size_tuple[0],
)
Expand Down Expand Up @@ -368,8 +368,15 @@ def _run_hash_and_merge(
telemetry_this_round + previous_telemetry
)
if params.num_rounds > 1:
logger.info(f"Detected {len(created_object_refs)} objects to be deleted...")
params.object_store.delete_many(list(created_object_refs))
logger.info(
f"Detected number of rounds to be {params.num_rounds}, "
f"preparing to delete {len(created_obj_ids)} objects from object store..."
)
params.object_store.delete_many(list(created_obj_ids))
else:
logger.info(
f"Detected number of rounds to be {params.num_rounds}, not cleaning up object store..."
)

return merge_results

Expand Down
2 changes: 1 addition & 1 deletion deltacat/io/file_object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def get_many(self, refs: List[Any], *args, **kwargs) -> List[object]:
logger.info(f"The total time taken to read all objects is: {end - start}")
return result

def delete_many(self, refs: List[Any], *args, **kwargs) -> List[object]:
def delete_many(self, refs: List[Any], *args, **kwargs) -> bool:
start = time.monotonic()
num_deleted = 0
for ref in refs:
Expand Down
6 changes: 3 additions & 3 deletions deltacat/io/object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ def get_many(self, refs: List[Any], *args, **kwargs) -> List[object]:
or may not return ordered results.
"""

def delete(self, obj: object, *args, **kwargs) -> bool:
def delete(self, ref: Any, *args, **kwargs) -> bool:
"""
Delete a single object from the object store.
"""
return self.delete_many([obj])
return self.delete_many([ref])

def delete_many(self, objects: List[object], *args, **kwargs) -> bool:
def delete_many(self, refs: List[Any], *args, **kwargs) -> bool:
...

"""
Expand Down
4 changes: 0 additions & 4 deletions deltacat/io/ray_plasma_object_store.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
import logging
import ray
from ray import cloudpickle
from deltacat import logs
from deltacat.io.object_store import IObjectStore
from typing import Any, List
from ray.types import ObjectRef

logger = logs.configure_deltacat_logger(logging.getLogger(__name__))


class RayPlasmaObjectStore(IObjectStore):
"""
Expand Down
2 changes: 1 addition & 1 deletion deltacat/io/s3_object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def get_many(self, refs: List[Any], *args, **kwargs) -> List[object]:
logger.info(f"The total time taken to read all objects is: {end - start}")
return result

def delete_many(self, refs: List[Any], *args, **kwargs) -> List[object]:
def delete_many(self, refs: List[Any], *args, **kwargs) -> bool:
start = time.monotonic()
num_deleted = 0
for ref in refs:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ def test_compact_partition_rebase_multiple_rounds_same_source_and_destination(
)

execute_compaction_result_spy = mocker.spy(ExecutionCompactionResult, "__init__")
object_store_delete_many_spy = mocker.spy(RayPlasmaObjectStore, "delete_many")

# execute
rcf_file_s3_uri = benchmark(compact_partition_func, compact_partition_params)
Expand Down Expand Up @@ -327,4 +328,5 @@ def test_compact_partition_rebase_multiple_rounds_same_source_and_destination(
if assert_compaction_audit:
if not assert_compaction_audit(compactor_version, compaction_audit):
assert False, "Compaction audit assertion failed"
assert object_store_delete_many_spy.call_count, "Object store was never cleaned up!"
return

0 comments on commit 9a43890

Please sign in to comment.