Skip to content

Commit

Permalink
Stop rotation batch queue after split
Browse files Browse the repository at this point in the history
  • Loading branch information
vgvoleg committed Feb 5, 2025
1 parent f68af4c commit 0d5fa7b
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 9 deletions.
11 changes: 11 additions & 0 deletions ydb/_topic_reader/datatypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,16 @@ def close(self):
def closed(self):
return self.state == PartitionSession.State.Stopped

def end(self):
    """Mark this partition session as ended.

    Does nothing when the session is already closed, so a closed session
    keeps its current state.
    """
    if not self.closed:
        self.state = PartitionSession.State.Ended

@property
def ended(self):
    """Return True when the session state is ``State.Ended``."""
    current_state = self.state
    return current_state == PartitionSession.State.Ended

def _ensure_not_closed(self):
if self.state == PartitionSession.State.Stopped:
raise topic_reader_asyncio.PublicTopicReaderPartitionExpiredError()
Expand All @@ -129,6 +139,7 @@ class State(enum.Enum):
Active = 1
GracefulShutdown = 2
Stopped = 3
Ended = 4

@dataclass(order=True)
class CommitAckWaiter:
Expand Down
27 changes: 18 additions & 9 deletions ydb/_topic_reader/topic_reader_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,6 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess
self._started = True
self._stream = stream

print(init_message)

stream.write(StreamReadMessage.FromClient(client_message=init_message))
init_response = await stream.receive() # type: StreamReadMessage.FromServer
if isinstance(init_response.server_message, StreamReadMessage.InitResponse):
Expand Down Expand Up @@ -390,6 +388,15 @@ def _get_first_batch(self) -> typing.Tuple[int, datatypes.PublicBatch]:
partition_session_id, batch = self._message_batches.popitem(last=False)
return partition_session_id, batch

def _return_batch_to_queue(self, part_sess_id: int, batch: datatypes.PublicBatch):
    """Put a partially consumed batch back into the pending batch queue.

    The batch is stored in ``self._message_batches`` under ``part_sess_id``.
    When that partition session is known to the reader and is marked as
    ended, its entry is moved to the front of the queue so it is served
    first.

    Args:
        part_sess_id: Id of the partition session the batch belongs to.
        batch: The remaining (not yet delivered) part of the batch.
    """
    self._message_batches[part_sess_id] = batch

    # In case of auto-split, all remaining messages of the parent partition
    # must be delivered as soon as possible, without queue rotation, so that
    # a child partition's messages are never returned before its parent's.
    if part_sess_id in self._partition_sessions and self._partition_sessions[part_sess_id].ended:
        self._message_batches.move_to_end(part_sess_id, last=False)

def receive_batch_nowait(self, max_messages: Optional[int] = None):
if self._get_first_error():
raise self._get_first_error()
Expand All @@ -405,7 +412,8 @@ def receive_batch_nowait(self, max_messages: Optional[int] = None):

cutted_batch = batch._pop_batch(message_count=max_messages)

self._message_batches[part_sess_id] = batch
self._return_batch_to_queue(part_sess_id, batch)

self._buffer_release_bytes(cutted_batch._bytes_size)

return cutted_batch
Expand All @@ -425,7 +433,7 @@ def receive_message_nowait(self):
self._buffer_release_bytes(batch._bytes_size)
else:
# TODO: we should somehow release bytes from single message as well
self._message_batches[part_sess_id] = batch
self._return_batch_to_queue(part_sess_id, batch)

return message

Expand Down Expand Up @@ -584,13 +592,14 @@ def _on_partition_session_stop(self, message: StreamReadMessage.StopPartitionSes
)

def _on_end_partition_session(self, message: StreamReadMessage.EndPartitionSession):
    """Handle a notification that a partition session has ended.

    Logs the ended session id together with its child partition ids at
    debug level, then marks the session as ended if the reader still tracks
    it.

    Args:
        message: The end-of-partition-session message; its
            ``partition_session_id`` and ``child_partition_ids`` are read.
    """
    logger.debug(
        f"End partition session with id: {message.partition_session_id}, "
        f"child partitions: {message.child_partition_ids}"
    )

    if message.partition_session_id in self._partition_sessions:
        # Mark the partition session as ended so that its remaining batches
        # are not rotated behind batches of other partitions.
        self._partition_sessions[message.partition_session_id].end()

def _on_read_response(self, message: StreamReadMessage.ReadResponse):
self._buffer_consume_bytes(message.bytes_size)
Expand Down

0 comments on commit 0d5fa7b

Please sign in to comment.