From 40b62f31b59378a15fa3d1aa7c214069b7e52b99 Mon Sep 17 00:00:00 2001 From: Moe Jafari Date: Mon, 30 Sep 2024 14:23:35 +0330 Subject: [PATCH] feat: Add delta compression support --- centrifuge/client.py | 24 +++ centrifuge/codecs.py | 26 ++- centrifuge/fossil.py | 282 ++++++++++++++++++++++++++++++ centrifuge/protocol/client_pb2.py | 152 ++++++++-------- centrifuge/types.py | 2 + client.proto | 3 + example.py | 1 + tests/test_fossil.py | 180 +++++++++++++++++++ 8 files changed, 593 insertions(+), 77 deletions(-) create mode 100644 centrifuge/fossil.py create mode 100644 tests/test_fossil.py diff --git a/centrifuge/client.py b/centrifuge/client.py index 409fed2..3c70f8d 100644 --- a/centrifuge/client.py +++ b/centrifuge/client.py @@ -15,6 +15,7 @@ Union, List, Callable, + Literal, ) import websockets @@ -201,6 +202,7 @@ def new_subscription( positioned: bool = False, recoverable: bool = False, join_leave: bool = False, + delta: Literal["fossil", ""] = "", ) -> "Subscription": """Creates new subscription to channel. If subscription already exists then DuplicateSubscriptionError exception will be raised. @@ -221,6 +223,7 @@ def new_subscription( positioned=positioned, recoverable=recoverable, join_leave=join_leave, + delta=delta, ) self._subs[channel] = sub return sub @@ -782,6 +785,9 @@ def _construct_subscribe_command(self, sub: "Subscription", cmd_id: int) -> Dict subscribe["epoch"] = sub._epoch subscribe["offset"] = sub._offset + if sub._delta: + subscribe["delta"] = sub._delta + command = { "id": cmd_id, "subscribe": subscribe, @@ -1311,6 +1317,7 @@ def _publication_from_proto(self, pub: Any) -> Publication: data=self._decode_data(pub.get("data")), info=client_info, tags=pub.get("tags", {}), + delta=pub.get("delta", False), ) @@ -1352,6 +1359,7 @@ def _initialize( positioned: bool = False, recoverable: bool = False, join_leave: bool = False, + delta: Literal["fossil", ""] = "", ) -> None: """Initializes Subscription instance. Note: use Client.new_subscription method to create new subscriptions in your app. @@ -1376,6 +1384,12 @@ def _initialize( self._recover: bool = False self._offset: int = 0 self._epoch: str = "" + self._prev_data: Optional[Any] = None + + if delta not in {"fossil", ""}: + raise CentrifugeError("unsupported delta format") + self._delta = delta + self._delta_negotiated: bool = False @classmethod def _create_instance(cls, *args: Any, **kwargs: Any) -> "Subscription": @@ -1552,6 +1566,8 @@ async def _move_subscribed(self, subscribe: Dict[str, Any]) -> None: lambda: asyncio.ensure_future(self._refresh(), loop=self._client._loop), ) + self._delta_negotiated = subscribe.get("delta", False) + await on_subscribed_handler( SubscribedContext( channel=self.channel, @@ -1603,6 +1619,14 @@ async def _resubscribe(self) -> None: async def _process_publication(self, pub: Any) -> None: publication = self._client._publication_from_proto(pub) + + if self._delta and self._delta_negotiated: + new_data, prev_data = self._client._codec.apply_delta_if_needed( + self._prev_data, publication + ) + publication.data = new_data + self._prev_data = prev_data + await self.events.on_publication(PublicationContext(pub=publication)) if publication.offset > 0: self._offset = publication.offset diff --git a/centrifuge/codecs.py b/centrifuge/codecs.py index f2739fe..5be6b15 100644 --- a/centrifuge/codecs.py +++ b/centrifuge/codecs.py @@ -1,10 +1,14 @@ import json -from typing import Union, Iterable, AsyncIterable +from typing import Union, Iterable, AsyncIterable, TYPE_CHECKING from google.protobuf.json_format import MessageToDict, ParseDict from websockets.typing import Data import centrifuge.protocol.client_pb2 as protocol +from centrifuge.fossil import apply_delta + +if TYPE_CHECKING: + from centrifuge import Publication class _JsonCodec: @@ -18,6 +22,16 @@ def encode_commands(commands): def decode_replies(data): return [json.loads(reply) for reply in data.strip().split("\n")] + @staticmethod + def apply_delta_if_needed(prev_data: bytes, pub: "Publication"): + if pub.delta: + prev_data = apply_delta(prev_data, pub.data.encode("utf-8")) + new_data = prev_data.decode("utf-8") + else: + prev_data = pub.data.encode("utf-8") + new_data = pub.data + return new_data, prev_data + def _varint_encode(number): """Encode an integer as a varint.""" @@ -73,3 +87,13 @@ def decode_replies(data: bytes): reply.ParseFromString(message_bytes) replies.append(MessageToDict(reply, preserving_proto_field_name=True)) return replies + + @staticmethod + def apply_delta_if_needed(prev_data: bytes, pub: "Publication"): + if pub.delta: + prev_data = apply_delta(prev_data, pub.data) + new_data = prev_data.decode("utf-8") + else: + prev_data = pub.data + new_data = pub.data.decode("utf-8") + return new_data, prev_data diff --git a/centrifuge/fossil.py b/centrifuge/fossil.py new file mode 100644 index 0000000..6752858 --- /dev/null +++ b/centrifuge/fossil.py @@ -0,0 +1,282 @@ +z_value = [ + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + 0, + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + -1, + -1, + -1, + -1, + -1, + -1, + -1, + 10, + 11, + 12, + 13, + 14, + 15, + 16, + 17, + 18, + 19, + 20, + 21, + 22, + 23, + 24, + 25, + 26, + 27, + 28, + 29, + 30, + 31, + 32, + 33, + 34, + 35, + -1, + -1, + -1, + -1, + 36, + -1, + 37, + 38, + 39, + 40, + 41, + 42, + 43, + 44, + 45, + 46, + 47, + 48, + 49, + 50, + 51, + 52, + 53, + 54, + 55, + 56, + 57, + 58, + 59, + 60, + 61, + 62, + -1, + -1, + -1, + 63, + -1, +] + + +class Reader: + def __init__(self, array): + self.a = array # source array + self.pos = 0 # current position in array + + def have_bytes(self): + return self.pos < len(self.a) + + def get_byte(self): + if self.pos >= len(self.a): + raise IndexError("out of bounds") + b = self.a[self.pos] + self.pos += 1 + return b + + def get_char(self): + return chr(self.get_byte()) + + # Read base64-encoded unsigned integer. + def get_int(self): + v = 0 + while self.have_bytes(): + byte = self.get_byte() + c = z_value[byte & 0x7F] + if c < 0: + break + v = (v << 6) + c + self.pos -= 1 + return v & 0xFFFFFFFF # Ensure unsigned 32-bit integer + + +# Writer writes an array. +class Writer: + def __init__(self): + self.a = [] # Internal array to store data + + def to_byte_array(self, source_type): + if isinstance(source_type, list): + return self.a # Return as list + return bytes(self.a) # Return as bytes + + # Copy from array at start to end. + def put_array(self, a, start, end): + # Copy elements from array 'a' from 'start' to 'end' into self.a + self.a.extend(a[start:end]) + + +# Return a 32-bit checksum of the array. +def checksum(arr): + sum0 = sum1 = sum2 = sum3 = 0 + z = 0 + _n = len(arr) + # Unrolling the loop for performance + while _n >= 16: + sum0 = (sum0 + arr[z + 0]) & 0xFFFFFFFF + sum1 = (sum1 + arr[z + 1]) & 0xFFFFFFFF + sum2 = (sum2 + arr[z + 2]) & 0xFFFFFFFF + sum3 = (sum3 + arr[z + 3]) & 0xFFFFFFFF + + sum0 = (sum0 + arr[z + 4]) & 0xFFFFFFFF + sum1 = (sum1 + arr[z + 5]) & 0xFFFFFFFF + sum2 = (sum2 + arr[z + 6]) & 0xFFFFFFFF + sum3 = (sum3 + arr[z + 7]) & 0xFFFFFFFF + + sum0 = (sum0 + arr[z + 8]) & 0xFFFFFFFF + sum1 = (sum1 + arr[z + 9]) & 0xFFFFFFFF + sum2 = (sum2 + arr[z + 10]) & 0xFFFFFFFF + sum3 = (sum3 + arr[z + 11]) & 0xFFFFFFFF + + sum0 = (sum0 + arr[z + 12]) & 0xFFFFFFFF + sum1 = (sum1 + arr[z + 13]) & 0xFFFFFFFF + sum2 = (sum2 + arr[z + 14]) & 0xFFFFFFFF + sum3 = (sum3 + arr[z + 15]) & 0xFFFFFFFF + + z += 16 + _n -= 16 + + while _n >= 4: + sum0 = (sum0 + arr[z + 0]) & 0xFFFFFFFF + sum1 = (sum1 + arr[z + 1]) & 0xFFFFFFFF + sum2 = (sum2 + arr[z + 2]) & 0xFFFFFFFF + sum3 = (sum3 + arr[z + 3]) & 0xFFFFFFFF + z += 4 + _n -= 4 + + sum3 = (sum3 + (sum2 << 8) + (sum1 << 16) + (sum0 << 24)) & 0xFFFFFFFF + + # Handle remaining bytes + if _n == 3: + sum3 = (sum3 + (arr[z + 2] << 8)) & 0xFFFFFFFF + sum3 = (sum3 + (arr[z + 1] << 16)) & 0xFFFFFFFF + sum3 = (sum3 + (arr[z + 0] << 24)) & 0xFFFFFFFF + elif _n == 2: + sum3 = (sum3 + (arr[z + 1] << 16)) & 0xFFFFFFFF + sum3 = (sum3 + (arr[z + 0] << 24)) & 0xFFFFFFFF + elif _n == 1: + sum3 = (sum3 + (arr[z + 0] << 24)) & 0xFFFFFFFF + + return sum3 & 0xFFFFFFFF # Ensure unsigned 32-bit integer + + +# Apply a delta byte array to a source byte array, returning the target byte array. +def apply_delta(source, delta): # noqa: PLR0912 + total = 0 + z_delta = Reader(delta) + len_src = len(source) + len_delta = len(delta) + + limit = z_delta.get_int() + c = z_delta.get_char() + if c != "\n": + raise ValueError("size integer not terminated by '\\n'") + + z_out = Writer() + while z_delta.have_bytes(): + cnt = z_delta.get_int() + operator = z_delta.get_char() + + if operator == "@": + ofst = z_delta.get_int() + if z_delta.have_bytes() and z_delta.get_char() != ",": + raise ValueError("copy command not terminated by ','") + total += cnt + if total > limit: + raise ValueError("copy exceeds output file size") + if ofst + cnt > len_src: + raise ValueError("copy extends past end of input") + z_out.put_array(source, ofst, ofst + cnt) + + elif operator == ":": + total += cnt + if total > limit: + raise ValueError("insert command gives an output larger than predicted") + if cnt > len_delta - z_delta.pos: + raise ValueError("insert count exceeds size of delta") + z_out.put_array(z_delta.a, z_delta.pos, z_delta.pos + cnt) + z_delta.pos += cnt + + elif operator == ";": + out = z_out.to_byte_array(source) + if cnt != checksum(out): + raise ValueError("bad checksum") + if total != limit: + raise ValueError("generated size does not match predicted size") + return out + + else: + raise ValueError("unknown delta operator") + + raise ValueError("unterminated delta") diff --git a/centrifuge/protocol/client_pb2.py b/centrifuge/protocol/client_pb2.py index 204c14d..a6e80e2 100644 --- a/centrifuge/protocol/client_pb2.py +++ b/centrifuge/protocol/client_pb2.py @@ -13,7 +13,7 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x63lient.proto\x12\x1f\x63\x65ntrifugal.centrifuge.protocol\"9\n\x05\x45rror\x12\x0c\n\x04\x63ode\x18\x01 \x01(\r\x12\x0f\n\x07message\x18\x02 \x01(\t\x12\x11\n\ttemporary\x18\x03 \x01(\x08\"?\n\x10\x45mulationRequest\x12\x0c\n\x04node\x18\x01 \x01(\t\x12\x0f\n\x07session\x18\x02 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\"\xc7\x06\n\x07\x43ommand\x12\n\n\x02id\x18\x01 \x01(\r\x12@\n\x07\x63onnect\x18\x04 \x01(\x0b\x32/.centrifugal.centrifuge.protocol.ConnectRequest\x12\x44\n\tsubscribe\x18\x05 \x01(\x0b\x32\x31.centrifugal.centrifuge.protocol.SubscribeRequest\x12H\n\x0bunsubscribe\x18\x06 \x01(\x0b\x32\x33.centrifugal.centrifuge.protocol.UnsubscribeRequest\x12@\n\x07publish\x18\x07 \x01(\x0b\x32/.centrifugal.centrifuge.protocol.PublishRequest\x12\x42\n\x08presence\x18\x08 \x01(\x0b\x32\x30.centrifugal.centrifuge.protocol.PresenceRequest\x12M\n\x0epresence_stats\x18\t \x01(\x0b\x32\x35.centrifugal.centrifuge.protocol.PresenceStatsRequest\x12@\n\x07history\x18\n \x01(\x0b\x32/.centrifugal.centrifuge.protocol.HistoryRequest\x12:\n\x04ping\x18\x0b \x01(\x0b\x32,.centrifugal.centrifuge.protocol.PingRequest\x12:\n\x04send\x18\x0c \x01(\x0b\x32,.centrifugal.centrifuge.protocol.SendRequest\x12\x38\n\x03rpc\x18\r \x01(\x0b\x32+.centrifugal.centrifuge.protocol.RPCRequest\x12@\n\x07refresh\x18\x0e \x01(\x0b\x32/.centrifugal.centrifuge.protocol.RefreshRequest\x12G\n\x0bsub_refresh\x18\x0f \x01(\x0b\x32\x32.centrifugal.centrifuge.protocol.SubRefreshRequestJ\x04\x08\x02\x10\x03J\x04\x08\x03\x10\x04\"\xe4\x06\n\x05Reply\x12\n\n\x02id\x18\x01 \x01(\r\x12\x35\n\x05\x65rror\x18\x02 \x01(\x0b\x32&.centrifugal.centrifuge.protocol.Error\x12\x33\n\x04push\x18\x04 \x01(\x0b\x32%.centrifugal.centrifuge.protocol.Push\x12?\n\x07\x63onnect\x18\x05 \x01(\x0b\x32..centrifugal.centrifuge.protocol.ConnectResult\x12\x43\n\tsubscribe\x18\x06 \x01(\x0b\x32\x30.centrifugal.centrifuge.protocol.SubscribeResult\x12G\n\x0bunsubscribe\x18\x07 \x01(\x0b\x32\x32.centrifugal.centrifuge.protocol.UnsubscribeResult\x12?\n\x07publish\x18\x08 \x01(\x0b\x32..centrifugal.centrifuge.protocol.PublishResult\x12\x41\n\x08presence\x18\t \x01(\x0b\x32/.centrifugal.centrifuge.protocol.PresenceResult\x12L\n\x0epresence_stats\x18\n \x01(\x0b\x32\x34.centrifugal.centrifuge.protocol.PresenceStatsResult\x12?\n\x07history\x18\x0b \x01(\x0b\x32..centrifugal.centrifuge.protocol.HistoryResult\x12\x39\n\x04ping\x18\x0c \x01(\x0b\x32+.centrifugal.centrifuge.protocol.PingResult\x12\x37\n\x03rpc\x18\r \x01(\x0b\x32*.centrifugal.centrifuge.protocol.RPCResult\x12?\n\x07refresh\x18\x0e \x01(\x0b\x32..centrifugal.centrifuge.protocol.RefreshResult\x12\x46\n\x0bsub_refresh\x18\x0f \x01(\x0b\x32\x31.centrifugal.centrifuge.protocol.SubRefreshResultJ\x04\x08\x03\x10\x04\"\xbe\x04\n\x04Push\x12\x0f\n\x07\x63hannel\x18\x02 \x01(\t\x12\x39\n\x03pub\x18\x04 \x01(\x0b\x32,.centrifugal.centrifuge.protocol.Publication\x12\x33\n\x04join\x18\x05 \x01(\x0b\x32%.centrifugal.centrifuge.protocol.Join\x12\x35\n\x05leave\x18\x06 \x01(\x0b\x32&.centrifugal.centrifuge.protocol.Leave\x12\x41\n\x0bunsubscribe\x18\x07 \x01(\x0b\x32,.centrifugal.centrifuge.protocol.Unsubscribe\x12\x39\n\x07message\x18\x08 \x01(\x0b\x32(.centrifugal.centrifuge.protocol.Message\x12=\n\tsubscribe\x18\t \x01(\x0b\x32*.centrifugal.centrifuge.protocol.Subscribe\x12\x39\n\x07\x63onnect\x18\n \x01(\x0b\x32(.centrifugal.centrifuge.protocol.Connect\x12?\n\ndisconnect\x18\x0b \x01(\x0b\x32+.centrifugal.centrifuge.protocol.Disconnect\x12\x39\n\x07refresh\x18\x0c \x01(\x0b\x32(.centrifugal.centrifuge.protocol.RefreshJ\x04\x08\x01\x10\x02J\x04\x08\x03\x10\x04\"P\n\nClientInfo\x12\x0c\n\x04user\x18\x01 \x01(\t\x12\x0e\n\x06\x63lient\x18\x02 \x01(\t\x12\x11\n\tconn_info\x18\x03 \x01(\x0c\x12\x11\n\tchan_info\x18\x04 \x01(\x0c\"\xeb\x01\n\x0bPublication\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\x0c\x12\x39\n\x04info\x18\x05 \x01(\x0b\x32+.centrifugal.centrifuge.protocol.ClientInfo\x12\x0e\n\x06offset\x18\x06 \x01(\x04\x12\x44\n\x04tags\x18\x07 \x03(\x0b\x32\x36.centrifugal.centrifuge.protocol.Publication.TagsEntry\x1a+\n\tTagsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01J\x04\x08\x01\x10\x02J\x04\x08\x02\x10\x03J\x04\x08\x03\x10\x04\"A\n\x04Join\x12\x39\n\x04info\x18\x01 \x01(\x0b\x32+.centrifugal.centrifuge.protocol.ClientInfo\"B\n\x05Leave\x12\x39\n\x04info\x18\x01 \x01(\x0b\x32+.centrifugal.centrifuge.protocol.ClientInfo\"1\n\x0bUnsubscribe\x12\x0c\n\x04\x63ode\x18\x02 \x01(\r\x12\x0e\n\x06reason\x18\x03 \x01(\tJ\x04\x08\x01\x10\x02\"m\n\tSubscribe\x12\x13\n\x0brecoverable\x18\x01 \x01(\x08\x12\r\n\x05\x65poch\x18\x04 \x01(\t\x12\x0e\n\x06offset\x18\x05 \x01(\x04\x12\x12\n\npositioned\x18\x06 \x01(\x08\x12\x0c\n\x04\x64\x61ta\x18\x07 \x01(\x0cJ\x04\x08\x02\x10\x03J\x04\x08\x03\x10\x04\"\x17\n\x07Message\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\"\xb2\x02\n\x07\x43onnect\x12\x0e\n\x06\x63lient\x18\x01 \x01(\t\x12\x0f\n\x07version\x18\x02 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\x12@\n\x04subs\x18\x04 \x03(\x0b\x32\x32.centrifugal.centrifuge.protocol.Connect.SubsEntry\x12\x0f\n\x07\x65xpires\x18\x05 \x01(\x08\x12\x0b\n\x03ttl\x18\x06 \x01(\r\x12\x0c\n\x04ping\x18\x07 \x01(\r\x12\x0c\n\x04pong\x18\x08 \x01(\x08\x12\x0f\n\x07session\x18\t \x01(\t\x12\x0c\n\x04node\x18\n \x01(\t\x1a]\n\tSubsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12?\n\x05value\x18\x02 \x01(\x0b\x32\x30.centrifugal.centrifuge.protocol.SubscribeResult:\x02\x38\x01\"=\n\nDisconnect\x12\x0c\n\x04\x63ode\x18\x01 \x01(\r\x12\x0e\n\x06reason\x18\x02 \x01(\t\x12\x11\n\treconnect\x18\x03 \x01(\x08\"\'\n\x07Refresh\x12\x0f\n\x07\x65xpires\x18\x01 \x01(\x08\x12\x0b\n\x03ttl\x18\x02 \x01(\r\"\xf5\x01\n\x0e\x43onnectRequest\x12\r\n\x05token\x18\x01 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12G\n\x04subs\x18\x03 \x03(\x0b\x32\x39.centrifugal.centrifuge.protocol.ConnectRequest.SubsEntry\x12\x0c\n\x04name\x18\x04 \x01(\t\x12\x0f\n\x07version\x18\x05 \x01(\t\x1a^\n\tSubsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12@\n\x05value\x18\x02 \x01(\x0b\x32\x31.centrifugal.centrifuge.protocol.SubscribeRequest:\x02\x38\x01\"\xbe\x02\n\rConnectResult\x12\x0e\n\x06\x63lient\x18\x01 \x01(\t\x12\x0f\n\x07version\x18\x02 \x01(\t\x12\x0f\n\x07\x65xpires\x18\x03 \x01(\x08\x12\x0b\n\x03ttl\x18\x04 \x01(\r\x12\x0c\n\x04\x64\x61ta\x18\x05 \x01(\x0c\x12\x46\n\x04subs\x18\x06 \x03(\x0b\x32\x38.centrifugal.centrifuge.protocol.ConnectResult.SubsEntry\x12\x0c\n\x04ping\x18\x07 \x01(\r\x12\x0c\n\x04pong\x18\x08 \x01(\x08\x12\x0f\n\x07session\x18\t \x01(\t\x12\x0c\n\x04node\x18\n \x01(\t\x1a]\n\tSubsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12?\n\x05value\x18\x02 \x01(\x0b\x32\x30.centrifugal.centrifuge.protocol.SubscribeResult:\x02\x38\x01\"\x1f\n\x0eRefreshRequest\x12\r\n\x05token\x18\x01 \x01(\t\"N\n\rRefreshResult\x12\x0e\n\x06\x63lient\x18\x01 \x01(\t\x12\x0f\n\x07version\x18\x02 \x01(\t\x12\x0f\n\x07\x65xpires\x18\x03 \x01(\x08\x12\x0b\n\x03ttl\x18\x04 \x01(\r\"\xb9\x01\n\x10SubscribeRequest\x12\x0f\n\x07\x63hannel\x18\x01 \x01(\t\x12\r\n\x05token\x18\x02 \x01(\t\x12\x0f\n\x07recover\x18\x03 \x01(\x08\x12\r\n\x05\x65poch\x18\x06 \x01(\t\x12\x0e\n\x06offset\x18\x07 \x01(\x04\x12\x0c\n\x04\x64\x61ta\x18\x08 \x01(\x0c\x12\x12\n\npositioned\x18\t \x01(\x08\x12\x13\n\x0brecoverable\x18\n \x01(\x08\x12\x12\n\njoin_leave\x18\x0b \x01(\x08J\x04\x08\x04\x10\x05J\x04\x08\x05\x10\x06\"\x80\x02\n\x0fSubscribeResult\x12\x0f\n\x07\x65xpires\x18\x01 \x01(\x08\x12\x0b\n\x03ttl\x18\x02 \x01(\r\x12\x13\n\x0brecoverable\x18\x03 \x01(\x08\x12\r\n\x05\x65poch\x18\x06 \x01(\t\x12\x42\n\x0cpublications\x18\x07 \x03(\x0b\x32,.centrifugal.centrifuge.protocol.Publication\x12\x11\n\trecovered\x18\x08 \x01(\x08\x12\x0e\n\x06offset\x18\t \x01(\x04\x12\x12\n\npositioned\x18\n \x01(\x08\x12\x0c\n\x04\x64\x61ta\x18\x0b \x01(\x0c\x12\x16\n\x0ewas_recovering\x18\x0c \x01(\x08J\x04\x08\x04\x10\x05J\x04\x08\x05\x10\x06\"3\n\x11SubRefreshRequest\x12\x0f\n\x07\x63hannel\x18\x01 \x01(\t\x12\r\n\x05token\x18\x02 \x01(\t\"0\n\x10SubRefreshResult\x12\x0f\n\x07\x65xpires\x18\x01 \x01(\x08\x12\x0b\n\x03ttl\x18\x02 \x01(\r\"%\n\x12UnsubscribeRequest\x12\x0f\n\x07\x63hannel\x18\x01 \x01(\t\"\x13\n\x11UnsubscribeResult\"/\n\x0ePublishRequest\x12\x0f\n\x07\x63hannel\x18\x01 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\"\x0f\n\rPublishResult\"\"\n\x0fPresenceRequest\x12\x0f\n\x07\x63hannel\x18\x01 \x01(\t\"\xbf\x01\n\x0ePresenceResult\x12O\n\x08presence\x18\x01 \x03(\x0b\x32=.centrifugal.centrifuge.protocol.PresenceResult.PresenceEntry\x1a\\\n\rPresenceEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12:\n\x05value\x18\x02 \x01(\x0b\x32+.centrifugal.centrifuge.protocol.ClientInfo:\x02\x38\x01\"\'\n\x14PresenceStatsRequest\x12\x0f\n\x07\x63hannel\x18\x01 \x01(\t\"=\n\x13PresenceStatsResult\x12\x13\n\x0bnum_clients\x18\x01 \x01(\r\x12\x11\n\tnum_users\x18\x02 \x01(\r\"/\n\x0eStreamPosition\x12\x0e\n\x06offset\x18\x01 \x01(\x04\x12\r\n\x05\x65poch\x18\x02 \x01(\t\"\x9f\x01\n\x0eHistoryRequest\x12\x0f\n\x07\x63hannel\x18\x01 \x01(\t\x12\r\n\x05limit\x18\x07 \x01(\x05\x12>\n\x05since\x18\x08 \x01(\x0b\x32/.centrifugal.centrifuge.protocol.StreamPosition\x12\x0f\n\x07reverse\x18\t \x01(\x08J\x04\x08\x02\x10\x03J\x04\x08\x03\x10\x04J\x04\x08\x04\x10\x05J\x04\x08\x05\x10\x06J\x04\x08\x06\x10\x07\"r\n\rHistoryResult\x12\x42\n\x0cpublications\x18\x01 \x03(\x0b\x32,.centrifugal.centrifuge.protocol.Publication\x12\r\n\x05\x65poch\x18\x02 \x01(\t\x12\x0e\n\x06offset\x18\x03 \x01(\x04\"\r\n\x0bPingRequest\"\x0c\n\nPingResult\"*\n\nRPCRequest\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\x12\x0e\n\x06method\x18\x02 \x01(\t\"\x19\n\tRPCResult\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\"\x1b\n\x0bSendRequest\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\x42\rZ\x0b./;protocolb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x63lient.proto\x12\x1f\x63\x65ntrifugal.centrifuge.protocol\"9\n\x05\x45rror\x12\x0c\n\x04\x63ode\x18\x01 \x01(\r\x12\x0f\n\x07message\x18\x02 \x01(\t\x12\x11\n\ttemporary\x18\x03 \x01(\x08\"?\n\x10\x45mulationRequest\x12\x0c\n\x04node\x18\x01 \x01(\t\x12\x0f\n\x07session\x18\x02 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\"\xc7\x06\n\x07\x43ommand\x12\n\n\x02id\x18\x01 \x01(\r\x12@\n\x07\x63onnect\x18\x04 \x01(\x0b\x32/.centrifugal.centrifuge.protocol.ConnectRequest\x12\x44\n\tsubscribe\x18\x05 \x01(\x0b\x32\x31.centrifugal.centrifuge.protocol.SubscribeRequest\x12H\n\x0bunsubscribe\x18\x06 \x01(\x0b\x32\x33.centrifugal.centrifuge.protocol.UnsubscribeRequest\x12@\n\x07publish\x18\x07 \x01(\x0b\x32/.centrifugal.centrifuge.protocol.PublishRequest\x12\x42\n\x08presence\x18\x08 \x01(\x0b\x32\x30.centrifugal.centrifuge.protocol.PresenceRequest\x12M\n\x0epresence_stats\x18\t \x01(\x0b\x32\x35.centrifugal.centrifuge.protocol.PresenceStatsRequest\x12@\n\x07history\x18\n \x01(\x0b\x32/.centrifugal.centrifuge.protocol.HistoryRequest\x12:\n\x04ping\x18\x0b \x01(\x0b\x32,.centrifugal.centrifuge.protocol.PingRequest\x12:\n\x04send\x18\x0c \x01(\x0b\x32,.centrifugal.centrifuge.protocol.SendRequest\x12\x38\n\x03rpc\x18\r \x01(\x0b\x32+.centrifugal.centrifuge.protocol.RPCRequest\x12@\n\x07refresh\x18\x0e \x01(\x0b\x32/.centrifugal.centrifuge.protocol.RefreshRequest\x12G\n\x0bsub_refresh\x18\x0f \x01(\x0b\x32\x32.centrifugal.centrifuge.protocol.SubRefreshRequestJ\x04\x08\x02\x10\x03J\x04\x08\x03\x10\x04\"\xe4\x06\n\x05Reply\x12\n\n\x02id\x18\x01 \x01(\r\x12\x35\n\x05\x65rror\x18\x02 \x01(\x0b\x32&.centrifugal.centrifuge.protocol.Error\x12\x33\n\x04push\x18\x04 \x01(\x0b\x32%.centrifugal.centrifuge.protocol.Push\x12?\n\x07\x63onnect\x18\x05 \x01(\x0b\x32..centrifugal.centrifuge.protocol.ConnectResult\x12\x43\n\tsubscribe\x18\x06 \x01(\x0b\x32\x30.centrifugal.centrifuge.protocol.SubscribeResult\x12G\n\x0bunsubscribe\x18\x07 \x01(\x0b\x32\x32.centrifugal.centrifuge.protocol.UnsubscribeResult\x12?\n\x07publish\x18\x08 \x01(\x0b\x32..centrifugal.centrifuge.protocol.PublishResult\x12\x41\n\x08presence\x18\t \x01(\x0b\x32/.centrifugal.centrifuge.protocol.PresenceResult\x12L\n\x0epresence_stats\x18\n \x01(\x0b\x32\x34.centrifugal.centrifuge.protocol.PresenceStatsResult\x12?\n\x07history\x18\x0b \x01(\x0b\x32..centrifugal.centrifuge.protocol.HistoryResult\x12\x39\n\x04ping\x18\x0c \x01(\x0b\x32+.centrifugal.centrifuge.protocol.PingResult\x12\x37\n\x03rpc\x18\r \x01(\x0b\x32*.centrifugal.centrifuge.protocol.RPCResult\x12?\n\x07refresh\x18\x0e \x01(\x0b\x32..centrifugal.centrifuge.protocol.RefreshResult\x12\x46\n\x0bsub_refresh\x18\x0f \x01(\x0b\x32\x31.centrifugal.centrifuge.protocol.SubRefreshResultJ\x04\x08\x03\x10\x04\"\xbe\x04\n\x04Push\x12\x0f\n\x07\x63hannel\x18\x02 \x01(\t\x12\x39\n\x03pub\x18\x04 \x01(\x0b\x32,.centrifugal.centrifuge.protocol.Publication\x12\x33\n\x04join\x18\x05 \x01(\x0b\x32%.centrifugal.centrifuge.protocol.Join\x12\x35\n\x05leave\x18\x06 \x01(\x0b\x32&.centrifugal.centrifuge.protocol.Leave\x12\x41\n\x0bunsubscribe\x18\x07 \x01(\x0b\x32,.centrifugal.centrifuge.protocol.Unsubscribe\x12\x39\n\x07message\x18\x08 \x01(\x0b\x32(.centrifugal.centrifuge.protocol.Message\x12=\n\tsubscribe\x18\t \x01(\x0b\x32*.centrifugal.centrifuge.protocol.Subscribe\x12\x39\n\x07\x63onnect\x18\n \x01(\x0b\x32(.centrifugal.centrifuge.protocol.Connect\x12?\n\ndisconnect\x18\x0b \x01(\x0b\x32+.centrifugal.centrifuge.protocol.Disconnect\x12\x39\n\x07refresh\x18\x0c \x01(\x0b\x32(.centrifugal.centrifuge.protocol.RefreshJ\x04\x08\x01\x10\x02J\x04\x08\x03\x10\x04\"P\n\nClientInfo\x12\x0c\n\x04user\x18\x01 \x01(\t\x12\x0e\n\x06\x63lient\x18\x02 \x01(\t\x12\x11\n\tconn_info\x18\x03 \x01(\x0c\x12\x11\n\tchan_info\x18\x04 \x01(\x0c\"\xfa\x01\n\x0bPublication\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\x0c\x12\x39\n\x04info\x18\x05 \x01(\x0b\x32+.centrifugal.centrifuge.protocol.ClientInfo\x12\x0e\n\x06offset\x18\x06 \x01(\x04\x12\x44\n\x04tags\x18\x07 \x03(\x0b\x32\x36.centrifugal.centrifuge.protocol.Publication.TagsEntry\x12\r\n\x05\x64\x65lta\x18\x08 \x01(\x08\x1a+\n\tTagsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01J\x04\x08\x01\x10\x02J\x04\x08\x02\x10\x03J\x04\x08\x03\x10\x04\"A\n\x04Join\x12\x39\n\x04info\x18\x01 \x01(\x0b\x32+.centrifugal.centrifuge.protocol.ClientInfo\"B\n\x05Leave\x12\x39\n\x04info\x18\x01 \x01(\x0b\x32+.centrifugal.centrifuge.protocol.ClientInfo\"1\n\x0bUnsubscribe\x12\x0c\n\x04\x63ode\x18\x02 \x01(\r\x12\x0e\n\x06reason\x18\x03 \x01(\tJ\x04\x08\x01\x10\x02\"m\n\tSubscribe\x12\x13\n\x0brecoverable\x18\x01 \x01(\x08\x12\r\n\x05\x65poch\x18\x04 \x01(\t\x12\x0e\n\x06offset\x18\x05 \x01(\x04\x12\x12\n\npositioned\x18\x06 \x01(\x08\x12\x0c\n\x04\x64\x61ta\x18\x07 \x01(\x0cJ\x04\x08\x02\x10\x03J\x04\x08\x03\x10\x04\"\x17\n\x07Message\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\"\xb2\x02\n\x07\x43onnect\x12\x0e\n\x06\x63lient\x18\x01 \x01(\t\x12\x0f\n\x07version\x18\x02 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\x12@\n\x04subs\x18\x04 \x03(\x0b\x32\x32.centrifugal.centrifuge.protocol.Connect.SubsEntry\x12\x0f\n\x07\x65xpires\x18\x05 \x01(\x08\x12\x0b\n\x03ttl\x18\x06 \x01(\r\x12\x0c\n\x04ping\x18\x07 \x01(\r\x12\x0c\n\x04pong\x18\x08 \x01(\x08\x12\x0f\n\x07session\x18\t \x01(\t\x12\x0c\n\x04node\x18\n \x01(\t\x1a]\n\tSubsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12?\n\x05value\x18\x02 \x01(\x0b\x32\x30.centrifugal.centrifuge.protocol.SubscribeResult:\x02\x38\x01\"=\n\nDisconnect\x12\x0c\n\x04\x63ode\x18\x01 \x01(\r\x12\x0e\n\x06reason\x18\x02 \x01(\t\x12\x11\n\treconnect\x18\x03 \x01(\x08\"\'\n\x07Refresh\x12\x0f\n\x07\x65xpires\x18\x01 \x01(\x08\x12\x0b\n\x03ttl\x18\x02 \x01(\r\"\xf5\x01\n\x0e\x43onnectRequest\x12\r\n\x05token\x18\x01 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12G\n\x04subs\x18\x03 \x03(\x0b\x32\x39.centrifugal.centrifuge.protocol.ConnectRequest.SubsEntry\x12\x0c\n\x04name\x18\x04 \x01(\t\x12\x0f\n\x07version\x18\x05 \x01(\t\x1a^\n\tSubsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12@\n\x05value\x18\x02 \x01(\x0b\x32\x31.centrifugal.centrifuge.protocol.SubscribeRequest:\x02\x38\x01\"\xbe\x02\n\rConnectResult\x12\x0e\n\x06\x63lient\x18\x01 \x01(\t\x12\x0f\n\x07version\x18\x02 \x01(\t\x12\x0f\n\x07\x65xpires\x18\x03 \x01(\x08\x12\x0b\n\x03ttl\x18\x04 \x01(\r\x12\x0c\n\x04\x64\x61ta\x18\x05 \x01(\x0c\x12\x46\n\x04subs\x18\x06 \x03(\x0b\x32\x38.centrifugal.centrifuge.protocol.ConnectResult.SubsEntry\x12\x0c\n\x04ping\x18\x07 \x01(\r\x12\x0c\n\x04pong\x18\x08 \x01(\x08\x12\x0f\n\x07session\x18\t \x01(\t\x12\x0c\n\x04node\x18\n \x01(\t\x1a]\n\tSubsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12?\n\x05value\x18\x02 \x01(\x0b\x32\x30.centrifugal.centrifuge.protocol.SubscribeResult:\x02\x38\x01\"\x1f\n\x0eRefreshRequest\x12\r\n\x05token\x18\x01 \x01(\t\"N\n\rRefreshResult\x12\x0e\n\x06\x63lient\x18\x01 \x01(\t\x12\x0f\n\x07version\x18\x02 \x01(\t\x12\x0f\n\x07\x65xpires\x18\x03 \x01(\x08\x12\x0b\n\x03ttl\x18\x04 \x01(\r\"\xc8\x01\n\x10SubscribeRequest\x12\x0f\n\x07\x63hannel\x18\x01 \x01(\t\x12\r\n\x05token\x18\x02 \x01(\t\x12\x0f\n\x07recover\x18\x03 \x01(\x08\x12\r\n\x05\x65poch\x18\x06 \x01(\t\x12\x0e\n\x06offset\x18\x07 \x01(\x04\x12\x0c\n\x04\x64\x61ta\x18\x08 \x01(\x0c\x12\x12\n\npositioned\x18\t \x01(\x08\x12\x13\n\x0brecoverable\x18\n \x01(\x08\x12\x12\n\njoin_leave\x18\x0b \x01(\x08\x12\r\n\x05\x64\x65lta\x18\x0c \x01(\tJ\x04\x08\x04\x10\x05J\x04\x08\x05\x10\x06\"\x8f\x02\n\x0fSubscribeResult\x12\x0f\n\x07\x65xpires\x18\x01 \x01(\x08\x12\x0b\n\x03ttl\x18\x02 \x01(\r\x12\x13\n\x0brecoverable\x18\x03 \x01(\x08\x12\r\n\x05\x65poch\x18\x06 \x01(\t\x12\x42\n\x0cpublications\x18\x07 \x03(\x0b\x32,.centrifugal.centrifuge.protocol.Publication\x12\x11\n\trecovered\x18\x08 \x01(\x08\x12\x0e\n\x06offset\x18\t \x01(\x04\x12\x12\n\npositioned\x18\n \x01(\x08\x12\x0c\n\x04\x64\x61ta\x18\x0b \x01(\x0c\x12\x16\n\x0ewas_recovering\x18\x0c \x01(\x08\x12\r\n\x05\x64\x65lta\x18\r \x01(\x08J\x04\x08\x04\x10\x05J\x04\x08\x05\x10\x06\"3\n\x11SubRefreshRequest\x12\x0f\n\x07\x63hannel\x18\x01 \x01(\t\x12\r\n\x05token\x18\x02 \x01(\t\"0\n\x10SubRefreshResult\x12\x0f\n\x07\x65xpires\x18\x01 \x01(\x08\x12\x0b\n\x03ttl\x18\x02 \x01(\r\"%\n\x12UnsubscribeRequest\x12\x0f\n\x07\x63hannel\x18\x01 \x01(\t\"\x13\n\x11UnsubscribeResult\"/\n\x0ePublishRequest\x12\x0f\n\x07\x63hannel\x18\x01 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\"\x0f\n\rPublishResult\"\"\n\x0fPresenceRequest\x12\x0f\n\x07\x63hannel\x18\x01 \x01(\t\"\xbf\x01\n\x0ePresenceResult\x12O\n\x08presence\x18\x01 \x03(\x0b\x32=.centrifugal.centrifuge.protocol.PresenceResult.PresenceEntry\x1a\\\n\rPresenceEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12:\n\x05value\x18\x02 \x01(\x0b\x32+.centrifugal.centrifuge.protocol.ClientInfo:\x02\x38\x01\"\'\n\x14PresenceStatsRequest\x12\x0f\n\x07\x63hannel\x18\x01 \x01(\t\"=\n\x13PresenceStatsResult\x12\x13\n\x0bnum_clients\x18\x01 \x01(\r\x12\x11\n\tnum_users\x18\x02 \x01(\r\"/\n\x0eStreamPosition\x12\x0e\n\x06offset\x18\x01 \x01(\x04\x12\r\n\x05\x65poch\x18\x02 \x01(\t\"\x9f\x01\n\x0eHistoryRequest\x12\x0f\n\x07\x63hannel\x18\x01 \x01(\t\x12\r\n\x05limit\x18\x07 \x01(\x05\x12>\n\x05since\x18\x08 \x01(\x0b\x32/.centrifugal.centrifuge.protocol.StreamPosition\x12\x0f\n\x07reverse\x18\t \x01(\x08J\x04\x08\x02\x10\x03J\x04\x08\x03\x10\x04J\x04\x08\x04\x10\x05J\x04\x08\x05\x10\x06J\x04\x08\x06\x10\x07\"r\n\rHistoryResult\x12\x42\n\x0cpublications\x18\x01 \x03(\x0b\x32,.centrifugal.centrifuge.protocol.Publication\x12\r\n\x05\x65poch\x18\x02 \x01(\t\x12\x0e\n\x06offset\x18\x03 \x01(\x04\"\r\n\x0bPingRequest\"\x0c\n\nPingResult\"*\n\nRPCRequest\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\x12\x0e\n\x06method\x18\x02 \x01(\t\"\x19\n\tRPCResult\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\"\x1b\n\x0bSendRequest\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\x42\rZ\x0b./;protocolb\x06proto3') _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'client_pb2', globals()) @@ -44,79 +44,79 @@ _CLIENTINFO._serialized_start=2463 _CLIENTINFO._serialized_end=2543 _PUBLICATION._serialized_start=2546 - _PUBLICATION._serialized_end=2781 - _PUBLICATION_TAGSENTRY._serialized_start=2720 - _PUBLICATION_TAGSENTRY._serialized_end=2763 - _JOIN._serialized_start=2783 - _JOIN._serialized_end=2848 - _LEAVE._serialized_start=2850 - _LEAVE._serialized_end=2916 - _UNSUBSCRIBE._serialized_start=2918 - _UNSUBSCRIBE._serialized_end=2967 - _SUBSCRIBE._serialized_start=2969 - _SUBSCRIBE._serialized_end=3078 - _MESSAGE._serialized_start=3080 - _MESSAGE._serialized_end=3103 - _CONNECT._serialized_start=3106 - _CONNECT._serialized_end=3412 - _CONNECT_SUBSENTRY._serialized_start=3319 - _CONNECT_SUBSENTRY._serialized_end=3412 - _DISCONNECT._serialized_start=3414 - _DISCONNECT._serialized_end=3475 - _REFRESH._serialized_start=3477 - _REFRESH._serialized_end=3516 - _CONNECTREQUEST._serialized_start=3519 - _CONNECTREQUEST._serialized_end=3764 - _CONNECTREQUEST_SUBSENTRY._serialized_start=3670 - _CONNECTREQUEST_SUBSENTRY._serialized_end=3764 - _CONNECTRESULT._serialized_start=3767 - _CONNECTRESULT._serialized_end=4085 - _CONNECTRESULT_SUBSENTRY._serialized_start=3319 - _CONNECTRESULT_SUBSENTRY._serialized_end=3412 - _REFRESHREQUEST._serialized_start=4087 - _REFRESHREQUEST._serialized_end=4118 - _REFRESHRESULT._serialized_start=4120 - _REFRESHRESULT._serialized_end=4198 - _SUBSCRIBEREQUEST._serialized_start=4201 - _SUBSCRIBEREQUEST._serialized_end=4386 - _SUBSCRIBERESULT._serialized_start=4389 - _SUBSCRIBERESULT._serialized_end=4645 - _SUBREFRESHREQUEST._serialized_start=4647 - _SUBREFRESHREQUEST._serialized_end=4698 - _SUBREFRESHRESULT._serialized_start=4700 - _SUBREFRESHRESULT._serialized_end=4748 - _UNSUBSCRIBEREQUEST._serialized_start=4750 - _UNSUBSCRIBEREQUEST._serialized_end=4787 - _UNSUBSCRIBERESULT._serialized_start=4789 - _UNSUBSCRIBERESULT._serialized_end=4808 - _PUBLISHREQUEST._serialized_start=4810 - _PUBLISHREQUEST._serialized_end=4857 - _PUBLISHRESULT._serialized_start=4859 - _PUBLISHRESULT._serialized_end=4874 - _PRESENCEREQUEST._serialized_start=4876 - _PRESENCEREQUEST._serialized_end=4910 - _PRESENCERESULT._serialized_start=4913 - _PRESENCERESULT._serialized_end=5104 - _PRESENCERESULT_PRESENCEENTRY._serialized_start=5012 - _PRESENCERESULT_PRESENCEENTRY._serialized_end=5104 - _PRESENCESTATSREQUEST._serialized_start=5106 - _PRESENCESTATSREQUEST._serialized_end=5145 - _PRESENCESTATSRESULT._serialized_start=5147 - _PRESENCESTATSRESULT._serialized_end=5208 - _STREAMPOSITION._serialized_start=5210 - _STREAMPOSITION._serialized_end=5257 - _HISTORYREQUEST._serialized_start=5260 - _HISTORYREQUEST._serialized_end=5419 - _HISTORYRESULT._serialized_start=5421 - _HISTORYRESULT._serialized_end=5535 - _PINGREQUEST._serialized_start=5537 - _PINGREQUEST._serialized_end=5550 - _PINGRESULT._serialized_start=5552 - _PINGRESULT._serialized_end=5564 - _RPCREQUEST._serialized_start=5566 - _RPCREQUEST._serialized_end=5608 - _RPCRESULT._serialized_start=5610 - _RPCRESULT._serialized_end=5635 - _SENDREQUEST._serialized_start=5637 - _SENDREQUEST._serialized_end=5664 + _PUBLICATION._serialized_end=2796 + _PUBLICATION_TAGSENTRY._serialized_start=2735 + _PUBLICATION_TAGSENTRY._serialized_end=2778 + _JOIN._serialized_start=2798 + _JOIN._serialized_end=2863 + _LEAVE._serialized_start=2865 + _LEAVE._serialized_end=2931 + _UNSUBSCRIBE._serialized_start=2933 + _UNSUBSCRIBE._serialized_end=2982 + _SUBSCRIBE._serialized_start=2984 + _SUBSCRIBE._serialized_end=3093 + _MESSAGE._serialized_start=3095 + _MESSAGE._serialized_end=3118 + _CONNECT._serialized_start=3121 + _CONNECT._serialized_end=3427 + _CONNECT_SUBSENTRY._serialized_start=3334 + _CONNECT_SUBSENTRY._serialized_end=3427 + _DISCONNECT._serialized_start=3429 + _DISCONNECT._serialized_end=3490 + _REFRESH._serialized_start=3492 + _REFRESH._serialized_end=3531 + _CONNECTREQUEST._serialized_start=3534 + _CONNECTREQUEST._serialized_end=3779 + _CONNECTREQUEST_SUBSENTRY._serialized_start=3685 + _CONNECTREQUEST_SUBSENTRY._serialized_end=3779 + _CONNECTRESULT._serialized_start=3782 + _CONNECTRESULT._serialized_end=4100 + _CONNECTRESULT_SUBSENTRY._serialized_start=3334 + _CONNECTRESULT_SUBSENTRY._serialized_end=3427 + _REFRESHREQUEST._serialized_start=4102 + _REFRESHREQUEST._serialized_end=4133 + _REFRESHRESULT._serialized_start=4135 + _REFRESHRESULT._serialized_end=4213 + _SUBSCRIBEREQUEST._serialized_start=4216 + _SUBSCRIBEREQUEST._serialized_end=4416 + _SUBSCRIBERESULT._serialized_start=4419 + _SUBSCRIBERESULT._serialized_end=4690 + _SUBREFRESHREQUEST._serialized_start=4692 + _SUBREFRESHREQUEST._serialized_end=4743 + _SUBREFRESHRESULT._serialized_start=4745 + _SUBREFRESHRESULT._serialized_end=4793 + _UNSUBSCRIBEREQUEST._serialized_start=4795 + _UNSUBSCRIBEREQUEST._serialized_end=4832 + _UNSUBSCRIBERESULT._serialized_start=4834 + _UNSUBSCRIBERESULT._serialized_end=4853 + _PUBLISHREQUEST._serialized_start=4855 + _PUBLISHREQUEST._serialized_end=4902 + _PUBLISHRESULT._serialized_start=4904 + _PUBLISHRESULT._serialized_end=4919 + _PRESENCEREQUEST._serialized_start=4921 + _PRESENCEREQUEST._serialized_end=4955 + _PRESENCERESULT._serialized_start=4958 + _PRESENCERESULT._serialized_end=5149 + _PRESENCERESULT_PRESENCEENTRY._serialized_start=5057 + _PRESENCERESULT_PRESENCEENTRY._serialized_end=5149 + _PRESENCESTATSREQUEST._serialized_start=5151 + _PRESENCESTATSREQUEST._serialized_end=5190 + _PRESENCESTATSRESULT._serialized_start=5192 + _PRESENCESTATSRESULT._serialized_end=5253 + _STREAMPOSITION._serialized_start=5255 + _STREAMPOSITION._serialized_end=5302 + _HISTORYREQUEST._serialized_start=5305 + _HISTORYREQUEST._serialized_end=5464 + _HISTORYRESULT._serialized_start=5466 + _HISTORYRESULT._serialized_end=5580 + _PINGREQUEST._serialized_start=5582 + _PINGREQUEST._serialized_end=5595 + _PINGRESULT._serialized_start=5597 + _PINGRESULT._serialized_end=5609 + _RPCREQUEST._serialized_start=5611 + _RPCREQUEST._serialized_end=5653 + _RPCRESULT._serialized_start=5655 + _RPCRESULT._serialized_end=5680 + _SENDREQUEST._serialized_start=5682 + _SENDREQUEST._serialized_end=5709 # @@protoc_insertion_point(module_scope) diff --git a/centrifuge/types.py b/centrifuge/types.py index 6c0ad7b..3014947 100644 --- a/centrifuge/types.py +++ b/centrifuge/types.py @@ -37,12 +37,14 @@ class Publication: offset: publication offset in channel stream. data: published data. info: optional client information (i.e. may be None). + delta: whether this publication is a delta message or not """ offset: int data: Any info: Optional[ClientInfo] tags: Dict[str, str] + delta: bool @dataclass diff --git a/client.proto b/client.proto index 48edefa..e1e080c 100644 --- a/client.proto +++ b/client.proto @@ -102,6 +102,7 @@ message Publication { ClientInfo info = 5; uint64 offset = 6; map tags = 7; + bool delta = 8; } message Join { @@ -198,6 +199,7 @@ message SubscribeRequest { bool positioned = 9; bool recoverable = 10; bool join_leave = 11; + string delta = 12; } message SubscribeResult { @@ -212,6 +214,7 @@ message SubscribeResult { bool positioned = 10; bytes data = 11; bool was_recovering = 12; + bool delta = 13; } message SubRefreshRequest { diff --git a/example.py b/example.py index 7acebbe..aa00301 100644 --- a/example.py +++ b/example.py @@ -135,6 +135,7 @@ def run_example(): "example:channel", events=SubscriptionEventLoggerHandler(), get_token=get_subscription_token, + # you can pass `delta="fossil"` too ) async def run(): diff --git a/tests/test_fossil.py b/tests/test_fossil.py new file mode 100644 index 0000000..d549cc6 --- /dev/null +++ b/tests/test_fossil.py @@ -0,0 +1,180 @@ +import unittest +from centrifuge.fossil import apply_delta + + +class TestFossil(unittest.IsolatedAsyncioTestCase): + def test_apply_delta(self): + # Test 1 + source1_str = ( + '{"asks":[["609590","3792.6"],["609600","11507.8"],["609640","663.11"],' + '["609690","302.71"],["609700","744.52"],["609730","209.94"],' + '["609750","18.59"],["609790","156"],["609800","859.03"],["609830",' + '"216.98"],["609860","217.42"],["609870","60"],["609880","384.06"],' + '["609890","4615.87"],["609900","25.98"],["609940","63.95"],["609950",' + '"242.6"],["609960","2000"],["609970","1573"],["609980","47.56"],["609990",' + '"582.26"],["610000","42899.13"],["610020","24.46"],["610110","150"]],' + '"bids":[["609520","2010.12"],["609510","5080.7"],["609500","297.5"],' + '["609490","1238.52"],["609480","896.37"],["609470","1234.91"],["609460",' + '"451.36"],["609250","58.45"],["609220","786.48"],["609200","101.64"],' + '["609190","41.03"],["609160","650.49"],["609100","6932.07"],["609070",' + '"16.59"],["609050","149.22"],["609040","52.53"],["609030","11.52"],' + '["609020","1038.35"],["609010","334.83"],["609000","3453.95"],["608900",' + '"850.81"],["608880","57"],["608850","5.47"],["608840","41.23"]],' + '"lastTradePrice":"609520","lastUpdate":1727632299611}' + ) + delta1_str = "Fm\n7i@0,6:1852.77s@7o,7:303488}3~2wv0;" + out1 = ( + '{"asks":[["609590","3792.6"],["609600","11507.8"],["609640","663.11"],["609690",' + '"302.71"],["609700","744.52"],["609730","209.94"],["609750","18.59"],["609790",' + '"156"],["609800","859.03"],["609830","216.98"],["609860","217.42"],["609870",' + '"60"],["609880","384.06"],["609890","4615.87"],["609900","25.98"],["609940",' + '"63.95"],["609950","242.6"],["609960","2000"],["609970","1573"],["609980",' + '"47.56"],["609990","582.26"],["610000","42899.13"],["610020","24.46"],' + '["610110","150"]],"bids":[["609520","1852.72"],["609510","5080.7"],["609500",' + '"297.5"],["609490","1238.52"],["609480","896.37"],["609470","1234.91"],' + '["609460","451.36"],["609250","58.45"],["609220","786.48"],["609200","101.64"],' + '["609190","41.03"],["609160","650.49"],["609100","6932.07"],["609070","16.59"],' + '["609050","149.22"],["609040","52.53"],["609030","11.52"],["609020","1038.35"],' + '["609010","334.83"],["609000","3453.95"],["608900","850.81"],["608880","57"],' + '["608850","5.47"],["608840","41.23"]],"lastTradePrice":"609520",' + '"lastUpdate":1727632303488}' + ) + result_bytes = apply_delta(source1_str.encode("utf-8"), delta1_str.encode("utf-8")) + self.assertEqual(result_bytes.decode("utf-8"), out1) + + # Test 2 + source2_str = ( + '{"asks":[["610480","26.96"],["610490","32.76"],["610500","622.44"],' + '["610530","238.7"],["610540","990.9"],["610830","33"],["610840",' + '"159.9"],["610880","100"],["610890","33"],["610900","913.87"],["610920",' + '"30.52"],["610970","480.74"],["610980","1500"],["610990","266.61"],' + '["611000","9672.99"],["611100","404.63"],["611150","56.25"],["611170",' + '"2011.71"],["611200","25.2"],["611210","10.17"],["611240","150"],' + '["611320","8.27"],["611350","76.6"],["611400","777.8"]],"bids":[[' + '"610360","115.64"],["610350","525.38"],["610340","575.77"],["610330",' + '"421.83"],["610320","1943.17"],["610310","241.36"],["610300","3186.21"],' + '["610080","418.23"],["610050","167.12"],["610030","30"],["610010",' + '"31.14"],["610000","2989.86"],["609920","85.04"],["609910","58.72"],' + '["609900","2.05"],["609730","50"],["609700","729.81"],["609690",' + '"3608.06"],["609590","3.48"],["609580","17.48"],["609520","163.92"],' + '["609510","500"],["609500","3802.96"],["609460","86.27"]],' + '"lastTradePrice":"610490","lastUpdate":1727679775574}' + ) + delta2_str = "FS\nEx@0,2:36P@Ez,5:6413}icT15;" + out2 = ( + '{"asks":[["610480","26.96"],["610490","32.76"],["610500","622.44"],["610530",' + '"238.7"],["610540","990.9"],["610830","33"],["610840","159.9"],["610880",' + '"100"],["610890","33"],["610900","913.87"],["610920","30.52"],["610970",' + '"480.74"],["610980","1500"],["610990","266.61"],["611000","9672.99"],["611100",' + '"404.63"],["611150","56.25"],["611170","2011.71"],["611200","25.2"],["611210",' + '"10.17"],["611240","150"],["611320","8.27"],["611350","76.6"],["611400",' + '"777.8"]],"bids":[["610360","115.64"],["610350","525.38"],["610340","575.77"],' + '["610330","421.83"],["610320","1943.17"],["610310","241.36"],["610300",' + '"3186.21"],["610080","418.23"],["610050","167.12"],["610030","30"],["610010",' + '"31.14"],["610000","2989.86"],["609920","85.04"],["609910","58.72"],["609900",' + '"2.05"],["609730","50"],["609700","729.81"],["609690","3608.06"],["609590",' + '"3.48"],["609580","17.48"],["609520","163.92"],["609510","500"],["609500",' + '"3802.96"],["609460","86.27"]],"lastTradePrice":"610360",' + '"lastUpdate":1727679776413}' + ) + result_bytes = apply_delta(source2_str.encode("utf-8"), delta2_str.encode("utf-8")) + self.assertEqual(result_bytes.decode("utf-8"), out2) + + # Test 3 + source3_str = ( + '{"asks":[["610350","3422.1"],["610380","1743.7"],["610400","5133.73"],' + '["610410","2690.87"],["610420","100"],["610450","610.86"],["610500",' + '"815.43"],["610690","25"],["610700","120.8"],["610920","524.38"],' + '["610930","305.51"],["611000","937.44"],["611060","8.18"],["611130",' + '"69.91"],["611140","503"],["611150","601.79"],["611190","15"],["611200",' + '"1128.36"],["611250","2153.73"],["611330","500"],["611360","300"],' + '["611400","21.5"],["611500","637.95"],["611530","2"]],"bids":[["610320",' + '"114.61"],["610300","491.56"],["610290","479"],["610260","474.2"],' + '["610240","427.85"],["610200","183.67"],["610160","585.47"],["610150",' + '"396.31"],["610140","1615.92"],["610120","128.73"],["610100","5571.63"],' + '["610040","6.84"],["610000","15505.56"],["609930","100"],["609900",' + '"46"],["609810","150"],["609750","196.76"],["609730","50.2"],["609700",' + '"411.97"],["609650","1640.12"],["609640","480.23"],["609600","410.04"],' + '["609560","1640.36"],["609530","2.14"]],"lastTradePrice":"610350",' + '"lastUpdate":1727685711521}' + ) + delta3_str = ( + 'Fa\nQ:{"asks":[["610320","312.757@4v,E:0350","3418.837@6x,D:0380",' + '"1743.78@8W,68@r,A:],"bids":[6y@7g,I:,["609520","143.57p@Eb,5:3315}3QQaIf;' + ) + out3 = ( + '{"asks":[["610320","312.75"],["610350","3418.83"],["610380","1743.7"],["610400",' + '"5133.73"],["610410","2690.87"],["610420","100"],["610450","610.86"],["610500",' + '"815.43"],["610690","25"],["610700","120.8"],["610920","524.38"],["610930",' + '"305.51"],["611000","937.44"],["611060","8.18"],["611130","69.91"],["611140",' + '"503"],["611150","601.79"],["611190","15"],["611200","1128.36"],["611250",' + '"2153.73"],["611330","500"],["611360","300"],["611400","21.5"],["611500",' + '"637.95"]],"bids":[["610300","491.56"],["610290","479"],["610260","474.2"],' + '["610240","427.85"],["610200","183.67"],["610160","585.47"],["610150",' + '"396.31"],["610140","1615.92"],["610120","128.73"],["610100","5571.63"],' + '["610040","6.84"],["610000","15505.56"],["609930","100"],["609900","46"],' + '["609810","150"],["609750","196.76"],["609730","50.2"],["609700","411.97"],' + '["609650","1640.12"],["609640","480.23"],["609600","410.04"],["609560",' + '"1640.36"],["609530","2.14"],["609520","143.57"]],"lastTradePrice":"610350",' + '"lastUpdate":1727685713315}' + ) + result_bytes = apply_delta(source3_str.encode("utf-8"), delta3_str.encode("utf-8")) + self.assertEqual(result_bytes.decode("utf-8"), out3) + + # Test 4 + source4_str = ( + '{"asks":[["610390","600.45"],["610400","118.16"],["610410","2450.9"],' + '["610420","100"],["610450","413.91"],["610490","20"],["610500",' + '"865.43"],["610690","25"],["610700","120.8"],["610800","325.49"],' + '["610900","43"],["610930","386.35"],["610980","25"],["610990",' + '"1304.18"],["611000","6729.02"],["611060","8.18"],["611140","105"],' + '["611150","601.79"],["611190","15"],["611200","1118.36"],["611250",' + '"2253.63"],["611330","500"],["611350","200.66"],["611360","300"]],' + '"bids":[["610260","73.51"],["610250","1884.19"],["610240","27.79"],' + '["610230","55.7"],["610100","88.94"],["610060","957.52"],["610040",' + '"48.84"],["610000","7344.08"],["609990","234.11"],["609800","2.1"],' + '["609720","50"],["609670","2583.24"],["609660","50.99"],["609650",' + '"922.5"],["609640","381.02"],["609600","410.04"],["609560","1640.36"],' + '["609530","2.14"],["609520","143.57"],["609510","531.75"],["609500",' + '"5885.67"],["609460","86.27"],["609430","100"],["609400","410.17"]],' + '"lastTradePrice":"610390","lastUpdate":1727688337312}' + ) + delta4_str = "FO\nP@0,74@Q,5:68.377N@7Z,1:5P@Ew,5:9024}9u5zN;" + out4 = ( + '{"asks":[["610390","600.4"],["610400","118.16"],["610410","2450.9"],["610420",' + '"100"],["610450","413.91"],["610490","20"],["610500","865.43"],["610690","25"],' + '["610700","120.8"],["610800","325.49"],["610900","43"],["610930","386.35"],' + '["610980","25"],["610990","1304.18"],["611000","6729.02"],["611060","8.18"],' + '["611140","105"],["611150","601.79"],["611190","15"],["611200","1118.36"],' + '["611250","2253.63"],["611330","500"],["611350","200.66"],["611360","300"]],' + '"bids":[["610260","68.37"],["610250","1884.19"],["610240","27.79"],["610230",' + '"55.7"],["610100","88.94"],["610060","957.52"],["610040","48.84"],["610000",' + '"7344.08"],["609990","234.11"],["609800","2.1"],["609720","50"],["609670",' + '"2583.24"],["609660","50.99"],["609650","922.5"],["609640","381.02"],["609600",' + '"410.04"],["609560","1640.36"],["609530","2.14"],["609520","143.57"],["609510",' + '"531.75"],["609500","5885.67"],["609460","86.27"],["609430","100"],["609400",' + '"410.17"]],"lastTradePrice":"610350","lastUpdate":1727688339024}' + ) + result_bytes = apply_delta(source4_str.encode("utf-8"), delta4_str.encode("utf-8")) + self.assertEqual(result_bytes.decode("utf-8"), out4) + + # Test 5 + # source5 is out4 + delta5_str = "FP\nK@0,6:593.1775@P,4:0.517N@7Y,1:9Q@Ev,4:892}1bjQuR;" + out5 = ( + '{"asks":[["610390","593.17"],["610400","118.16"],["610410","2450.9"],["610420",' + '"100"],["610450","413.91"],["610490","20"],["610500","865.43"],["610690","25"],' + '["610700","120.8"],["610800","325.49"],["610900","43"],["610930","386.35"],' + '["610980","25"],["610990","1304.18"],["611000","6729.02"],["611060","8.18"],' + '["611140","105"],["611150","601.79"],["611190","15"],["611200","1118.36"],' + '["611250","2253.63"],["611330","500"],["611350","200.66"],["611360","300"]],' + '"bids":[["610260","60.51"],["610250","1884.19"],["610240","27.79"],["610230",' + '"55.7"],["610100","88.94"],["610060","957.52"],["610040","48.84"],["610000",' + '"7344.08"],["609990","234.11"],["609800","2.1"],["609720","50"],["609670",' + '"2583.24"],["609660","50.99"],["609650","922.5"],["609640","381.02"],["609600",' + '"410.04"],["609560","1640.36"],["609530","2.14"],["609520","143.57"],["609510",' + '"531.75"],["609500","5885.67"],["609460","86.27"],["609430","100"],["609400",' + '"410.17"]],"lastTradePrice":"610390","lastUpdate":1727688339892}' + ) + result_bytes = apply_delta(out4.encode("utf-8"), delta5_str.encode("utf-8")) + self.assertEqual(result_bytes.decode("utf-8"), out5)