From 7fa63d9fc9f12f35b29f222126f0392324ffe1f7 Mon Sep 17 00:00:00 2001 From: Sanghun Lee Date: Tue, 12 Dec 2023 18:34:34 +0900 Subject: [PATCH] fix: Add client API to get/put bytes type values --- src/etcetra/client.py | 926 +++++++++++++++++++++++++++++------------- 1 file changed, 646 insertions(+), 280 deletions(-) diff --git a/src/etcetra/client.py b/src/etcetra/client.py index 7168cc5..b5cbaad 100644 --- a/src/etcetra/client.py +++ b/src/etcetra/client.py @@ -277,209 +277,12 @@ async def __aexit__(self, exc_type, exc, tb) -> Optional[bool]: return False -class EtcdRequestGenerator: - @classmethod - def put( - cls, key: str, value: Optional[str], - lease: Optional[int] = None, - ignore_value: bool = False, - ignore_lease: bool = False, - encoding='utf-8', - ): - # TODO: Implement prev_kv response - return rpc_pb2.PutRequest( - key=key.encode(encoding), - value=value.encode(encoding) if value else None, - lease=lease, ignore_lease=ignore_lease, ignore_value=ignore_value, - ) - - @classmethod - def get( - cls, key: str, - limit: Optional[str] = None, - max_create_revision: Optional[str] = None, - max_mod_revision: Optional[str] = None, - min_create_revision: Optional[str] = None, - min_mod_revision: Optional[str] = None, - revision: Optional[str] = None, - serializable: bool = True, - sort_order: RangeRequestSortOrder = RangeRequestSortOrder.NONE, - sort_target: RangeRequestSortTarget = RangeRequestSortTarget.KEY, - encoding='utf-8', - ): - return rpc_pb2.RangeRequest( - key=key.encode(encoding), - limit=limit, - max_create_revision=max_create_revision, - max_mod_revision=max_mod_revision, - min_create_revision=min_create_revision, - min_mod_revision=min_mod_revision, - revision=revision, - serializable=serializable, - sort_order=sort_order.value, - sort_target=sort_target.value, - ) - - @classmethod - def get_range( - cls, key: str, range_end: str, - limit: Optional[str] = None, - max_create_revision: Optional[str] = None, - max_mod_revision: Optional[str] = None, - min_create_revision: Optional[str] = None, - min_mod_revision: Optional[str] = None, - revision: Optional[str] = None, - serializable: bool = True, - sort_order: RangeRequestSortOrder = RangeRequestSortOrder.NONE, - sort_target: RangeRequestSortTarget = RangeRequestSortTarget.KEY, - encoding='utf-8', - ): - return rpc_pb2.RangeRequest( - key=key.encode(encoding), - range_end=range_end.encode(encoding), - limit=limit, - max_create_revision=max_create_revision, - max_mod_revision=max_mod_revision, - min_create_revision=min_create_revision, - min_mod_revision=min_mod_revision, - revision=revision, - serializable=serializable, - sort_order=sort_order.value, - sort_target=sort_target.value, - ) - - @classmethod - def get_prefix( - cls, key: str, - limit: Optional[str] = None, - max_create_revision: Optional[str] = None, - max_mod_revision: Optional[str] = None, - min_create_revision: Optional[str] = None, - min_mod_revision: Optional[str] = None, - revision: Optional[str] = None, - serializable: bool = True, - sort_order: RangeRequestSortOrder = RangeRequestSortOrder.NONE, - sort_target: RangeRequestSortTarget = RangeRequestSortTarget.KEY, - encoding='utf-8', - ): - encoded_key = key.encode(encoding) - if key[-1] == '/' and len(key) >= 2: - range_end = encoded_key[:-2] + bytes([encoded_key[-2] + 1]) + b'/' - else: - range_end = encoded_key[:-1] + bytes([encoded_key[-1] + 1]) - return rpc_pb2.RangeRequest( - key=encoded_key, - range_end=range_end, - limit=limit, - max_create_revision=max_create_revision, - max_mod_revision=max_mod_revision, - min_create_revision=min_create_revision, - min_mod_revision=min_mod_revision, - revision=revision, - serializable=serializable, - sort_order=sort_order.value, - sort_target=sort_target.value, - ) - - @classmethod - def delete( - cls, key: str, - encoding='utf-8', - ): - # TODO: Implement prev_kv response - return rpc_pb2.DeleteRangeRequest( - key=key.encode(encoding), - ) - - @classmethod - def delete_range( - cls, key: str, range_end: str, - encoding='utf-8', - ): - # TODO: Implement prev_kv response - return rpc_pb2.DeleteRangeRequest( - key=key.encode(encoding), - range_end=range_end.encode(encoding), - ) - - @classmethod - def delete_prefix( - cls, key: str, - encoding='utf-8', - ): - # TODO: Implement prev_kv response - encoded_key = key.encode(encoding) - if key[-1] == '/' and len(key) >= 2: - range_end = encoded_key[:-2] + bytes([encoded_key[-2] + 1]) + b'/' - else: - range_end = encoded_key[:-1] + bytes([encoded_key[-1] + 1]) - return rpc_pb2.DeleteRangeRequest( - key=encoded_key, - range_end=range_end, - ) - - @classmethod - def keys_range( - cls, key: str, range_end: str, - limit: Optional[str] = None, - max_create_revision: Optional[str] = None, - max_mod_revision: Optional[str] = None, - min_create_revision: Optional[str] = None, - min_mod_revision: Optional[str] = None, - revision: Optional[str] = None, - serializable: bool = True, - sort_order: RangeRequestSortOrder = RangeRequestSortOrder.NONE, - sort_target: RangeRequestSortTarget = RangeRequestSortTarget.KEY, - encoding='utf-8', - ): - return rpc_pb2.RangeRequest( - key=key.encode(encoding), - range_end=range_end.encode(encoding), - keys_only=True, - limit=limit, - max_create_revision=max_create_revision, - max_mod_revision=max_mod_revision, - min_create_revision=min_create_revision, - min_mod_revision=min_mod_revision, - revision=revision, - serializable=serializable, - sort_order=sort_order.value, - sort_target=sort_target.value, - ) - - @classmethod - def keys_prefix( - cls, key: str, - limit: Optional[str] = None, - max_create_revision: Optional[str] = None, - max_mod_revision: Optional[str] = None, - min_create_revision: Optional[str] = None, - min_mod_revision: Optional[str] = None, - revision: Optional[str] = None, - serializable: bool = True, - sort_order: RangeRequestSortOrder = RangeRequestSortOrder.NONE, - sort_target: RangeRequestSortTarget = RangeRequestSortTarget.KEY, - encoding='utf-8', - ): - encoded_key = key.encode(encoding) - if key[-1] == '/' and len(key) >= 2: - range_end = encoded_key[:-2] + bytes([encoded_key[-2] + 1]) + b'/' - else: - range_end = encoded_key[:-1] + bytes([encoded_key[-1] + 1]) - return rpc_pb2.RangeRequest( - key=encoded_key, - range_end=range_end, - keys_only=True, - limit=limit, - max_create_revision=max_create_revision, - max_mod_revision=max_mod_revision, - min_create_revision=min_create_revision, - min_mod_revision=min_mod_revision, - revision=revision, - serializable=serializable, - sort_order=sort_order.value, - sort_target=sort_target.value, - ) +def parse_range_end(key: str, encoding: str) -> bytes: + encoded_key = key.encode(encoding) + if key[-1] == '/' and len(key) >= 2: + return encoded_key[:-2] + bytes([encoded_key[-2] + 1]) + b'/' + else: + return encoded_key[:-1] + bytes([encoded_key[-1] + 1]) class EtcdCommunicator: @@ -522,15 +325,80 @@ async def _authenticate(self, username: str, password: str) -> str: ) return response.token + + @grpc_exception_handler + async def put_bytes( + self, + key: str, + value: Optional[bytes], + lease: Optional[int] = None, + prev_kv: bool = False, + encoding: Optional[str] = None, + *, + ignore_lease: bool = False, + ) -> Optional[str]: + """ + Puts given key into the key-value store. + The value should be `bytes`. + + Parameters + --------- + key + The key to put into the key-value store + value + The value to associate with the key in the key-value store. + If the value is `None`, `ignore_value` sets to `True`. + lease + The lease ID to associate with the key in the key-value store. Defaults to `None`. + `None` lease indicates no lease. + prev_kv + If this value is `True`, gets the previous value before changing it and returns it. + Defaults to `False`. + encoding + Character encoding type to encode/decode key. + Defaults to `utf-8`. + ignore_lease + Update the key without changing its current lease. + + Returns + ------- + value: Optional[str] + If `prev_kv` is set to `True` and previous value exists, returns previous value. + Otherwise it will just return `None`. + """ + if encoding is None: + encoding = self.encoding + ignore_value = value is None + if ignore_value: + value = b'' + stub = rpc_pb2_grpc.KVStub(self.channel) + response = await stub.Put( + rpc_pb2.PutRequest( + key=key.encode(encoding), + value=value, + lease=lease, + ignore_lease=ignore_lease, + ignore_value=ignore_value, + ), + ) + if prev_kv and response.prev_kv is not None and response.prev_kv.value is not None: + return response.prev_kv.value + return None + @grpc_exception_handler async def put( - self, key: str, value: Optional[str], + self, + key: str, + value: Optional[str], lease: Optional[int] = None, prev_kv: bool = False, encoding: Optional[str] = None, + *, + ignore_lease: bool = False, ) -> Optional[str]: """ Puts given key into the key-value store. + The value should be `str`. Parameters --------- @@ -538,6 +406,7 @@ async def put( The key to put into the key-value store value The value to associate with the key in the key-value store. + If the value is `None`, `ignore_value` sets to `True`. lease The lease ID to associate with the key in the key-value store. Defaults to `None`. `None` lease indicates no lease. @@ -549,47 +418,304 @@ async def put( If this value is `None`, this method will use default encoding which is set when creating this instance. Defaults to `utf-8`. + ignore_lease + Update the key without changing its current lease. + + Returns + ------- + value: Optional[str] + If `prev_kv` is set to `True` and previous value exists, returns previous value. + Otherwise it will just return `None`. + """ + if encoding is None: + encoding = self.encoding + ignore_value = value is None + if ignore_value: + value = '' + stub = rpc_pb2_grpc.KVStub(self.channel) + response = await stub.Put( + rpc_pb2.PutRequest( + key=key.encode(encoding), + value=value.encode(encoding) if value else None, + lease=lease, + ignore_lease=ignore_lease, + ignore_value=ignore_value, + ), + ) + if prev_kv and response.prev_kv is not None and response.prev_kv.value is not None: + return response.prev_kv.value.decode(encoding) + return None + + + @grpc_exception_handler + async def get_bytes( + self, + key: str, + max_create_revision: Optional[str] = None, + max_mod_revision: Optional[str] = None, + min_create_revision: Optional[str] = None, + min_mod_revision: Optional[str] = None, + revision: Optional[str] = None, + encoding: Optional[str] = None, + *, + limit: str | None = None, + serializable: bool = True, + sort_order: RangeRequestSortOrder = RangeRequestSortOrder.NONE, + sort_target: RangeRequestSortTarget = RangeRequestSortTarget.KEY, + ) -> Optional[str]: + """ + Gets `bytes` value associated with given key from the key-value store. + + Parameters + --------- + key + The key to look up. + max_create_revision + The upper bound for returned key create revisions; + all keys with greater create revisions will be filtered away. + max_mod_revision + The upper bound for returned key mod revisions; + all keys with greater mod revisions will be filtered away. + min_create_revision + The lower bound for returned key create revisions; + all keys with lesser create revisions will be filtered away. + min_mod_revision + The lower bound for returned key mod revisions; + all keys with lesser mod revisions will be filtered away. + revision + The point-in-time of the key-value store to use for the range. + If revision is less or equal to zero, the range is over the newest key-value store. + If the revision has been compacted, ErrCompacted is returned as a response. + encoding + Character encoding type to encode/decode key. + Defaults to `utf-8`. + limit + The maximum number of keys returned for the request. + When limit is set to "0", it is treated as no limit. + serializable + Sets the range request to use serializable member-local reads. + sort_order + The ordering for sorted requests. + sort_target + The key-value field to sort. + + Returns + ------- + value: Optional[str] + Returns value if given key exists. Otherwise it will return `None`. + """ + if encoding is None: + encoding = self.encoding + stub = rpc_pb2_grpc.KVStub(self.channel) + response = await stub.Range( + rpc_pb2.RangeRequest( + key=key.encode(encoding), + limit=limit, + max_create_revision=max_create_revision, + max_mod_revision=max_mod_revision, + min_create_revision=min_create_revision, + min_mod_revision=min_mod_revision, + revision=revision, + serializable=serializable, + sort_order=sort_order.value, + sort_target=sort_target.value, + ), + ) + if len(response.kvs) > 0: + return response.kvs[0].value + else: + return None + + + @grpc_exception_handler + async def get( + self, + key: str, + max_create_revision: Optional[str] = None, + max_mod_revision: Optional[str] = None, + min_create_revision: Optional[str] = None, + min_mod_revision: Optional[str] = None, + revision: Optional[str] = None, + encoding: Optional[str] = None, + *, + limit: str | None = None, + serializable: bool = True, + sort_order: RangeRequestSortOrder = RangeRequestSortOrder.NONE, + sort_target: RangeRequestSortTarget = RangeRequestSortTarget.KEY, + ) -> Optional[str]: + """ + Gets value associated with given key from the key-value store. + + Parameters + --------- + key + The key to look up. + max_create_revision + The upper bound for returned key create revisions; + all keys with greater create revisions will be filtered away. + max_mod_revision + The upper bound for returned key mod revisions; + all keys with greater mod revisions will be filtered away. + min_create_revision + The lower bound for returned key create revisions; + all keys with lesser create revisions will be filtered away. + min_mod_revision + The lower bound for returned key mod revisions; + all keys with lesser mod revisions will be filtered away. + revision + The point-in-time of the key-value store to use for the range. + If revision is less or equal to zero, the range is over the newest key-value store. + If the revision has been compacted, ErrCompacted is returned as a response. + encoding + Character encoding type to encode/decode all types of byte based strings. + If this value is `None`, this method will use default encoding which is set when creating + this instance. + Defaults to `utf-8`. + limit + The maximum number of keys returned for the request. + When limit is set to "0", it is treated as no limit. + serializable + Sets the range request to use serializable member-local reads. + sort_order + The ordering for sorted requests. + sort_target + The key-value field to sort. + + Returns + ------- + value: Optional[str] + Returns value if given key exists. Otherwise it will return `None`. + """ + if encoding is None: + encoding = self.encoding + stub = rpc_pb2_grpc.KVStub(self.channel) + response = await stub.Range( + rpc_pb2.RangeRequest( + key=key.encode(encoding), + limit=limit, + max_create_revision=max_create_revision, + max_mod_revision=max_mod_revision, + min_create_revision=min_create_revision, + min_mod_revision=min_mod_revision, + revision=revision, + serializable=serializable, + sort_order=sort_order.value, + sort_target=sort_target.value, + ), + ) + if len(response.kvs) > 0: + return response.kvs[0].value.decode(encoding) + else: + return None + + @grpc_exception_handler + async def get_prefix_bytes( + self, + key: str, + max_create_revision: Optional[str] = None, + max_mod_revision: Optional[str] = None, + min_create_revision: Optional[str] = None, + min_mod_revision: Optional[str] = None, + revision: Optional[str] = None, + sort_order: RangeRequestSortOrder = RangeRequestSortOrder.NONE, + sort_target: RangeRequestSortTarget = RangeRequestSortTarget.KEY, + encoding: Optional[str] = None, + *, + limit: str | None = None, + serializable: bool = True, + ) -> Mapping[str, str]: + """ + Gets the key-value in dictionary from the key-value store with given key prefix. + i.e. `get_prefix('/sorna/local')` call looks up all keys which has `/sorna/local` prefix. + + Parameters + --------- + key + The key prefix to look up. + max_create_revision + The upper bound for returned key create revisions; + all keys with greater create revisions will be filtered away. + max_mod_revision + The upper bound for returned key mod revisions; + all keys with greater mod revisions will be filtered away. + min_create_revision + The lower bound for returned key create revisions; + all keys with lesser create revisions will be filtered away. + min_mod_revision + The lower bound for returned key mod revisions; + all keys with lesser mod revisions will be filtered away. + revision + The point-in-time of the key-value store to use for the range. + If revision is less or equal to zero, the range is over the newest key-value store. + If the revision has been compacted, ErrCompacted is returned as a response. + sort_order + Sort order. Defaults to `RangeRequestSortOrder.NONE`. + sort_target + Sort target. Defaults to `RangeRequestSortTarget.KEY`. + encoding + Character encoding type to encode/decode all types of byte based strings. + If this value is `None`, this method will use default encoding which is set when creating + this instance. + Defaults to `utf-8`. + limit + The maximum number of keys returned for the request. + When limit is set to "0", it is treated as no limit. + serializable + Sets the range request to use serializable member-local reads. Returns ------- - value: Optional[str] - If `prev_kv` is set to `True` and previous value exists, returns previous value. - Otherwise it will just return `None`. + value: Mapping[str, str] + Returns dictionary with all key-values which matches given key prefix. """ if encoding is None: encoding = self.encoding - ignore_value = value is None - if ignore_value: - value = '' stub = rpc_pb2_grpc.KVStub(self.channel) - response = await stub.Put( - EtcdRequestGenerator.put( - key, value, - lease=lease, ignore_value=ignore_value, - encoding=encoding, + + response = await stub.Range( + rpc_pb2.RangeRequest( + key=key.encode(encoding), + range_end=parse_range_end(key, encoding), + limit=limit, + max_create_revision=max_create_revision, + max_mod_revision=max_mod_revision, + min_create_revision=min_create_revision, + min_mod_revision=min_mod_revision, + revision=revision, + serializable=serializable, + sort_order=sort_order.value, + sort_target=sort_target.value, ), ) - if prev_kv and response.prev_kv is not None and response.prev_kv.value is not None: - return response.prev_kv.value.decode(encoding) - return None + ret: MutableMapping[str, str] = OrderedDict() + for x in response.kvs: + ret[x.key.decode(encoding)] = x.value + return ret @grpc_exception_handler - async def get( + async def get_prefix( self, key: str, max_create_revision: Optional[str] = None, max_mod_revision: Optional[str] = None, min_create_revision: Optional[str] = None, min_mod_revision: Optional[str] = None, revision: Optional[str] = None, + sort_order: RangeRequestSortOrder = RangeRequestSortOrder.NONE, + sort_target: RangeRequestSortTarget = RangeRequestSortTarget.KEY, encoding: Optional[str] = None, - ) -> Optional[str]: + *, + limit: str | None = None, + serializable: bool = True, + ) -> Mapping[str, str]: """ - Gets value associated with given key from the key-value store. + Gets the key-value in dictionary from the key-value store with given key prefix. + The scalar values of the dictionary will be parsed into `str`. + i.e. `get_prefix('/sorna/local')` call looks up all keys which has `/sorna/local` prefix. Parameters --------- key - The key to look up. + The key prefix to look up. max_create_revision The upper bound for returned key create revisions; all keys with greater create revisions will be filtered away. @@ -606,42 +732,56 @@ async def get( The point-in-time of the key-value store to use for the range. If revision is less or equal to zero, the range is over the newest key-value store. If the revision has been compacted, ErrCompacted is returned as a response. + sort_order + Sort order. Defaults to `RangeRequestSortOrder.NONE`. + sort_target + Sort target. Defaults to `RangeRequestSortTarget.KEY`. encoding Character encoding type to encode/decode all types of byte based strings. If this value is `None`, this method will use default encoding which is set when creating this instance. Defaults to `utf-8`. + limit + The maximum number of keys returned for the request. + When limit is set to "0", it is treated as no limit. + serializable + Sets the range request to use serializable member-local reads. Returns ------- - value: Optional[str] - Returns value if given key exists. Otherwise it will return `None`. + value: Mapping[str, str] + Returns dictionary with all key-values which matches given key prefix. """ if encoding is None: encoding = self.encoding stub = rpc_pb2_grpc.KVStub(self.channel) + response = await stub.Range( - EtcdRequestGenerator.get( - key, + rpc_pb2.RangeRequest( + key=key.encode(encoding), + range_end=parse_range_end(key, encoding), + limit=limit, max_create_revision=max_create_revision, max_mod_revision=max_mod_revision, min_create_revision=min_create_revision, min_mod_revision=min_mod_revision, revision=revision, - serializable=True, - sort_order=RangeRequestSortOrder.NONE, - sort_target=RangeRequestSortTarget.KEY, - encoding=encoding, + serializable=serializable, + sort_order=sort_order.value, + sort_target=sort_target.value, ), ) - if len(response.kvs) > 0: - return response.kvs[0].value.decode(encoding) - else: - return None + ret: MutableMapping[str, str] = OrderedDict() + for x in response.kvs: + ret[x.key.decode(encoding)] = x.value.decode(encoding) + return ret @grpc_exception_handler - async def get_prefix( - self, key: str, + async def get_range_bytes( + self, + key: str, + range_end: str, + limit: Optional[str] = None, max_create_revision: Optional[str] = None, max_mod_revision: Optional[str] = None, min_create_revision: Optional[str] = None, @@ -650,15 +790,18 @@ async def get_prefix( sort_order: RangeRequestSortOrder = RangeRequestSortOrder.NONE, sort_target: RangeRequestSortTarget = RangeRequestSortTarget.KEY, encoding: Optional[str] = None, + *, + serializable: bool = True, ) -> Mapping[str, str]: """ - Gets the key-value in dictionary from the key-value store with given key prefix. - i.e. `get_prefix('/sorna/local')` call looks up all keys which has `/sorna/local` prefix. + Gets the key-value in dictionary from the key-value store with keys in [key, range_end) range. Parameters --------- key - The key prefix to look up. + Start of key range. + range_end + End of key range. max_create_revision The upper bound for returned key create revisions; all keys with greater create revisions will be filtered away. @@ -684,6 +827,8 @@ async def get_prefix( If this value is `None`, this method will use default encoding which is set when creating this instance. Defaults to `utf-8`. + serializable + Sets the range request to use serializable member-local reads. Returns ------- @@ -694,22 +839,23 @@ async def get_prefix( encoding = self.encoding stub = rpc_pb2_grpc.KVStub(self.channel) response = await stub.Range( - EtcdRequestGenerator.get_prefix( - key, + rpc_pb2.RangeRequest( + key=key.encode(encoding), + range_end=range_end.encode(encoding), + limit=limit, max_create_revision=max_create_revision, max_mod_revision=max_mod_revision, min_create_revision=min_create_revision, min_mod_revision=min_mod_revision, revision=revision, - serializable=True, - sort_order=sort_order, - sort_target=sort_target, - encoding=encoding, + serializable=serializable, + sort_order=sort_order.value, + sort_target=sort_target.value, ), ) ret: MutableMapping[str, str] = OrderedDict() for x in response.kvs: - ret[x.key.decode(encoding)] = x.value.decode(encoding) + ret[x.key.decode(encoding)] = x.value return ret @grpc_exception_handler @@ -724,9 +870,12 @@ async def get_range( sort_order: RangeRequestSortOrder = RangeRequestSortOrder.NONE, sort_target: RangeRequestSortTarget = RangeRequestSortTarget.KEY, encoding: Optional[str] = None, + *, + serializable: bool = True, ) -> Mapping[str, str]: """ Gets the key-value in dictionary from the key-value store with keys in [key, range_end) range. + The scalar values of the dictionary will be parsed into `str`. Parameters --------- @@ -759,6 +908,8 @@ async def get_range( If this value is `None`, this method will use default encoding which is set when creating this instance. Defaults to `utf-8`. + serializable + Sets the range request to use serializable member-local reads. Returns ------- @@ -769,18 +920,18 @@ async def get_range( encoding = self.encoding stub = rpc_pb2_grpc.KVStub(self.channel) response = await stub.Range( - EtcdRequestGenerator.get_range( - key, range_end, + rpc_pb2.RangeRequest( + key=key.encode(encoding), + range_end=range_end.encode(encoding), limit=limit, max_create_revision=max_create_revision, max_mod_revision=max_mod_revision, min_create_revision=min_create_revision, min_mod_revision=min_mod_revision, revision=revision, - serializable=True, - sort_order=sort_order, - sort_target=sort_target, - encoding=encoding, + serializable=serializable, + sort_order=sort_order.value, + sort_target=sort_target.value, ), ) ret: MutableMapping[str, str] = OrderedDict() @@ -788,10 +939,52 @@ async def get_range( ret[x.key.decode(encoding)] = x.value.decode(encoding) return ret + @grpc_exception_handler + async def delete_bytes( + self, + key: str, + prev_kv: bool = False, + encoding: Optional[str] = None, + ) -> Optional[str]: + """ + Deletes the given key the key-value store. + A delete request increments the revision of the key-value store + and generates a delete event in the event history for every deleted key. + + Parameters + --------- + key + The key to delete. + prev_kv + If this value set to `True` and previous value with associated target key exists, + this method will return previous value. + encoding + Character encoding type to encode/decode key. + Defaults to `utf-8`. + + Returns + ------ + value: Optional[str] + If `prev_kv` is set to `True` and previous value exists, returns previous value. + Otherwise it will just return `None`. + """ + if encoding is None: + encoding = self.encoding + stub = rpc_pb2_grpc.KVStub(self.channel) + response = await stub.DeleteRange( + rpc_pb2.DeleteRangeRequest(key=key.encode(encoding)), + ) + if prev_kv and len(response.prev_kvs) > 0: + return response.prev_kvs[0].value + else: + return None + @grpc_exception_handler async def delete( - self, key: str, - prev_kv: bool = False, encoding: Optional[str] = None, + self, + key: str, + prev_kv: bool = False, + encoding: Optional[str] = None, ) -> Optional[str]: """ Deletes the given key the key-value store. @@ -820,12 +1013,56 @@ async def delete( if encoding is None: encoding = self.encoding stub = rpc_pb2_grpc.KVStub(self.channel) - response = await stub.DeleteRange(EtcdRequestGenerator.delete(key, encoding=encoding)) + response = await stub.DeleteRange( + rpc_pb2.DeleteRangeRequest(key=key.encode(encoding)), + ) if prev_kv and len(response.prev_kvs) > 0: return response.prev_kvs[0].value.decode(encoding) else: return None + @grpc_exception_handler + async def delete_prefix_bytes( + self, key: str, + prev_kv: bool = False, encoding: Optional[str] = None, + ) -> Optional[List[Optional[str]]]: + """ + Deletes keys with given prefix and its associated values from the key-value store. + A delete request increments the revision of the key-value store + and generates a delete event in the event history for every deleted key. + + Parameters + --------- + key + The key prefix to delete. + prev_kv + If this value set to `True` and previous value with associated target key exists, + this method will return previous value. + encoding + Character encoding type to encode/decode key. + Defaults to `utf-8`. + + Returns + ------ + values: Optional[List[Optional[str]]] + If `prev_kv` is set to `True` and previous value exists, returns previous value. + Otherwise it will just return `None`. + """ + if encoding is None: + encoding = self.encoding + stub = rpc_pb2_grpc.KVStub(self.channel) + response = await stub.DeleteRange(rpc_pb2.DeleteRangeRequest( + key=key.encode(encoding), + range_end=parse_range_end(key, encoding), + )) + if prev_kv: + return [ + x.value.decode(encoding) if x.value is not None else None + for x in response.prev_kvs + ] + else: + return None + @grpc_exception_handler async def delete_prefix( self, key: str, @@ -858,7 +1095,10 @@ async def delete_prefix( if encoding is None: encoding = self.encoding stub = rpc_pb2_grpc.KVStub(self.channel) - response = await stub.DeleteRange(EtcdRequestGenerator.delete_prefix(key, encoding=encoding)) + response = await stub.DeleteRange(rpc_pb2.DeleteRangeRequest( + key=key.encode(encoding), + range_end=parse_range_end(key, encoding), + )) if prev_kv: return [ x.value.decode(encoding) if x.value is not None else None @@ -867,6 +1107,49 @@ async def delete_prefix( else: return None + @grpc_exception_handler + async def delete_range_bytes( + self, key: str, range_end: str, + prev_kv: bool = False, encoding: Optional[str] = None, + ) -> Optional[List[Optional[str]]]: + """ + Deletes the given range from the key-value store. + A delete request increments the revision of the key-value store + and generates a delete event in the event history for every deleted key. + + Parameters + --------- + key + Start of key range. + range_end + End of key range. + prev_kv + If this value set to `True` and previous value with associated target key exists, + this method will return previous value. + encoding + Character encoding type to encode/decode key. + Defaults to `utf-8`. + + Returns + ------ + values: Optional[str] + If `prev_kv` is set to `True` and previous value exists, returns previous value. + Otherwise it will just return `None`. + """ + if encoding is None: + encoding = self.encoding + stub = rpc_pb2_grpc.KVStub(self.channel) + response = await stub.DeleteRange( + rpc_pb2.DeleteRangeRequest( + key=key.encode(encoding), + range_end=range_end.encode(encoding), + ), + ) + if prev_kv: + return [x.value for x in response.prev_kvs] + else: + return None + @grpc_exception_handler async def delete_range( self, key: str, range_end: str, @@ -902,7 +1185,11 @@ async def delete_range( encoding = self.encoding stub = rpc_pb2_grpc.KVStub(self.channel) response = await stub.DeleteRange( - EtcdRequestGenerator.delete_range(key, range_end, encoding=encoding)) + rpc_pb2.DeleteRangeRequest( + key=key.encode(encoding), + range_end=range_end.encode(encoding), + ), + ) if prev_kv: return [ x.value.decode(encoding) if x.value is not None else None @@ -922,6 +1209,9 @@ async def keys_prefix( sort_order: RangeRequestSortOrder = RangeRequestSortOrder.NONE, sort_target: RangeRequestSortTarget = RangeRequestSortTarget.KEY, encoding: Optional[str] = None, + *, + limit: str | None = None, + serializable: bool = True, ) -> List[str]: """ Gets the keys which has given prefix from the key-value store. @@ -955,6 +1245,11 @@ async def keys_prefix( If this value is `None`, this method will use default encoding which is set when creating this instance. Defaults to `utf-8`. + limit + The maximum number of keys returned for the request. + When limit is set to "0", it is treated as no limit. + serializable + Sets the range request to use serializable member-local reads. Returns ------- @@ -965,17 +1260,19 @@ async def keys_prefix( encoding = self.encoding stub = rpc_pb2_grpc.KVStub(self.channel) response = await stub.Range( - EtcdRequestGenerator.keys_prefix( - key, + rpc_pb2.RangeRequest( + key=key.encode(encoding), + range_end=parse_range_end(key, encoding), + keys_only=True, + limit=limit, max_create_revision=max_create_revision, max_mod_revision=max_mod_revision, min_create_revision=min_create_revision, min_mod_revision=min_mod_revision, revision=revision, - serializable=True, - sort_order=sort_order, - sort_target=sort_target, - encoding=encoding, + serializable=serializable, + sort_order=sort_order.value, + sort_target=sort_target.value, ), ) return [x.key.decode(encoding) for x in response.kvs] @@ -1038,8 +1335,10 @@ async def keys_range( encoding = self.encoding stub = rpc_pb2_grpc.KVStub(self.channel) response = await stub.Range( - EtcdRequestGenerator.keys_range( - key, range_end, + rpc_pb2.RangeRequest( + key=key.encode(encoding), + range_end=range_end.encode(encoding), + keys_only=True, limit=limit, max_create_revision=max_create_revision, max_mod_revision=max_mod_revision, @@ -1047,9 +1346,8 @@ async def keys_range( min_mod_revision=min_mod_revision, revision=revision, serializable=serializable, - sort_order=sort_order, - sort_target=sort_target, - encoding=encoding, + sort_order=sort_order.value, + sort_target=sort_target.value, ), ) return [x.key.decode(encoding) for x in response.kvs] @@ -1441,7 +1739,7 @@ def __init__(self, channel: Channel, encoding: str = 'utf-8') -> None: self.success = EtcdTransactionAction(encoding=encoding) self.failure = EtcdTransactionAction(encoding=encoding) - async def execute( + async def execute_bytes( self, compares: List[rpc_pb2.Compare], # type: ignore encoding: Optional[str] = None, @@ -1467,6 +1765,50 @@ async def execute( stub = rpc_pb2_grpc.KVStub(self.channel) result = await stub.Txn(txn_request) + ret: TxnReturnValues = [] + for response in result.responses: + response_type = response.WhichOneof('response') + if response_type == 'response_put': + ret.append({ + "revision": response.response_put.header.revision, + }) + elif response_type == 'response_range': + ret.append({ + x.key.decode(encoding): x.value + for x in response.response_range.kvs + }) + elif response_type == 'response_delete_range': + ret.append(None) # TODO: Handle delete response + else: + ret.append(None) + return TxnReturnType(ret, result.succeeded) + + async def execute( + self, + compares: List[rpc_pb2.Compare], # type: ignore + encoding: Optional[str] = None, + ) -> TxnReturnType: + """ + Executes Txn and returns results whose values are parsed in `str`. + """ + if encoding is None: + encoding = self.encoding + txn_request = rpc_pb2.TxnRequest() + txn_request.compare.extend(compares) + for key in ('success', 'failure'): + requests: List[TransactionRequest] = getattr(self, key).requests + for request in requests: + rop = rpc_pb2.RequestOp() + if isinstance(request, PutRequestType): + rop.request_put.CopyFrom(request) + elif isinstance(request, RangeRequestType): + rop.request_range.CopyFrom(request) + elif isinstance(request, DeleteRangeRequestType): + rop.request_delete_range.CopyFrom(request) + getattr(txn_request, key).extend([rop]) + stub = rpc_pb2_grpc.KVStub(self.channel) + result = await stub.Txn(txn_request) + ret: TxnReturnValues = [] for response in result.responses: response_type = response.WhichOneof('response') @@ -1502,6 +1844,29 @@ def __init__(self, encoding: str = 'utf-8') -> None: def add_callback(self, cb: Optional[Callable[[bool], None]]) -> None: self.callback = cb + def put_bytes( + self, + key: str, + value: Optional[bytes], + lease: Optional[int] = None, + ignore_value: bool = False, + ignore_lease: bool = False, + encoding: Optional[str] = None, + ) -> None: + """ + Puts given key into the key-value store. + The value should be `bytes`. + """ + if encoding is None: + encoding = self.encoding + self.requests.append( + rpc_pb2.PutRequest( + key=key.encode(encoding), + value=value, + lease=lease, ignore_lease=ignore_lease, ignore_value=ignore_value, + ), + ) + def put( self, key: str, value: Optional[str], lease: Optional[int] = None, @@ -1511,19 +1876,21 @@ def put( ) -> None: """ Puts given key into the key-value store. + The value should be `str`. """ if encoding is None: encoding = self.encoding self.requests.append( - EtcdRequestGenerator.put( - key, value, - lease=lease, ignore_lease=ignore_lease, - ignore_value=ignore_value, encoding=encoding, + rpc_pb2.PutRequest( + key=key.encode(encoding), + value=value.encode(encoding) if value else None, + lease=lease, ignore_lease=ignore_lease, ignore_value=ignore_value, ), ) def get( - self, key: str, + self, + key: str, limit: Optional[str] = None, max_create_revision: Optional[str] = None, max_mod_revision: Optional[str] = None, @@ -1541,8 +1908,8 @@ def get( if encoding is None: encoding = self.encoding self.requests.append( - EtcdRequestGenerator.get( - key, + rpc_pb2.RangeRequest( + key=key.encode(encoding), limit=limit, max_create_revision=max_create_revision, max_mod_revision=max_mod_revision, @@ -1550,9 +1917,8 @@ def get( min_mod_revision=min_mod_revision, revision=revision, serializable=serializable, - sort_order=sort_order, - sort_target=sort_target, - encoding=encoding, + sort_order=sort_order.value, + sort_target=sort_target.value, ), ) @@ -1564,7 +1930,7 @@ def delete(self, key: str, encoding: Optional[str] = None) -> None: """ if encoding is None: encoding = self.encoding - self.requests.append(EtcdRequestGenerator.delete(key, encoding=encoding)) + self.requests.append(rpc_pb2.DeleteRangeRequest(key=key.encode(encoding))) class EtcdLockManager: