Skip to content

Commit

Permalink
feat: Add delta compression support
Browse files Browse the repository at this point in the history
  • Loading branch information
itismoej committed Sep 30, 2024
1 parent 600d072 commit 40b62f3
Show file tree
Hide file tree
Showing 8 changed files with 593 additions and 77 deletions.
24 changes: 24 additions & 0 deletions centrifuge/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
Union,
List,
Callable,
Literal,
)

import websockets
Expand Down Expand Up @@ -201,6 +202,7 @@ def new_subscription(
positioned: bool = False,
recoverable: bool = False,
join_leave: bool = False,
delta: Literal["fossil", ""] = "",
) -> "Subscription":
"""Creates new subscription to channel. If subscription already exists then
DuplicateSubscriptionError exception will be raised.
Expand All @@ -221,6 +223,7 @@ def new_subscription(
positioned=positioned,
recoverable=recoverable,
join_leave=join_leave,
delta=delta,
)
self._subs[channel] = sub
return sub
Expand Down Expand Up @@ -782,6 +785,9 @@ def _construct_subscribe_command(self, sub: "Subscription", cmd_id: int) -> Dict
subscribe["epoch"] = sub._epoch
subscribe["offset"] = sub._offset

if sub._delta:
subscribe["delta"] = sub._delta

command = {
"id": cmd_id,
"subscribe": subscribe,
Expand Down Expand Up @@ -1311,6 +1317,7 @@ def _publication_from_proto(self, pub: Any) -> Publication:
data=self._decode_data(pub.get("data")),
info=client_info,
tags=pub.get("tags", {}),
delta=pub.get("delta", False),
)


Expand Down Expand Up @@ -1352,6 +1359,7 @@ def _initialize(
positioned: bool = False,
recoverable: bool = False,
join_leave: bool = False,
delta: Literal["fossil", ""] = "",
) -> None:
"""Initializes Subscription instance.
Note: use Client.new_subscription method to create new subscriptions in your app.
Expand All @@ -1376,6 +1384,12 @@ def _initialize(
self._recover: bool = False
self._offset: int = 0
self._epoch: str = ""
self._prev_data: Optional[Any] = None

if delta not in {"fossil", ""}:
raise CentrifugeError("unsupported delta format")
self._delta = delta
self._delta_negotiated: bool = False

@classmethod
def _create_instance(cls, *args: Any, **kwargs: Any) -> "Subscription":
Expand Down Expand Up @@ -1552,6 +1566,8 @@ async def _move_subscribed(self, subscribe: Dict[str, Any]) -> None:
lambda: asyncio.ensure_future(self._refresh(), loop=self._client._loop),
)

self._delta_negotiated = subscribe.get("delta", False)

await on_subscribed_handler(
SubscribedContext(
channel=self.channel,
Expand Down Expand Up @@ -1603,6 +1619,14 @@ async def _resubscribe(self) -> None:

async def _process_publication(self, pub: Any) -> None:
publication = self._client._publication_from_proto(pub)

if self._delta and self._delta_negotiated:
new_data, prev_data = self._client._codec.apply_delta_if_needed(
self._prev_data, publication
)
publication.data = new_data
self._prev_data = prev_data

await self.events.on_publication(PublicationContext(pub=publication))
if publication.offset > 0:
self._offset = publication.offset
Expand Down
26 changes: 25 additions & 1 deletion centrifuge/codecs.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import json
from typing import Union, Iterable, AsyncIterable
from typing import Union, Iterable, AsyncIterable, TYPE_CHECKING

from google.protobuf.json_format import MessageToDict, ParseDict
from websockets.typing import Data

import centrifuge.protocol.client_pb2 as protocol
from centrifuge.fossil import apply_delta

if TYPE_CHECKING:
from centrifuge import Publication


class _JsonCodec:
Expand All @@ -18,6 +22,16 @@ def encode_commands(commands):
def decode_replies(data):
return [json.loads(reply) for reply in data.strip().split("\n")]

@staticmethod
def apply_delta_if_needed(prev_data: bytes, pub: "Publication"):
if pub.delta:
prev_data = apply_delta(prev_data, pub.data.encode("utf-8"))
new_data = prev_data.decode("utf-8")
else:
prev_data = pub.data.encode("utf-8")
new_data = pub.data
return new_data, prev_data


def _varint_encode(number):
"""Encode an integer as a varint."""
Expand Down Expand Up @@ -73,3 +87,13 @@ def decode_replies(data: bytes):
reply.ParseFromString(message_bytes)
replies.append(MessageToDict(reply, preserving_proto_field_name=True))
return replies

@staticmethod
def apply_delta_if_needed(prev_data: bytes, pub: "Publication"):
if pub.delta:
prev_data = apply_delta(prev_data, pub.data)
new_data = prev_data.decode("utf-8")
else:
prev_data = pub.data
new_data = pub.data.decode("utf-8")
return new_data, prev_data
Loading

0 comments on commit 40b62f3

Please sign in to comment.