From 09266c918c9abb0aae7a98cea0e5144e4f2fd1f7 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Tue, 21 Jan 2025 17:29:31 +0300 Subject: [PATCH] Partition autosplit feature --- ydb/_grpc/grpcwrapper/ydb_topic.py | 24 +++++++++++++++++++++++ ydb/_topic_reader/topic_reader.py | 2 ++ ydb/_topic_reader/topic_reader_asyncio.py | 9 +++++++++ 3 files changed, 35 insertions(+) diff --git a/ydb/_grpc/grpcwrapper/ydb_topic.py b/ydb/_grpc/grpcwrapper/ydb_topic.py index ec84ab08..f8c45d2e 100644 --- a/ydb/_grpc/grpcwrapper/ydb_topic.py +++ b/ydb/_grpc/grpcwrapper/ydb_topic.py @@ -419,12 +419,14 @@ def from_proto( class InitRequest(IToProto): topics_read_settings: List["StreamReadMessage.InitRequest.TopicReadSettings"] consumer: str + auto_partitioning_support: bool = False def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.InitRequest: res = ydb_topic_pb2.StreamReadMessage.InitRequest() res.consumer = self.consumer for settings in self.topics_read_settings: res.topics_read_settings.append(settings.to_proto()) + res.auto_partitioning_support = self.auto_partitioning_support return res @dataclass @@ -696,6 +698,20 @@ def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.StopPartitionSessionRespon partition_session_id=self.partition_session_id, ) + @dataclass + class EndPartitionSession(IFromProto): + partition_session_id: int + adjacent_partition_ids: List[int] + child_partition_ids: List[int] + + @staticmethod + def from_proto(msg: ydb_topic_pb2.StreamReadMessage.EndPartitionSession): + return StreamReadMessage.EndPartitionSession( + partition_session_id=msg.partition_session_id, + adjacent_partition_ids=list(msg.adjacent_partition_ids), + child_partition_ids=list(msg.child_partition_ids), + ) + @dataclass class FromClient(IToProto): client_message: "ReaderMessagesFromClientToServer" @@ -775,6 +791,13 @@ def from_proto( msg.partition_session_status_response ), ) + elif mess_type == "end_partition_session": + return StreamReadMessage.FromServer( + server_status=server_status, + server_message=StreamReadMessage.EndPartitionSession.from_proto( + msg.end_partition_session, + ), + ) else: raise issues.UnexpectedGrpcMessage( "Unexpected message while parse ReaderMessagesFromServerToClient: '%s'" % mess_type @@ -799,6 +822,7 @@ def from_proto( UpdateTokenResponse, StreamReadMessage.StartPartitionSessionRequest, StreamReadMessage.StopPartitionSessionRequest, + StreamReadMessage.EndPartitionSession, ] diff --git a/ydb/_topic_reader/topic_reader.py b/ydb/_topic_reader/topic_reader.py index b907ee27..699e2417 100644 --- a/ydb/_topic_reader/topic_reader.py +++ b/ydb/_topic_reader/topic_reader.py @@ -45,6 +45,7 @@ class PublicReaderSettings: consumer: str topic: TopicSelectorTypes buffer_size_bytes: int = 50 * 1024 * 1024 + auto_partitioning_support: bool = False decoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None """decoders: map[codec_code] func(encoded_bytes)->decoded_bytes""" @@ -77,6 +78,7 @@ def _init_message(self) -> StreamReadMessage.InitRequest: return StreamReadMessage.InitRequest( topics_read_settings=list(map(PublicTopicSelector._to_topic_read_settings, selectors)), # type: ignore consumer=self.consumer, + auto_partitioning_support=self.auto_partitioning_support, ) def _retry_settings(self) -> RetrySettings: diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index e407fe01..6545a451 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -498,6 +498,12 @@ async def _read_messages_loop(self): ): self._on_partition_session_stop(message.server_message) + elif isinstance( + message.server_message, + StreamReadMessage.EndPartitionSession, + ): + self._on_end_partition_session(message.server_message) + elif isinstance(message.server_message, UpdateTokenResponse): self._update_token_event.set() @@ -575,6 +581,9 @@ def _on_partition_session_stop(self, message: StreamReadMessage.StopPartitionSes ) ) + def _on_end_partition_session(self, message: StreamReadMessage.EndPartitionSession): + logger.debug(f"End partition session with id: {message.partition_session_id}") + def _on_read_response(self, message: StreamReadMessage.ReadResponse): self._buffer_consume_bytes(message.bytes_size)