Truncate the table if the size exceeds more than 2GB
raghumdani committed Jan 14, 2025
1 parent 3d9ae37 commit 2ed1a10
Showing 2 changed files with 48 additions and 3 deletions.
10 changes: 7 additions & 3 deletions deltacat/compute/compactor_v2/utils/primary_key_index.py
@@ -108,9 +108,10 @@ def _optimized_group_record_batches_by_hash_bucket(
     record_batches = []
     result_len = 0
     for record_batch in table_batches:
-        current_bytes += record_batch.nbytes
-        record_batches.append(record_batch)
-        if current_bytes >= MAX_SIZE_OF_RECORD_BATCH_IN_GIB:
+        if (
+            record_batches
+            and current_bytes + record_batch.nbytes >= MAX_SIZE_OF_RECORD_BATCH_IN_GIB
+        ):
             logger.info(
                 f"Total number of record batches without exceeding {MAX_SIZE_OF_RECORD_BATCH_IN_GIB} "
                 f"is {len(record_batches)} and size {current_bytes}"
@@ -128,6 +128,9 @@ def _optimized_group_record_batches_by_hash_bucket(
             current_bytes = 0
             record_batches.clear()
 
+        current_bytes += record_batch.nbytes
+        record_batches.append(record_batch)
+
     if record_batches:
         appended_len, append_latency = timed_invocation(
             _append_table_by_hash_bucket,
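The reordered loop above flushes the accumulated group before appending the batch that would push it past MAX_SIZE_OF_RECORD_BATCH_IN_GIB; the previous version appended first and only checked afterwards, so a single group could grow beyond the cap. Below is a minimal standalone sketch of the same flush-before-append pattern. The helper name, its return value, and the 2 GiB cap value are assumptions for illustration only; the real function appends each flushed group by hash bucket rather than returning tables.

import pyarrow as pa

# Assumed cap value; the diff references the constant by name only.
MAX_SIZE_OF_RECORD_BATCH_IN_GIB = 2 * 1024**3  # 2 GiB in bytes


def group_batches_under_cap(table: pa.Table) -> list:
    """Illustrative sketch: regroup a table's record batches so that each
    group's combined nbytes stays under the cap, flushing the group *before*
    appending the batch that would cross it (the order used in the diff above).
    """
    groups = []
    record_batches = []
    current_bytes = 0
    for record_batch in table.to_batches():
        if (
            record_batches
            and current_bytes + record_batch.nbytes >= MAX_SIZE_OF_RECORD_BATCH_IN_GIB
        ):
            # Flush what has accumulated so far; this group stays under the cap.
            groups.append(pa.Table.from_batches(record_batches))
            current_bytes = 0
            record_batches.clear()

        current_bytes += record_batch.nbytes
        record_batches.append(record_batch)

    if record_batches:
        groups.append(pa.Table.from_batches(record_batches))
    return groups

The record_batches guard also avoids flushing an empty group when the very first batch is already larger than the cap; such a batch is emitted as its own group.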
@@ -0,0 +1,41 @@
import pyarrow as pa
from deltacat.compute.compactor_v2.utils.primary_key_index import (
    group_by_pk_hash_bucket,
)


class TestGroupByPkHashBucket:
    def test_sanity(self):
        record = pa.array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
        pk = pa.array(["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"])
        record_batch = pa.RecordBatch.from_arrays([record, pk], names=["record", "pk"])
        table = pa.Table.from_batches([record_batch])
        grouped_array = group_by_pk_hash_bucket(table, 3, ["pk"])

        assert len(grouped_array) == 3
        total_records = 0
        for arr in grouped_array:
            if arr is not None:
                total_records += len(arr[1])

        assert total_records == len(table)

    def test_when_record_batches_exceed_int_max_size(self):
        record = pa.array(["12bytestring" * 90_000_000])
        record_batch = pa.RecordBatch.from_arrays([record], names=["pk"])
        table = pa.Table.from_batches([record_batch, record_batch])

        grouped_array = group_by_pk_hash_bucket(table, 3, ["pk"])

        assert len(grouped_array) == 3
        assert len(grouped_array[2].to_batches()) == 2  # two record batches preserved

    def test_when_record_batches_less_than_int_max_size(self):
        record = pa.array(["12bytestring" * 90_000])
        record_batch = pa.RecordBatch.from_arrays([record], names=["pk"])
        table = pa.Table.from_batches([record_batch, record_batch])

        grouped_array = group_by_pk_hash_bucket(table, 3, ["pk"])

        assert len(grouped_array) == 3
        assert len(grouped_array[1].to_batches()) == 1  # truncated to one record batch
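For context on the sizes used in the two threshold tests: each "12bytestring" literal is 12 bytes, so the large test builds two record batches of roughly 1.08 GB of string data each, which together cross a 2 GiB cap, while the small test stays megabytes below it. The test names presumably refer to pyarrow's 32-bit string offsets, which limit a single string array chunk to about 2 GiB of character data. A quick check of the arithmetic, assuming MAX_SIZE_OF_RECORD_BATCH_IN_GIB is the 2 GiB cap named in the commit title (its actual value is not shown in this diff):

# Back-of-the-envelope sizes behind the two threshold tests above.
CAP_BYTES = 2 * 1024**3                          # assumed cap: 2,147,483,648 bytes

large_batch = len("12bytestring") * 90_000_000   # 1,080,000,000 bytes of string data per batch
small_batch = len("12bytestring") * 90_000       # 1,080,000 bytes of string data per batch

print(2 * large_batch >= CAP_BYTES)  # True  -> the two large batches stay as separate record batches
print(2 * small_batch >= CAP_BYTES)  # False -> the two small batches can be combined into one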
