From dc9cc19730f1c0da3cc06bc25a2c29ef0f082123 Mon Sep 17 00:00:00 2001 From: gentlegiantJGC Date: Sun, 3 Nov 2024 10:39:19 +0000 Subject: [PATCH] Added thread speed tests --- tests/test_db.py | 111 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 111 insertions(+) diff --git a/tests/test_db.py b/tests/test_db.py index dc06342..b87be2d 100644 --- a/tests/test_db.py +++ b/tests/test_db.py @@ -6,6 +6,8 @@ import glob import os import weakref +import time +from concurrent.futures import ThreadPoolExecutor num_keys = [struct.pack(" None: db.close() + def test_thread_write(self) -> None: + count = 10_000 + data = [ + [ + struct.pack(">i", i + j) + for j in range(count) + ] + for i in range(0, count * 10, count) + ] + m = {k: k for k in sum(data, [])} + + with TemporaryDirectory() as path: + t1 = -time.time() + db = LevelDB(path, True) + for l in data: + for v in l: + db.put(v, v) + db.close() + t1 += time.time() + + db = LevelDB(path) + m1 = dict(db) + db.close() + self.assertEqual(m, m1) + + with TemporaryDirectory() as path: + t2 = -time.time() + db = LevelDB(path, True) + + def add(values) -> None: + for v in values: + db.put(v, v) + + with ThreadPoolExecutor() as executor: + executor.map(add, data) + + db.close() + t2 += time.time() + + db = LevelDB(path) + m2 = dict(db) + db.close() + self.assertEqual(m, m2) + + self.assertLess(t2, t1) + + def test_thread_read(self) -> None: + count = 10_000 + data = [ + [ + struct.pack(">i", i + j) + for j in range(count) + ] + for i in range(0, count * 10, count) + ] + m = {k: k for k in sum(data, [])} + + with TemporaryDirectory() as path: + db = LevelDB(path, True) + + def add(values) -> None: + for v in values: + db.put(v, v) + + with ThreadPoolExecutor() as executor: + executor.map(add, data) + + db.close() + + # Validate the database + db = LevelDB(path) + m2 = dict(db) + db.close() + self.assertEqual(m, m2) + + # # Read serial + db = LevelDB(path) + m1 = {} + t1 = -time.time() + it = db.create_iterator() + it.seek_to_first() + while it.valid(): + m1[it.key()] = it.value() + it.next() + t1 += time.time() + db.close() + self.assertEqual(m, m1) + + # Read parallel + db = LevelDB(path) + m2 = {} + t2 = -time.time() + + def read(values) -> None: + it = db.create_iterator() + it.seek(values[0]) + for _ in range(count): + m2[it.key()] = it.value() + it.next() + + with ThreadPoolExecutor() as executor: + executor.map(read, data) + + t2 += time.time() + db.close() + self.assertEqual(m, m2) + + self.assertLess(t2, t1) + print(t1, t2) if __name__ == "__main__": unittest.main()