diff --git a/README.rst b/README.rst index 6c0d2a3..4f652b6 100644 --- a/README.rst +++ b/README.rst @@ -1,8 +1,7 @@ BorgStore ========= -A key/value store implementation in Python, supporting multiple backends, -data redundancy and distribution. +A key/value store implementation in Python, supporting multiple backends. Keys ---- @@ -103,15 +102,6 @@ Currently, these storage backends are implemented: - Rclone - access any of the 100s of cloud providers [rclone](https://rclone.org/) supports - (more might come in future) -MStore ------- - -API of MStore is very similar to Store, but instead of directly using one backend -only (like Store does), it uses multiple Stores internally to implement: - -- redundancy (keep same data at multiple places) -- distribution (keep different data at multiple places) - Scalability ----------- diff --git a/src/borgstore/mstore.py b/src/borgstore/mstore.py deleted file mode 100644 index ebded16..0000000 --- a/src/borgstore/mstore.py +++ /dev/null @@ -1,204 +0,0 @@ -""" -Multi-Store Key/Value Implementation. - -Distributed: MStore can store into multiple stores (e.g. different directories on different disks, on diff. servers) - with different sizes. -Redundant: The same mechanism also implements simple redundancy (like storing same item N times). - -Similar to a hashtable, we use 256 buckets within the MStore and create a map mapping the bucket number to the Store(s) -it resides on. When storing an item, the key part of the name (namespace/key) is assumed to be a hex hash value and -the first 2 hex digits determine which bucket the data goes into (and thus: which Store(s) it is stored into). - -Examples: -MStore gets a list of stores and a list of related bucket counts. Bucket numbers are calculated modulo 256, so if -the total bucket count is more than 256 (like 512, 768, ...), stuff will get stored multiple times (usually into -different stores). 
-MStore([store0], [256]) - simplest configuration: store everything into store0 -MStore([st0, st1], [192, 64]) - JBOD-like: store 3/4 into st0 (bucket 0..191), 1/4 into st1 (bucket 192..255) -MStore([st0, st1], [256, 256]) - Mirror: store each item into st0 **and** into st1 (both have buckets 0..255) -MStore([st0, st1, st2], [256, 256, 256]) - store each item into st0, st1 **and** st2 -""" - -from collections import defaultdict -from typing import Iterator, Optional - -from .utils.nesting import split_key -from .store import Store, ItemInfo, ObjectNotFound - - -def create_bucket_map(buckets: list[int]) -> dict[int, list[int]]: - """ - use a list of bucket counts (of the stores) and create a lookup dictionary: - bucket (0..255) -> list of store indexes that store this bucket - """ - total = sum(buckets) - if total < 256: - raise ValueError("each of the 256 possible values must have at least one corresponding bucket") - if total % 256 != 0: - raise ValueError("all 256 values should be covered equally with buckets") - map = defaultdict(list) - base = 0 - for store_index, bucket_count in enumerate(buckets): - for offset in range(bucket_count): - bucket = (base + offset) % 256 - map[bucket].append(store_index) - base += bucket_count - return map - - -def lookup_stores(map: dict, bucket: int) -> list[int]: - """lookup the store index(es) for a specific bucket""" - if not isinstance(bucket, int): - raise TypeError("bucket must be an integer") - if bucket < 0 or bucket > 255: - raise ValueError("bucket must be between 0 and 255") - return map[bucket] - - -class MStore: - def __init__(self, stores: list[Store], buckets: list[int], kinds: Optional[dict] = None): - if not len(stores): - raise ValueError("stores list must not be empty") - if len(stores) != len(buckets): - raise ValueError("stores list and buckets count list must have same length") - self.stores = stores - self.buckets = buckets - self.all_stores = list(range(len(self.stores))) - self.map = create_bucket_map(buckets) - # kinds = prefix -> kind, kind can be "hex-hash", "generic". 
- kinds = kinds if kinds else {} - # we accept kinds as a dict, but we rather want a list of (prefix, kind) tuples, longest prefix first: - self.kinds = [entry for entry in sorted(kinds.items(), key=lambda item: len(item[0]), reverse=True)] - - def __repr__(self): - return f"" - - def create(self) -> None: - for store in self.stores: - store.create() - - def destroy(self) -> None: - for store in self.stores: - store.destroy() - - def __enter__(self): - self.open() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.close() - return False - - def open(self) -> None: - for store in self.stores: - store.open() - - def close(self) -> None: - for store in self.stores: - store.close() - - def _get_kind(self, name): - """get kind of store from configuration depending on namespace""" - for prefix, kind in self.kinds: - if name.startswith(prefix): - return kind - return "generic" # "generic" is the default, if no prefix matched - - def _find_stores(self, name: str, mode: str = "r") -> list[int]: - kind = self._get_kind(name) - if kind == "hex-hash": - key = split_key(name)[1] # we do not care for the namespace part here - key_binary = bytes.fromhex(key) # and assume key is a good hash, represented as a hex str - bucket = key_binary[0] # use first 8bits of key to determine bucket (int) - w_stores = self.map[bucket] # list of store indexes (for writing) - if mode not in ["r", "w", "d", "m"]: - raise ValueError("mode must be either 'r', 'w', 'd' or 'm'.") - if mode == "w": - # for writing just return the stores currently configured - return w_stores - else: # mode == "r" or "d" or "m" - # for reading, return the stores currently configured *first*, - # but also add all other stores after these, so items can be found - # there while we redistribute them. - # for deleting, guess we also want to try deleting an item from all stores. - # for moving, guess we want to try to move an item in all stores. - fallback_r_stores = [idx for idx in self.all_stores if idx not in w_stores] - return w_stores + fallback_r_stores - elif kind == "generic": - # for generic storage, we store to ALL stores. - # usually this is important and small stuff, like configs, keys, ... - return self.all_stores - else: - raise NotImplementedError(f"kind '{kind}' is not implemented.") - - def info(self, name: str, *, deleted=False) -> ItemInfo: - for store_idx in self._find_stores(name, mode="r"): - store = self.stores[store_idx] - try: - return store.info(name, deleted=deleted) - except ObjectNotFound: - pass # TODO: we expected the key to be there, but it was not. fix that by storing it there. - else: - raise ObjectNotFound(name) # didn't find it in any store - - def load(self, name: str, *, size=None, offset=0, deleted=False) -> bytes: - for store_idx in self._find_stores(name, mode="r"): - store = self.stores[store_idx] - try: - return store.load(name, size=size, offset=offset, deleted=deleted) - except ObjectNotFound: - pass # TODO: we expected the key to be there, but it was not. fix that by storing it there. 
- else: - raise ObjectNotFound(name) # didn't find it in any store - - def store(self, name: str, value: bytes) -> None: - for store_idx in self._find_stores(name, mode="w"): - store = self.stores[store_idx] - store.store(name, value) - - def delete(self, name: str, *, deleted=False) -> None: - for store_idx in self._find_stores(name, mode="d"): - store = self.stores[store_idx] - try: - store.delete(name, deleted=deleted) - except ObjectNotFound: - pass # ignore it if it is already gone - - def move( - self, - name: str, - new_name: Optional[str] = None, - *, - delete: bool = False, - undelete: bool = False, - change_level: bool = False, - deleted: bool = False, - ) -> None: - for store_idx in self._find_stores(name, mode="m"): - store = self.stores[store_idx] - try: - if delete: - # use case: keep name, but soft "delete" the item - store.move(name, delete=True) - elif undelete: - # use case: keep name, undelete a previously soft "deleted" item - store.move(name, undelete=True) - elif change_level: - # use case: keep name, changing to another nesting level - store.move(name, change_level=True, deleted=deleted) - else: - # generic use (be careful!) - if not new_name: - raise ValueError("generic move needs new_name to be given.") - store.move(name, new_name, deleted=deleted) - except ObjectNotFound: - pass # ignore it, if it is not present in this store - - def list(self, name: str, deleted: bool = False) -> Iterator[ItemInfo]: - # when using multiple stores, the results yielded might be only partially sorted. - seen = set() - for store in self.stores: - for item_info in store.list(name, deleted=deleted): - if item_info.name not in seen: - yield item_info - seen.add(item_info.name) diff --git a/tests/test_mstore.py b/tests/test_mstore.py deleted file mode 100644 index 20d3257..0000000 --- a/tests/test_mstore.py +++ /dev/null @@ -1,236 +0,0 @@ -""" -Testing for high-level MStore API. -""" - -import pytest - -from . 
import key, lkey, list_store_names, list_store_names_sorted - -from borgstore.backends.errors import ObjectNotFound -from borgstore.store import Store -from borgstore.mstore import create_bucket_map, lookup_stores, MStore - - -@pytest.mark.parametrize("buckets", [[], [0], [42], [300], [256, 23], [23, 256]]) -def test_bucket_map_invalid(buckets): - with pytest.raises(ValueError): - create_bucket_map(buckets) # does not cover 256 buckets exactly N times - - -@pytest.mark.parametrize( - "buckets, n_stores", - [ - ([256], 1), # single store having all buckets ("single disk") - ([128, 128], 1), # 2 stores each having half of the buckets ("raid0") - ([256, 256], 2), # 2 stores each having all the buckets ("raid1") - ([128, 128, 128, 128], 2), # 4 stores each having half of the buckets ("raid10") - ([256, 128, 128], 2), # one big store mirroring 2 smaller ones - ([200, 56], 1), # store 0 is bigger than store 1 ("jbod") - ([256, 256, 256], 3), # 3 stores each having all buckets ("3-disk mirror") - ], -) -def test_bucket_map_valid(buckets, n_stores): - # n_stores means an item is stored in n stores (1 = standard, 2+ = with redundancy) - map = create_bucket_map(buckets) - for bucket in range(256): - assert bucket in map # we want to map ALL the 256 buckets - stores = map[bucket] - assert len(stores) == n_stores # each bucket shall exist in N stores - assert len(set(stores)) == n_stores # each bucket shall exist in N *different* stores - - -@pytest.mark.parametrize( - "buckets,key,store", - [ - ([256], 0, [0]), - ([256], 255, [0]), - ([128, 128], 0, [0]), - ([128, 128], 127, [0]), - ([128, 128], 128, [1]), - ([128, 128], 255, [1]), - ([256, 256], 0, [0, 1]), - ([256, 256], 127, [0, 1]), - ([256, 256], 128, [0, 1]), - ([256, 256], 255, [0, 1]), - ], -) -def test_lookup_bucket(buckets, key, store): - map = create_bucket_map(buckets) - assert lookup_stores(map, key) == store - - -@pytest.fixture() -def mstore_jbod_created(tmp_path): - stores = [Store(url=f"file://{tmp_path}0"), Store(url=f"file://{tmp_path}1")] - mstore = MStore(stores=stores, buckets=[192, 64], kinds={"": "hex-hash"}) - mstore.create() - try: - yield mstore - finally: - mstore.destroy() - - -@pytest.fixture() -def mstore_mirror_created(tmp_path): - stores = [Store(url=f"file://{tmp_path}0"), Store(url=f"file://{tmp_path}1")] - mstore = MStore(stores=stores, buckets=[256, 256], kinds={"": "hex-hash"}) - mstore.create() - try: - yield mstore - finally: - mstore.destroy() - - -def fill_storage(store: MStore, count: int, *, start: int = 0) -> None: - for i in range(start, start + count, 1): - k, v = lkey(i), str(i).encode() - store.store(k, v) - - -def read_storage(store: MStore, count: int, *, start: int = 0) -> None: - # can we still read all data? 
- for i in range(start, start + count, 1): - k, v = lkey(i), str(i).encode() - assert store.load(k) == v - - -def test_list(mstore_mirror_created): - with mstore_mirror_created as mstore: - fill_storage(mstore, 1024) - # there must be no duplication of keys from the mirror mstore - assert list_store_names(mstore, "") == sorted([lkey(i) for i in range(1024)]) - - -def test_list(mstore_jbod_created): - with mstore_jbod_created as mstore: - fill_storage(mstore, 1024) - # check if we get all expected keys from the jbod mstore - assert list_store_names(mstore, "") == sorted([lkey(i) for i in range(1024)]) - - -def test_load_store_list_distribution(mstore_jbod_created): - with mstore_jbod_created as mstore: - fill_storage(mstore, 1024) - # check if all data is readable and as expected: - for i in range(1024): - k, v = lkey(i), str(i).encode() - assert mstore.load(k) == v - # check if data ended up in the stores according to the ratio configured in mstore_jbod (192 : 64) - keys_mstore = list_store_names(mstore, "") - keys_store0 = list_store_names(mstore.stores[0], "") - keys_store1 = list_store_names(mstore.stores[1], "") - assert len(keys_mstore) == len(set(keys_mstore)) == 1024 - assert len(keys_store0) == len(set(keys_store0)) == 768 - assert len(keys_store1) == len(set(keys_store1)) == 256 - - -def test_load_store_list_redundancy(mstore_mirror_created): - with mstore_mirror_created as mstore: - fill_storage(mstore, 1024) - # delete stuff from store 0: - for i in 0, 23, 42, 1001: - mstore.stores[0].delete(lkey(i)) - # check if it is really gone: - for i in 0, 23, 42, 1001: - with pytest.raises(ObjectNotFound): - mstore.stores[0].load(lkey(i)) - # delete other stuff from store 1: - for i in 123, 456, 789: - mstore.stores[1].delete(lkey(i)) - # check if it is really gone: - for i in 123, 456, 789: - with pytest.raises(ObjectNotFound): - mstore.stores[1].load(lkey(i)) - # check if we can still read everything from the mirror: - for i in range(1024): - k, v = lkey(i), str(i).encode() - assert mstore.load(k) == v - # also check if list still works ok: - assert list_store_names_sorted(mstore, "") == sorted([lkey(i) for i in range(1024)]) - # now delete some values also from the other side of the mirror: - for i in 0, 23, 42, 1001: - mstore.stores[1].delete(lkey(i)) - for i in 123, 456, 789: - mstore.stores[0].delete(lkey(i)) - # now the mirror is expected to be partially corrupted at these places: - for i in 0, 23, 42, 1001, 123, 456, 789: - with pytest.raises(ObjectNotFound): - mstore.load(lkey(i)) - # list is expected to miss some elements: - assert list_store_names(mstore, "") == sorted( - [lkey(i) for i in range(1024) if i not in [0, 23, 42, 1001, 123, 456, 789]] - ) - - -def test_move_delete_undelete(mstore_mirror_created): - k0, v0 = key(0), b"value0" - k1, v1 = key(1), b"value1" - with mstore_mirror_created as mstore: - mstore.store(k0, v0) - mstore.store(k1, v1) - # delete - mstore.move(k0, delete=True) # soft delete - assert list_store_names(mstore, "", deleted=False) == [k1] - assert list_store_names(mstore, "", deleted=True) == [k0, k1] - # undelete - mstore.move(k0, undelete=True) # undelete previously soft deleted item - assert list_store_names(mstore, "", deleted=False) == [k0, k1] - assert list_store_names(mstore, "", deleted=True) == [k0, k1] - - -def test_namespaces(mstore_jbod_created): - with mstore_jbod_created as mstore: - mstore.kinds = [("config/", "generic"), ("data/", "hex-hash")] - mstore.store("config/main", b"some config") - mstore.store("data/0000", b"value_00") - 
mstore.store("data/bf00", b"value_bf") - mstore.store("data/c000", b"value_c0") - mstore.store("data/ff00", b"value_ff") - # now let's check where stuff ended up being stored. - st0, st1 = mstore.stores - # hex-hash kind of data should be spread into buckets according to its hash: - assert st0.load("data/0000") == b"value_00" - assert st0.load("data/bf00") == b"value_bf" - with pytest.raises(ObjectNotFound): - st0.load("data/c000") - with pytest.raises(ObjectNotFound): - st0.load("data/ff00") - with pytest.raises(ObjectNotFound): - st1.load("data/0000") - with pytest.raises(ObjectNotFound): - st1.load("data/bf00") - assert st1.load("data/c000") == b"value_c0" - assert st1.load("data/ff00") == b"value_ff" - # generic kind config should be mirrored to all stores: - assert st0.load("config/main") == b"some config" - assert st1.load("config/main") == b"some config" - - -def test_reduce_prepare(tmp_path): - # assume we want to stop using a store, then: - # - we don't want to write new data to it - # - we want to be able to read all data from the mstore at all times - # - # test setup: we have 3 stores with data distributed over them: - entries = 1024 - stores = [Store(url=f"file://{tmp_path}0"), Store(url=f"file://{tmp_path}1"), Store(url=f"file://{tmp_path}2")] - mstore = MStore(stores=stores, buckets=[128, 64, 64], kinds={"": "hex-hash"}) - mstore.create() - with mstore: - fill_storage(mstore, entries) - read_storage(mstore, entries) - assert len(list_store_names(mstore.stores[0], "")) == 512 - assert len(list_store_names(mstore.stores[1], "")) == 256 - assert len(list_store_names(mstore.stores[2], "")) == 256 - # test: still have the 3 stores available, but bucket count 0 in store 2 means no new data will go into it: - stores = [Store(url=f"file://{tmp_path}0"), Store(url=f"file://{tmp_path}1"), Store(url=f"file://{tmp_path}2")] - mstore = MStore(stores=stores, buckets=[128, 128, 0], kinds={"": "hex-hash"}) - with mstore: - read_storage(mstore, entries) - # store new stuff into the mstore: - fill_storage(mstore, entries, start=entries) - read_storage(mstore, entries * 2) - assert len(list_store_names(mstore.stores[0], "")) == 512 + 512 - assert len(list_store_names(mstore.stores[1], "")) == 256 + 512 - assert len(list_store_names(mstore.stores[2], "")) == 256 # no new data was written to store 2 - mstore.destroy()