Skip to content
This repository has been archived by the owner on Jan 9, 2024. It is now read-only.

Chunk data to put more than 25 elements at once in DynamoDB #460

Merged
merged 2 commits into from
Jan 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 24 additions & 18 deletions RAGchain/utils/linker/dynamo_linker.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,25 +128,31 @@ def flush_db(self):

def put_json(self, ids: List[Union[UUID, str]], json_data_list: List[dict]):
    """
    Store one JSON document per id in the DynamoDB table.

    ``batch_write_item`` accepts at most 25 write requests per call, so the
    input is split into chunks of 25 and each chunk is written separately.
    Requests that DynamoDB reports back under ``UnprocessedItems`` are
    resubmitted with exponential backoff instead of being silently dropped.

    :param ids: Keys of the items to write; each one is stored as ``str(id)``.
    :param json_data_list: Payloads, positionally matched with ``ids``.
    :raises AssertionError: If ``ids`` and ``json_data_list`` differ in length.
    :raises RuntimeError: If some requests are still unprocessed after all retries.
    """
    import time  # local import so this method does not rely on a file-level import

    if len(ids) != len(json_data_list):
        # Raised explicitly (same exception type as the former ``assert``) so
        # the check is not stripped when Python runs with ``-O``.
        raise AssertionError("ids and json_data_list must have the same length")

    max_attempts = 8
    for id_chunk, data_chunk in zip(self.chunk(ids, 25), self.chunk(json_data_list, 25)):
        request_items = {
            self.table_name: [
                {
                    'PutRequest': {
                        'Item': {
                            'id': str(_id),
                            'data': json_data
                        }
                    }
                }
                for _id, json_data in zip(id_chunk, data_chunk)
            ]
        }
        attempt = 0
        while True:
            response = self.dynamodb.batch_write_item(RequestItems=request_items)
            # ``UnprocessedItems`` has the same shape as ``RequestItems`` and can
            # be passed straight back. ``or {}`` tolerates a handle that returns
            # nothing (e.g. a test double).
            request_items = (response or {}).get('UnprocessedItems')
            if not request_items:
                break
            attempt += 1
            if attempt >= max_attempts:
                raise RuntimeError(
                    f"DynamoDB left items unprocessed after {max_attempts} attempts"
                )
            time.sleep(min(0.05 * 2 ** (attempt - 1), 2.0))

def delete_json(self, ids: List[Union[UUID, str]]):
    """
    Delete the items stored under the given ids from the DynamoDB table.

    ``batch_write_item`` accepts at most 25 write requests per call, so the
    ids are split into chunks of 25 and each chunk is sent separately.
    Requests that DynamoDB reports back under ``UnprocessedItems`` are
    resubmitted with exponential backoff instead of being silently dropped.
    An empty ``ids`` list results in no request at all.

    :param ids: Keys of the items to delete; each one is sent as ``str(id)``.
    :raises RuntimeError: If some requests are still unprocessed after all retries.
    """
    import time  # local import so this method does not rely on a file-level import

    max_attempts = 8
    for id_chunk in self.chunk(ids, 25):
        request_items = {
            self.table_name: [
                {
                    'DeleteRequest': {
                        'Key': {
                            'id': str(_id)
                        }
                    }
                }
                for _id in id_chunk
            ]
        }
        attempt = 0
        while True:
            response = self.dynamodb.batch_write_item(RequestItems=request_items)
            # ``UnprocessedItems`` has the same shape as ``RequestItems`` and can
            # be passed straight back. ``or {}`` tolerates a handle that returns
            # nothing (e.g. a test double).
            request_items = (response or {}).get('UnprocessedItems')
            if not request_items:
                break
            attempt += 1
            if attempt >= max_attempts:
                raise RuntimeError(
                    f"DynamoDB left items unprocessed after {max_attempts} attempts"
                )
            time.sleep(min(0.05 * 2 ** (attempt - 1), 2.0))

