Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add delta compression support #20

Merged
merged 1 commit into from
Oct 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ repos:
- id: "check-merge-conflict"
- id: "debug-statements"
- id: "end-of-file-fixer"
exclude: "tests/data/.*"
- id: "mixed-line-ending"
- id: "detect-private-key"
- id: "check-yaml"
Expand Down
24 changes: 24 additions & 0 deletions centrifuge/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
Union,
List,
Callable,
Literal,
)

import websockets
Expand Down Expand Up @@ -201,6 +202,7 @@ def new_subscription(
positioned: bool = False,
recoverable: bool = False,
join_leave: bool = False,
delta: Literal["fossil", ""] = "",
) -> "Subscription":
"""Creates new subscription to channel. If subscription already exists then
DuplicateSubscriptionError exception will be raised.
Expand All @@ -221,6 +223,7 @@ def new_subscription(
positioned=positioned,
recoverable=recoverable,
join_leave=join_leave,
delta=delta,
)
self._subs[channel] = sub
return sub
Expand Down Expand Up @@ -782,6 +785,9 @@ def _construct_subscribe_command(self, sub: "Subscription", cmd_id: int) -> Dict
subscribe["epoch"] = sub._epoch
subscribe["offset"] = sub._offset

if sub._delta:
subscribe["delta"] = sub._delta

command = {
"id": cmd_id,
"subscribe": subscribe,
Expand Down Expand Up @@ -1311,6 +1317,7 @@ def _publication_from_proto(self, pub: Any) -> Publication:
data=self._decode_data(pub.get("data")),
info=client_info,
tags=pub.get("tags", {}),
delta=pub.get("delta", False),
)


Expand Down Expand Up @@ -1352,6 +1359,7 @@ def _initialize(
positioned: bool = False,
recoverable: bool = False,
join_leave: bool = False,
delta: Literal["fossil", ""] = "",
) -> None:
"""Initializes Subscription instance.
Note: use Client.new_subscription method to create new subscriptions in your app.
Expand All @@ -1376,6 +1384,12 @@ def _initialize(
self._recover: bool = False
self._offset: int = 0
self._epoch: str = ""
self._prev_data: Optional[Any] = None

if delta not in {"fossil", ""}:
raise CentrifugeError("unsupported delta format")
self._delta = delta
self._delta_negotiated: bool = False

@classmethod
def _create_instance(cls, *args: Any, **kwargs: Any) -> "Subscription":
Expand Down Expand Up @@ -1552,6 +1566,8 @@ async def _move_subscribed(self, subscribe: Dict[str, Any]) -> None:
lambda: asyncio.ensure_future(self._refresh(), loop=self._client._loop),
)

self._delta_negotiated = subscribe.get("delta", False)

await on_subscribed_handler(
SubscribedContext(
channel=self.channel,
Expand Down Expand Up @@ -1603,6 +1619,14 @@ async def _resubscribe(self) -> None:

async def _process_publication(self, pub: Any) -> None:
publication = self._client._publication_from_proto(pub)

if self._delta and self._delta_negotiated:
new_data, prev_data = self._client._codec.apply_delta_if_needed(
self._prev_data, publication
)
publication.data = new_data
self._prev_data = prev_data

await self.events.on_publication(PublicationContext(pub=publication))
if publication.offset > 0:
self._offset = publication.offset
Expand Down
26 changes: 25 additions & 1 deletion centrifuge/codecs.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import json
from typing import Union, Iterable, AsyncIterable
from typing import Union, Iterable, AsyncIterable, TYPE_CHECKING

from google.protobuf.json_format import MessageToDict, ParseDict
from websockets.typing import Data

import centrifuge.protocol.client_pb2 as protocol
from centrifuge.fossil import apply_delta

if TYPE_CHECKING:
from centrifuge import Publication


class _JsonCodec:
Expand All @@ -18,6 +22,16 @@ def encode_commands(commands):
def decode_replies(data):
return [json.loads(reply) for reply in data.strip().split("\n")]

@staticmethod
def apply_delta_if_needed(prev_data: bytes, pub: "Publication"):
if pub.delta:
prev_data = apply_delta(prev_data, pub.data.encode("utf-8"))
new_data = prev_data.decode("utf-8")
else:
prev_data = pub.data.encode("utf-8")
new_data = pub.data
return new_data, prev_data


def _varint_encode(number):
"""Encode an integer as a varint."""
Expand Down Expand Up @@ -73,3 +87,13 @@ def decode_replies(data: bytes):
reply.ParseFromString(message_bytes)
replies.append(MessageToDict(reply, preserving_proto_field_name=True))
return replies

@staticmethod
def apply_delta_if_needed(prev_data: bytes, pub: "Publication"):
if pub.delta:
prev_data = apply_delta(prev_data, pub.data)
new_data = prev_data.decode("utf-8")
else:
prev_data = pub.data
new_data = pub.data.decode("utf-8")
return new_data, prev_data
Loading