Skip to content

Commit

Permalink
Partition autosplit feature
Browse files Browse the repository at this point in the history
  • Loading branch information
vgvoleg committed Jan 21, 2025
1 parent 228bb52 commit 181fb89
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 0 deletions.
22 changes: 22 additions & 0 deletions ydb/_grpc/grpcwrapper/ydb_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,20 @@ def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.StopPartitionSessionRespon
partition_session_id=self.partition_session_id,
)

@dataclass
class EndPartitionSession(IFromProto):
partition_session_id: int
adjacent_partition_ids: List[int]
child_partition_ids: List[int]

@staticmethod
def from_proto(msg: ydb_topic_pb2.StreamReadMessage.EndPartitionSession):
return StreamReadMessage.EndPartitionSession(
partition_session_id=msg.partition_session_id,
adjacent_partition_ids=list(msg.adjacent_partition_ids),
child_partition_ids=list(msg.child_partition_ids),
)

@dataclass
class FromClient(IToProto):
client_message: "ReaderMessagesFromClientToServer"
Expand Down Expand Up @@ -775,6 +789,13 @@ def from_proto(
msg.partition_session_status_response
),
)
elif mess_type == "end_partition_session":
return StreamReadMessage.FromServer(
server_status=server_status,
server_message=StreamReadMessage.EndPartitionSession.from_proto(
msg.end_partition_session,
)
)
else:
raise issues.UnexpectedGrpcMessage(
"Unexpected message while parse ReaderMessagesFromServerToClient: '%s'" % mess_type
Expand All @@ -799,6 +820,7 @@ def from_proto(
UpdateTokenResponse,
StreamReadMessage.StartPartitionSessionRequest,
StreamReadMessage.StopPartitionSessionRequest,
StreamReadMessage.EndPartitionSession,
]


Expand Down
9 changes: 9 additions & 0 deletions ydb/_topic_reader/topic_reader_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,12 @@ async def _read_messages_loop(self):
):
self._on_partition_session_stop(message.server_message)

elif isinstance(
message.server_message,
StreamReadMessage.EndPartitionSession,
):
self._on_end_partition_session(message.server_message)

elif isinstance(message.server_message, UpdateTokenResponse):
self._update_token_event.set()

Expand Down Expand Up @@ -575,6 +581,9 @@ def _on_partition_session_stop(self, message: StreamReadMessage.StopPartitionSes
)
)

def _on_end_partition_session(self, message: StreamReadMessage.EndPartitionSession):
logger.debug(f"End partition session with id: {message.partition_session_id}")

def _on_read_response(self, message: StreamReadMessage.ReadResponse):
self._buffer_consume_bytes(message.bytes_size)

Expand Down

0 comments on commit 181fb89

Please sign in to comment.