@staticmethod
def chunk(lst, n):
    """
    Yield consecutive slices of ``lst``, each holding at most ``n`` elements.

    The final slice is shorter when ``len(lst)`` is not a multiple of ``n``;
    an empty ``lst`` yields nothing.

    :param lst: A sliceable sequence that supports ``len()``.
    :param n: Maximum number of elements per slice; must be positive.
    :raises ValueError: If ``n`` is not positive (raised on first iteration,
        since this is a generator).
    """
    if n <= 0:
        # A negative step would make ``range`` empty and silently drop every
        # element; fail loudly instead, as ``n == 0`` already did.
        raise ValueError("n must be a positive integer")
    for start in range(0, len(lst), n):
        yield lst[start:start + n]
11 changes: 11 additions & 0 deletions tests/RAGchain/utils/linker/test_base_linker.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
# Expected payloads for the long test: the first three origins twice in a row,
# then a missing entry (None), then the second origin once more.
LONG_DB_ORIGIN = [
    None if origin_index is None else TEST_DB_ORIGIN[origin_index]
    for origin_index in (0, 1, 2, 0, 1, 2, None, 1)
]

# 26 ids/payloads: one more than the 25-request chunk size the DynamoDB linker
# uses per batch_write_item call, so a second batch is required.
LONG_26_TEST_IDS = [uuid4() for _ in range(26)]
LONG_26_DB_ORIGIN = [TEST_DB_ORIGIN[0]] * 26


def test_singleton_same_child():
with pytest.raises(SingletonCreationError) as e:
Expand Down Expand Up @@ -159,3 +162,11 @@ def delete_test(linker):
linker.delete_json(['test_id2', 'test_id4'])
new_data = linker.get_json(test_id_list)
assert new_data == [db_origin_list[0], None, db_origin_list[2], None]


def long_26_test(linker):
    """
    Round-trip the 26-element fixtures through ``linker``.

    Puts ``LONG_26_DB_ORIGIN`` under ``LONG_26_TEST_IDS``, checks that it reads
    back unchanged, deletes every id, and then checks that reading the same ids
    emits ``NoIdWarning`` and returns ``None`` for each of them.
    """
    linker.put_json(LONG_26_TEST_IDS, LONG_26_DB_ORIGIN)
    assert linker.get_json(LONG_26_TEST_IDS) == LONG_26_DB_ORIGIN
    linker.delete_json(LONG_26_TEST_IDS)
    # The captured warning record was never inspected, so it is not bound.
    with pytest.warns(NoIdWarning):
        assert linker.get_json(LONG_26_TEST_IDS) == [None] * len(LONG_26_TEST_IDS)
4 changes: 4 additions & 0 deletions tests/RAGchain/utils/linker/test_dynamo_linker.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,7 @@ def test_delete(dynamo_db):

def test_long(dynamo_db):
    """Run the shared ``long_test`` scenario against the ``dynamo_db`` fixture."""
    test_base_linker.long_test(dynamo_db)


def test_long_26(dynamo_db):
    """Run the shared ``long_26_test`` scenario against the ``dynamo_db`` fixture."""
    test_base_linker.long_26_test(dynamo_db)
4 changes: 4 additions & 0 deletions tests/RAGchain/utils/linker/test_json_linker.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,7 @@ def test_delete(json_linker):

def test_long(json_linker):
    """Run the shared ``long_test`` scenario against the ``json_linker`` fixture."""
    test_base_linker.long_test(json_linker)


def test_long_26(json_linker):
    """Run the shared ``long_26_test`` scenario against the ``json_linker`` fixture."""
    test_base_linker.long_26_test(json_linker)
4 changes: 4 additions & 0 deletions tests/RAGchain/utils/linker/test_redis_linker.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,7 @@ def test_delete(redis_db):

def test_long(redis_db):
    """Run the shared ``long_test`` scenario against the ``redis_db`` fixture."""
    test_base_linker.long_test(redis_db)


def test_long_26(redis_db):
    """Run the shared ``long_26_test`` scenario against the ``redis_db`` fixture."""
    test_base_linker.long_26_test(redis_db)