From ab8fb032e3bf639cdabb5307829d71fb8336f09c Mon Sep 17 00:00:00 2001 From: Kirill Kurdyukov Date: Thu, 6 Feb 2025 19:46:32 +0300 Subject: [PATCH] feat: supported Commit() & ReadAsync() in TopicReader (#265) --- src/Ydb.Sdk/src/Services/Topic/Exceptions.cs | 7 +- src/Ydb.Sdk/src/Services/Topic/IReader.cs | 2 +- .../Topic/Reader/InternalBatchMessages.cs | 111 ++++++ .../Services/Topic/Reader/InternalMessage.cs | 91 ----- .../src/Services/Topic/Reader/Message.cs | 62 ++- .../Services/Topic/Reader/PartitionSession.cs | 89 +++++ .../src/Services/Topic/Reader/Reader.cs | 363 +++++++----------- .../src/Services/Topic/Reader/Utils.cs | 15 + .../tests/Topic/ReaderIntegrationTests.cs | 39 +- src/Ydb.Sdk/tests/Topic/ReaderUnitTests.cs | 119 ++++++ 10 files changed, 525 insertions(+), 373 deletions(-) create mode 100644 src/Ydb.Sdk/src/Services/Topic/Reader/InternalBatchMessages.cs delete mode 100644 src/Ydb.Sdk/src/Services/Topic/Reader/InternalMessage.cs create mode 100644 src/Ydb.Sdk/src/Services/Topic/Reader/PartitionSession.cs create mode 100644 src/Ydb.Sdk/src/Services/Topic/Reader/Utils.cs create mode 100644 src/Ydb.Sdk/tests/Topic/ReaderUnitTests.cs diff --git a/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs b/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs index 7e72ed45..4a99c99e 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs @@ -19,18 +19,13 @@ public class ReaderException : Exception { public ReaderException(string message) : base(message) { - Status = new Status(StatusCode.Unspecified); } public ReaderException(string message, Status status) : base(message + ": " + status) { - Status = status; } - public ReaderException(string message, Driver.TransportException e) : base(message, e) + public ReaderException(string message, Exception inner) : base(message, inner) { - Status = e.Status; } - - public Status Status { get; } } diff --git a/src/Ydb.Sdk/src/Services/Topic/IReader.cs b/src/Ydb.Sdk/src/Services/Topic/IReader.cs index 9a09cd4a..182fe4b7 100644 --- a/src/Ydb.Sdk/src/Services/Topic/IReader.cs +++ b/src/Ydb.Sdk/src/Services/Topic/IReader.cs @@ -6,5 +6,5 @@ public interface IReader : IDisposable { public ValueTask> ReadAsync(CancellationToken cancellationToken = default); - public ValueTask> ReadBatchAsync(CancellationToken cancellationToken = default); + public ValueTask> ReadBatchAsync(CancellationToken cancellationToken = default); } diff --git a/src/Ydb.Sdk/src/Services/Topic/Reader/InternalBatchMessages.cs b/src/Ydb.Sdk/src/Services/Topic/Reader/InternalBatchMessages.cs new file mode 100644 index 00000000..f7651197 --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Topic/Reader/InternalBatchMessages.cs @@ -0,0 +1,111 @@ +using System.Collections.Immutable; +using System.Diagnostics.CodeAnalysis; +using Ydb.Topic; + +namespace Ydb.Sdk.Services.Topic.Reader; + +internal class InternalBatchMessages +{ + private readonly StreamReadMessage.Types.ReadResponse.Types.Batch _batch; + private readonly PartitionSession _partitionSession; + private readonly IDeserializer _deserializer; + private readonly ReaderSession _readerSession; + private readonly long _approximatelyBatchSize; + + private int _startMessageDataIndex; + + private int OriginalMessageCount => _batch.MessageData.Count; + private bool IsActive => _startMessageDataIndex < OriginalMessageCount && _readerSession.IsActive; + + public InternalBatchMessages( + StreamReadMessage.Types.ReadResponse.Types.Batch batch, + PartitionSession partitionsSession, + ReaderSession readerSession, + long approximatelyBatchSize, + IDeserializer deserializer) + { + _batch = batch; + _partitionSession = partitionsSession; + _readerSession = readerSession; + _deserializer = deserializer; + _approximatelyBatchSize = approximatelyBatchSize; + } + + internal bool TryDequeueMessage([MaybeNullWhen(false)] out Message message) + { + if (!IsActive) + { + message = default; + return false; + } + + var index = _startMessageDataIndex++; + var approximatelyMessageBytesSize = Utils + .CalculateApproximatelyBytesSize(_approximatelyBatchSize, OriginalMessageCount, index); + var messageData = _batch.MessageData[index]; + + TValue value; + try + { + value = _deserializer.Deserialize(messageData.Data.ToByteArray()); + } + catch (Exception e) + { + throw new ReaderException("Error when deserializing message data", e); + } + + _readerSession.TryReadRequestBytes(approximatelyMessageBytesSize); + var nextCommitedOffset = messageData.Offset + 1; + + message = new Message( + data: value, + topic: _partitionSession.TopicPath, + partitionId: _partitionSession.PartitionId, + partitionSessionId: _partitionSession.PartitionSessionId, + producerId: _batch.ProducerId, + createdAt: messageData.CreatedAt.ToDateTime(), + metadata: messageData.MetadataItems.Select(item => new Metadata(item.Key, item.Value.ToByteArray())) + .ToImmutableArray(), + offsetsRange: new OffsetsRange + { Start = _partitionSession.PrevEndOffsetMessage, End = nextCommitedOffset }, + readerSession: _readerSession + ); + _partitionSession.PrevEndOffsetMessage = nextCommitedOffset; + + return true; + } + + internal bool TryPublicBatch([MaybeNullWhen(false)] out BatchMessages batchMessages) + { + if (!IsActive) + { + batchMessages = default; + return false; + } + + var nextCommitedOffset = _batch.MessageData.Last().Offset + 1; + var offsetsRangeBatch = new OffsetsRange + { Start = _partitionSession.PrevEndOffsetMessage, End = nextCommitedOffset }; + _partitionSession.PrevEndOffsetMessage = nextCommitedOffset; + + var messages = new List>(); + while (TryDequeueMessage(out var message)) + { + messages.Add(message); + } + + batchMessages = new BatchMessages( + batch: messages, + readerSession: _readerSession, + offsetsRange: offsetsRangeBatch, + partitionSessionId: _partitionSession.PartitionSessionId + ); + + return true; + } +} + +internal record CommitSending( + OffsetsRange OffsetsRange, + TaskCompletionSource TcsCommit +); diff --git a/src/Ydb.Sdk/src/Services/Topic/Reader/InternalMessage.cs b/src/Ydb.Sdk/src/Services/Topic/Reader/InternalMessage.cs deleted file mode 100644 index d913c301..00000000 --- a/src/Ydb.Sdk/src/Services/Topic/Reader/InternalMessage.cs +++ /dev/null @@ -1,91 +0,0 @@ -using System.Collections.Immutable; -using Google.Protobuf; -using Google.Protobuf.Collections; -using Google.Protobuf.WellKnownTypes; -using Ydb.Topic; - -namespace Ydb.Sdk.Services.Topic.Reader; - -internal class InternalMessage -{ - public InternalMessage( - ByteString data, - string topic, - long partitionId, - string producerId, - OffsetsRange offsetsRange, - Timestamp createdAt, - RepeatedField metadataItems, - long approximatelyBytesSize) - { - Data = data; - Topic = topic; - PartitionId = partitionId; - ProducerId = producerId; - OffsetsRange = offsetsRange; - CreatedAt = createdAt; - MetadataItems = metadataItems; - ApproximatelyBytesSize = approximatelyBytesSize; - } - - private ByteString Data { get; } - - private string Topic { get; } - - private long PartitionId { get; } - - private string ProducerId { get; } - - private OffsetsRange OffsetsRange { get; } - - private Timestamp CreatedAt { get; } - - private RepeatedField MetadataItems { get; } - - private long ApproximatelyBytesSize { get; } - - internal Message ToPublicMessage(IDeserializer deserializer, ReaderSession readerSession) - { - return new Message( - data: deserializer.Deserialize(Data.ToByteArray()), - topic: Topic, - partitionId: PartitionId, - producerId: ProducerId, - createdAt: CreatedAt.ToDateTime(), - metadata: MetadataItems.Select(item => new Metadata(item.Key, item.Value.ToByteArray())).ToImmutableArray(), - offsetsRange: OffsetsRange, - readerSession: readerSession, - approximatelyBytesSize: ApproximatelyBytesSize - ); - } -} - -internal class InternalBatchMessage -{ - public InternalBatchMessage( - OffsetsRange batchOffsetsRange, - Queue internalMessages, - ReaderSession readerSession, - long approximatelyBatchSize) - { - BatchOffsetsRange = batchOffsetsRange; - InternalMessages = internalMessages; - ReaderSession = readerSession; - ApproximatelyBatchSize = approximatelyBatchSize; - } - - internal OffsetsRange BatchOffsetsRange { get; } - - internal Queue InternalMessages { get; } - - internal ReaderSession ReaderSession { get; } - - internal long ApproximatelyBatchSize { get; } -} - -internal record CommitSending( - OffsetsRange OffsetsRange, - long PartitionSessionId, - TaskCompletionSource TcsCommit, - long ApproximatelyBytesSize -); diff --git a/src/Ydb.Sdk/src/Services/Topic/Reader/Message.cs b/src/Ydb.Sdk/src/Services/Topic/Reader/Message.cs index acabf347..6be60dce 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Reader/Message.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Reader/Message.cs @@ -5,20 +5,20 @@ namespace Ydb.Sdk.Services.Topic.Reader; public class Message { + private readonly long _partitionSessionId; private readonly OffsetsRange _offsetsRange; - private readonly ReaderSession _readerSession; - private readonly long _approximatelyBytesSize; + private readonly ReaderSession _readerSession; internal Message( TValue data, string topic, long partitionId, + long partitionSessionId, string producerId, DateTime createdAt, ImmutableArray metadata, OffsetsRange offsetsRange, - ReaderSession readerSession, - long approximatelyBytesSize) + ReaderSession readerSession) { Data = data; Topic = topic; @@ -27,9 +27,9 @@ internal Message( CreatedAt = createdAt; Metadata = metadata; + _partitionSessionId = partitionSessionId; _offsetsRange = offsetsRange; _readerSession = readerSession; - _approximatelyBytesSize = approximatelyBytesSize; } public TValue Data { get; } @@ -45,56 +45,38 @@ internal Message( public DateTime CreatedAt { get; } - public ImmutableArray Metadata { get; } - - internal long Start => _offsetsRange.Start; - internal long End => _offsetsRange.End; + public IReadOnlyCollection Metadata { get; } public Task CommitAsync() { - return _readerSession.CommitOffsetRange(_offsetsRange, PartitionId, _approximatelyBytesSize); + return _readerSession.CommitOffsetRange(_offsetsRange, _partitionSessionId); } } -public class BatchMessage +public class BatchMessages { - private readonly ReaderSession _readerSession; - private readonly long _approximatelyBatchSize; + private readonly ReaderSession _readerSession; + private readonly OffsetsRange _offsetsRange; + private readonly long _partitionSessionId; - public ImmutableArray> Batch { get; } + public IReadOnlyCollection> Batch { get; } - internal BatchMessage( - ImmutableArray> batch, - ReaderSession readerSession, - long approximatelyBatchSize) + internal BatchMessages( + IReadOnlyCollection> batch, + ReaderSession readerSession, + OffsetsRange offsetsRange, + long partitionSessionId) { Batch = batch; _readerSession = readerSession; - _approximatelyBatchSize = approximatelyBatchSize; + _offsetsRange = offsetsRange; + _partitionSessionId = partitionSessionId; } public Task CommitBatchAsync() { - if (Batch.Length == 0) - { - return Task.CompletedTask; - } - - var offsetsRange = new OffsetsRange { Start = Batch.First().Start, End = Batch.Last().End }; - - return _readerSession.CommitOffsetRange(offsetsRange, Batch.First().PartitionId, _approximatelyBatchSize); + return Batch.Count == 0 + ? Task.CompletedTask + : _readerSession.CommitOffsetRange(_offsetsRange, _partitionSessionId); } } - -public class TopicPartitionOffset -{ - public TopicPartitionOffset(long offset, long partitionId) - { - Offset = offset; - PartitionId = partitionId; - } - - public long Offset { get; } - - public long PartitionId { get; } -} diff --git a/src/Ydb.Sdk/src/Services/Topic/Reader/PartitionSession.cs b/src/Ydb.Sdk/src/Services/Topic/Reader/PartitionSession.cs new file mode 100644 index 00000000..5a3788e1 --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Topic/Reader/PartitionSession.cs @@ -0,0 +1,89 @@ +using System.Collections.Concurrent; +using Microsoft.Extensions.Logging; + +namespace Ydb.Sdk.Services.Topic.Reader; + +internal class PartitionSession +{ + private readonly ILogger _logger; + private readonly ConcurrentQueue _waitCommitMessages = new(); + + private volatile bool _isStopped; + + public PartitionSession( + ILogger logger, + long partitionSessionId, + string topicPath, + long partitionId, + long commitedOffset) + { + _logger = logger; + PartitionSessionId = partitionSessionId; + TopicPath = topicPath; + PartitionId = partitionId; + PrevEndOffsetMessage = commitedOffset; + CommitedOffset = commitedOffset; + } + + // Identifier of partition session. Unique inside one RPC call. + internal long PartitionSessionId { get; } + + // Topic path of partition + internal string TopicPath { get; } + + // Partition identifier + internal long PartitionId { get; } + + internal long PrevEndOffsetMessage { get; set; } + + // Each offset up to and including (committed_offset - 1) was fully processed. + internal long CommitedOffset { get; private set; } + + internal void RegisterCommitRequest(CommitSending commitSending) + { + var endOffset = commitSending.OffsetsRange.End; + + if (endOffset < CommitedOffset) + { + commitSending.TcsCommit.SetResult(); + } + else + { + _waitCommitMessages.Enqueue(commitSending); + + if (_isStopped) + { + Utils.SetPartitionClosedException(commitSending, PartitionSessionId); + } + } + } + + internal void HandleCommitedOffset(long commitedOffset) + { + if (CommitedOffset >= commitedOffset) + { + _logger.LogError( + "PartitionSession[{PartitionSessionId}] received CommitOffsetResponse[CommitedOffset={CommitedOffset}] " + + "which is not greater than previous committed offset: {PrevCommitedOffset}", + PartitionSessionId, commitedOffset, CommitedOffset); + } + + CommitedOffset = commitedOffset; + + while (_waitCommitMessages.TryPeek(out var waitCommitTcs) && + waitCommitTcs.OffsetsRange.End <= commitedOffset) + { + _waitCommitMessages.TryDequeue(out _); + waitCommitTcs.TcsCommit.SetResult(); + } + } + + internal void Stop() + { + _isStopped = true; + while (_waitCommitMessages.TryDequeue(out var commitSending)) + { + Utils.SetPartitionClosedException(commitSending, PartitionSessionId); + } + } +} diff --git a/src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs b/src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs index d4cff08b..5793bb05 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs @@ -1,5 +1,4 @@ using System.Collections.Concurrent; -using System.Collections.Immutable; using System.Threading.Channels; using Google.Protobuf.WellKnownTypes; using Microsoft.Extensions.Logging; @@ -25,8 +24,8 @@ internal class Reader : IReader private readonly ILogger _logger; private readonly GrpcRequestSettings _readerGrpcRequestSettings; - private readonly Channel _receivedMessagesChannel = - Channel.CreateUnbounded( + private readonly Channel> _receivedMessagesChannel = + Channel.CreateUnbounded>( new UnboundedChannelOptions { SingleReader = true, @@ -54,14 +53,9 @@ public async ValueTask> ReadAsync(CancellationToken cancellation { if (_receivedMessagesChannel.Reader.TryPeek(out var batchInternalMessage)) { - if (!batchInternalMessage.ReaderSession.IsActive) + if (batchInternalMessage.TryDequeueMessage(out var message)) { - continue; - } - - if (batchInternalMessage.InternalMessages.TryDequeue(out var message)) - { - return message.ToPublicMessage(_deserializer, batchInternalMessage.ReaderSession); + return message; } if (!_receivedMessagesChannel.Reader.TryRead(out _)) @@ -75,10 +69,10 @@ public async ValueTask> ReadAsync(CancellationToken cancellation } } - throw new ReaderException("Reader is disposed"); + throw new ObjectDisposedException("Reader"); } - public async ValueTask> ReadBatchAsync(CancellationToken cancellationToken = default) + public async ValueTask> ReadBatchAsync(CancellationToken cancellationToken = default) { while (await _receivedMessagesChannel.Reader.WaitToReadAsync(cancellationToken)) { @@ -87,21 +81,13 @@ public async ValueTask> ReadBatchAsync(CancellationToken ca throw new ReaderException("Detect race condition on ReadBatchAsync operation"); } - if (batchInternalMessage.InternalMessages.Count == 0 || !batchInternalMessage.ReaderSession.IsActive) + if (batchInternalMessage.TryPublicBatch(out var batch)) { - continue; + return batch; } - - return new BatchMessage( - batchInternalMessage.InternalMessages - .Select(message => message.ToPublicMessage(_deserializer, batchInternalMessage.ReaderSession)) - .ToImmutableArray(), - batchInternalMessage.ReaderSession, - batchInternalMessage.ApproximatelyBatchSize - ); } - throw new ReaderException("Reader is disposed"); + throw new ObjectDisposedException("Reader"); } private async Task Initialize() @@ -195,13 +181,14 @@ await stream.Write(new MessageFromClient ReadRequest = new StreamReadMessage.Types.ReadRequest { BytesSize = _config.MemoryUsageMaxBytes } }); - new ReaderSession( + new ReaderSession( _config, stream, initResponse.SessionId, Initialize, _logger, - _receivedMessagesChannel.Writer + _receivedMessagesChannel.Writer, + _deserializer ).RunProcessingTopic(); } catch (Driver.TransportException e) @@ -246,26 +233,28 @@ public void Dispose() /// 5) Let's assume client somehow processes it, and its 200 bytes buffer is free again. /// It should account for excess 10 bytes and send ReadRequest with bytes_size = 210. /// -internal class ReaderSession : TopicSession +internal class ReaderSession : TopicSession { private const double FreeBufferCoefficient = 0.2; private readonly ReaderConfig _readerConfig; - private readonly ChannelWriter _channelWriter; + private readonly ChannelWriter> _channelWriter; private readonly CancellationTokenSource _lifecycleReaderSessionCts = new(); + private readonly IDeserializer _deserializer; - private readonly Channel _channelCommitSending = Channel.CreateUnbounded( - new UnboundedChannelOptions - { - SingleReader = true, - SingleWriter = true, - AllowSynchronousContinuations = false - } - ); + private readonly Channel _channelFromClientMessageSending = + Channel.CreateUnbounded( + new UnboundedChannelOptions + { + SingleWriter = true, + SingleReader = true, + AllowSynchronousContinuations = false + } + ); private readonly ConcurrentDictionary _partitionSessions = new(); - private long _memoryUsageMaxBytes; + private long _readRequestBytes; public ReaderSession( ReaderConfig config, @@ -273,7 +262,8 @@ public ReaderSession( string sessionId, Func initialize, ILogger logger, - ChannelWriter channelWriter + ChannelWriter> channelWriter, + IDeserializer deserializer ) : base( stream, logger, @@ -283,7 +273,7 @@ ChannelWriter channelWriter { _readerConfig = config; _channelWriter = channelWriter; - _memoryUsageMaxBytes = config.MemoryUsageMaxBytes; + _deserializer = deserializer; } public async void RunProcessingTopic() @@ -292,38 +282,9 @@ public async void RunProcessingTopic() { try { - await foreach (var commitSending in _channelCommitSending.Reader.ReadAllAsync()) + await foreach (var messageFromClient in _channelFromClientMessageSending.Reader.ReadAllAsync()) { - if (_partitionSessions.TryGetValue(commitSending.PartitionSessionId, out var partitionSession)) - { - partitionSession.RegisterCommitRequest(commitSending); - } - else - { - Logger.LogWarning( - "Offset range [{OffsetRange}] is requested to be committed, " + - "but PartitionSession[PartitionSessionId={PartitionSessionId}] is already closed", - commitSending.OffsetsRange, commitSending.PartitionSessionId); - - commitSending.TcsCommit.TrySetException(new ReaderException("AD")); - - continue; - } - - await Stream.Write(new MessageFromClient - { - CommitOffsetRequest = new StreamReadMessage.Types.CommitOffsetRequest - { - CommitOffsets = - { - new StreamReadMessage.Types.CommitOffsetRequest.Types.PartitionCommitOffset - { - Offsets = { commitSending.OffsetsRange }, - PartitionSessionId = commitSending.PartitionSessionId - } - } - } - }); + await Stream.Write(messageFromClient); } } catch (Driver.TransportException e) @@ -340,24 +301,33 @@ await Stream.Write(new MessageFromClient { while (await Stream.MoveNextAsync()) { - var freeBytesSize = 0; + var messageFromServer = Stream.Current; + var status = Status.FromProto(messageFromServer.Status, messageFromServer.Issues); - switch (Stream.Current.ServerMessageCase) + if (status.IsNotSuccess) + { + Logger.LogError( + "ReaderSession[{SessionId}] received unsuccessful status while processing readAck: {Status}", + SessionId, status); + return; + } + + switch (messageFromServer.ServerMessageCase) { case ServerMessageOneofCase.ReadResponse: - await HandleReadResponse(); + await HandleReadResponse(messageFromServer.ReadResponse); break; case ServerMessageOneofCase.StartPartitionSessionRequest: - await HandleStartPartitionSessionRequest(); + await HandleStartPartitionSessionRequest(messageFromServer.StartPartitionSessionRequest); break; case ServerMessageOneofCase.CommitOffsetResponse: - freeBytesSize += HandleCommitOffsetResponse(); + HandleCommitOffsetResponse(messageFromServer.CommitOffsetResponse); break; case ServerMessageOneofCase.PartitionSessionStatusResponse: case ServerMessageOneofCase.UpdateTokenResponse: case ServerMessageOneofCase.StopPartitionSessionRequest: - freeBytesSize += StopPartitionSessionRequest(); + await StopPartitionSessionRequest(messageFromServer.StopPartitionSessionRequest); break; case ServerMessageOneofCase.InitResponse: case ServerMessageOneofCase.None: @@ -365,13 +335,6 @@ await Stream.Write(new MessageFromClient default: throw new ArgumentOutOfRangeException(); } - - if (freeBytesSize >= FreeBufferCoefficient * _readerConfig.MemoryUsageMaxBytes) - { - // await Stream.Write(); - - Interlocked.Add(ref _memoryUsageMaxBytes, freeBytesSize); - } } } catch (Driver.TransportException e) @@ -387,9 +350,25 @@ await Stream.Write(new MessageFromClient } } - private async Task HandleStartPartitionSessionRequest() + internal async void TryReadRequestBytes(long bytes) + { + var readRequestBytes = Interlocked.Add(ref _readRequestBytes, bytes); + + if (readRequestBytes < FreeBufferCoefficient * _readerConfig.MemoryUsageMaxBytes) + { + return; + } + + if (Interlocked.CompareExchange(ref _readRequestBytes, 0, readRequestBytes) == readRequestBytes) + { + await _channelFromClientMessageSending.Writer.WriteAsync(new MessageFromClient + { ReadRequest = new StreamReadMessage.Types.ReadRequest { BytesSize = readRequestBytes } }); + } + } + + private async Task HandleStartPartitionSessionRequest( + StreamReadMessage.Types.StartPartitionSessionRequest startPartitionSessionRequest) { - var startPartitionSessionRequest = Stream.Current.StartPartitionSessionRequest; var partitionSession = startPartitionSessionRequest.PartitionSession; _partitionSessions[partitionSession.PartitionSessionId] = new PartitionSession( Logger, @@ -401,11 +380,11 @@ private async Task HandleStartPartitionSessionRequest() Logger.LogInformation( "ReaderSession[{SessionId}] started PartitionSession[PartitionSessionId={PartitionSessionId}, " + - "Path={Path}, PartitionId={PartitionId}, CommittedOffset={CommittedOffset}]", + "Path=\"{Path}\", PartitionId={PartitionId}, CommittedOffset={CommittedOffset}]", SessionId, partitionSession.PartitionSessionId, partitionSession.Path, partitionSession.PartitionId, startPartitionSessionRequest.CommittedOffset); - await Stream.Write(new MessageFromClient + await _channelFromClientMessageSending.Writer.WriteAsync(new MessageFromClient { StartPartitionSessionResponse = new StreamReadMessage.Types.StartPartitionSessionResponse { @@ -415,14 +394,14 @@ await Stream.Write(new MessageFromClient }); } - private int HandleCommitOffsetResponse() + private void HandleCommitOffsetResponse(StreamReadMessage.Types.CommitOffsetResponse commitOffsetResponse) { - foreach (var partitionsCommittedOffset in Stream.Current.CommitOffsetResponse.PartitionsCommittedOffsets) + foreach (var partitionsCommittedOffset in commitOffsetResponse.PartitionsCommittedOffsets) { if (_partitionSessions.TryGetValue(partitionsCommittedOffset.PartitionSessionId, out var partitionSession)) { - partitionSession.HandleCommitedOffset(partitionSession.CommitedOffset); + partitionSession.HandleCommitedOffset(partitionsCommittedOffset.CommittedOffset); } else { @@ -432,40 +411,75 @@ private int HandleCommitOffsetResponse() partitionsCommittedOffset.CommittedOffset, partitionsCommittedOffset.PartitionSessionId); } } - - throw new NotImplementedException(); } - private int StopPartitionSessionRequest() + private async Task StopPartitionSessionRequest( + StreamReadMessage.Types.StopPartitionSessionRequest stopPartitionSessionRequest) { - throw new NotImplementedException(); + if (_partitionSessions.TryRemove(stopPartitionSessionRequest.PartitionSessionId, out var partitionSession)) + { + if (stopPartitionSessionRequest.Graceful) + { + await _channelFromClientMessageSending.Writer.WriteAsync(new MessageFromClient + { + StopPartitionSessionResponse = new StreamReadMessage.Types.StopPartitionSessionResponse + { PartitionSessionId = partitionSession.PartitionSessionId } + }); + } + + partitionSession.Stop(); + } + else + { + Logger.LogError("Received StopPartitionSessionRequest[PartitionSessionId={}] for unknown PartitionSession", + stopPartitionSessionRequest.PartitionSessionId); + } } - public async Task CommitOffsetRange(OffsetsRange offsetsRange, long partitionId, long approximatelyBytesSize) + public async Task CommitOffsetRange(OffsetsRange offsetsRange, long partitionSessionId) { var tcsCommit = new TaskCompletionSource(); - await using var register = _lifecycleReaderSessionCts.Token.Register(() => tcsCommit - .TrySetException(new YdbException($"ReaderSession[{SessionId}] was deactivated"))); - - await _channelCommitSending.Writer.WriteAsync( - new CommitSending( - offsetsRange, - partitionId, - tcsCommit, - approximatelyBytesSize - ) + await using var register = _lifecycleReaderSessionCts.Token.Register( + () => tcsCommit.TrySetException(new YdbException($"ReaderSession[{SessionId}] was deactivated")) ); + var commitSending = new CommitSending(offsetsRange, tcsCommit); + + if (_partitionSessions.TryGetValue(partitionSessionId, out var partitionSession)) + { + partitionSession.RegisterCommitRequest(commitSending); + + await _channelFromClientMessageSending.Writer.WriteAsync(new MessageFromClient + { + CommitOffsetRequest = new StreamReadMessage.Types.CommitOffsetRequest + { + CommitOffsets = + { + new StreamReadMessage.Types.CommitOffsetRequest.Types.PartitionCommitOffset + { + Offsets = { commitSending.OffsetsRange }, + PartitionSessionId = partitionSessionId + } + } + } + } + ); + } + else + { + Logger.LogWarning("Offset range [{OffsetRange}] is requested to be committed, " + + "but PartitionSession[PartitionSessionId={PartitionSessionId}] is already closed", + commitSending.OffsetsRange, partitionSessionId); + + Utils.SetPartitionClosedException(commitSending, partitionSessionId); + } + await tcsCommit.Task; } - private async Task HandleReadResponse() + private async Task HandleReadResponse(StreamReadMessage.Types.ReadResponse readResponse) { - var readResponse = Stream.Current.ReadResponse; - - Interlocked.Add(ref _memoryUsageMaxBytes, -readResponse.BytesSize); - var bytesSize = readResponse.BytesSize; var partitionCount = readResponse.PartitionData.Count; @@ -473,7 +487,7 @@ private async Task HandleReadResponse() { var partition = readResponse.PartitionData[partitionIndex]; var partitionSessionId = partition.PartitionSessionId; - var approximatelyPartitionBytesSize = CalculateApproximatelyBytesSize( + var approximatelyPartitionBytesSize = Utils.CalculateApproximatelyBytesSize( bytesSize: bytesSize, countParts: partitionCount, currentIndex: partitionIndex @@ -481,54 +495,22 @@ private async Task HandleReadResponse() if (_partitionSessions.TryGetValue(partitionSessionId, out var partitionSession)) { - var startOffsetBatch = partitionSession.CommitedOffset; - var endOffsetBatch = partitionSession.CommitedOffset; - var batchCount = partition.Batches.Count; - var batch = partition.Batches; + var batches = partition.Batches; for (var batchIndex = 0; batchIndex < batchCount; batchIndex++) { - var approximatelyBatchBytesSize = CalculateApproximatelyBytesSize( - bytesSize: approximatelyPartitionBytesSize, - countParts: batchCount, - currentIndex: batchIndex - ); - - var internalBatchMessages = new Queue(); - var messagesCount = batch[batchIndex].MessageData.Count; - - for (var messageIndex = 0; messageIndex < messagesCount; messageIndex++) - { - var messageData = batch[batchIndex].MessageData[messageIndex]; - - internalBatchMessages.Enqueue( - new InternalMessage( - data: messageData.Data, - topic: partitionSession.TopicPath, - partitionId: partitionSession.PartitionId, - producerId: batch[batchIndex].ProducerId, - offsetsRange: new OffsetsRange - { Start = partitionSession.PrevEndOffsetMessage, End = messageData.Offset }, - createdAt: messageData.CreatedAt, - metadataItems: messageData.MetadataItems, - approximatelyBytesSize: CalculateApproximatelyBytesSize( - bytesSize: approximatelyBatchBytesSize, - countParts: messagesCount, - currentIndex: messageIndex - ) - ) - ); - - partitionSession.PrevEndOffsetMessage = endOffsetBatch = messageData.Offset + 1; - } - await _channelWriter.WriteAsync( - new InternalBatchMessage( - new OffsetsRange { Start = startOffsetBatch, End = endOffsetBatch }, - internalBatchMessages, + new InternalBatchMessages( + batches[batchIndex], + partitionSession, this, - approximatelyBatchBytesSize + Utils.CalculateApproximatelyBytesSize( + bytesSize: approximatelyPartitionBytesSize, + countParts: batchCount, + currentIndex: batchIndex + ), + _deserializer ) ); } @@ -539,84 +521,7 @@ await _channelWriter.WriteAsync( "ReaderSession[{SessionId}]: received PartitionData for unknown(closed?) " + "PartitionSession[{PartitionSessionId}], all messages were skipped!", SessionId, partitionSessionId); - - Interlocked.Add(ref _memoryUsageMaxBytes, approximatelyPartitionBytesSize); - } - } - } - - private static long CalculateApproximatelyBytesSize(long bytesSize, int countParts, int currentIndex) - { - return bytesSize / countParts + currentIndex == countParts - 1 ? bytesSize % countParts : 0; - } - - private class PartitionSession - { - private readonly ILogger _logger; - private readonly ConcurrentQueue<(long EndOffset, TaskCompletionSource TcsCommit)> _waitCommitMessages = new(); - - public PartitionSession( - ILogger logger, - long partitionSessionId, - string topicPath, - long partitionId, - long commitedOffset) - { - _logger = logger; - PartitionSessionId = partitionSessionId; - TopicPath = topicPath; - PartitionId = partitionId; - CommitedOffset = commitedOffset; - PrevEndOffsetMessage = commitedOffset; - } - - // Identifier of partition session. Unique inside one RPC call. - internal long PartitionSessionId { get; } - - // Topic path of partition - internal string TopicPath { get; } - - // Partition identifier - internal long PartitionId { get; } - - // Each offset up to and including (committed_offset - 1) was fully processed. - internal long CommitedOffset { get; set; } - - internal long PrevEndOffsetMessage { get; set; } - - internal void RegisterCommitRequest(CommitSending commitSending) - { - var endOffset = commitSending.OffsetsRange.End; - - if (endOffset <= CommitedOffset) - { - commitSending.TcsCommit.SetResult(); - } - else - { - _waitCommitMessages.Enqueue((endOffset, commitSending.TcsCommit)); - } - } - - internal void HandleCommitedOffset(long commitedOffset) - { - if (CommitedOffset >= commitedOffset) - { - _logger.LogError( - "PartitionSession[{PartitionSessionId}] received CommitOffsetResponse[CommitedOffset={CommitedOffset}] " + - "which is not greater than previous committed offset: {PrevCommitedOffset}", - PartitionSessionId, commitedOffset, CommitedOffset); } - - CommitedOffset = commitedOffset; - - while (_waitCommitMessages.TryPeek(out var waitCommitTcs) && waitCommitTcs.EndOffset <= commitedOffset) - { - _waitCommitMessages.TryDequeue(out _); - waitCommitTcs.TcsCommit.SetResult(); - } - - throw new NotImplementedException(); } } } diff --git a/src/Ydb.Sdk/src/Services/Topic/Reader/Utils.cs b/src/Ydb.Sdk/src/Services/Topic/Reader/Utils.cs new file mode 100644 index 00000000..f0cb6a47 --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Topic/Reader/Utils.cs @@ -0,0 +1,15 @@ +namespace Ydb.Sdk.Services.Topic.Reader; + +internal static class Utils +{ + internal static long CalculateApproximatelyBytesSize(long bytesSize, int countParts, int currentIndex) + { + return bytesSize / countParts + (currentIndex == countParts - 1 ? bytesSize % countParts : 0); + } + + internal static void SetPartitionClosedException(CommitSending commitSending, long partitionSessionId) + { + commitSending.TcsCommit.TrySetException( + new ReaderException($"PartitionSession[{partitionSessionId}] was closed by server.")); + } +} diff --git a/src/Ydb.Sdk/tests/Topic/ReaderIntegrationTests.cs b/src/Ydb.Sdk/tests/Topic/ReaderIntegrationTests.cs index aecff7be..afc755b3 100644 --- a/src/Ydb.Sdk/tests/Topic/ReaderIntegrationTests.cs +++ b/src/Ydb.Sdk/tests/Topic/ReaderIntegrationTests.cs @@ -14,11 +14,11 @@ public class ReaderIntegrationTests : IClassFixture public ReaderIntegrationTests(DriverFixture driverFixture) { _driver = driverFixture.Driver; - _topicName = "topic_" + Utils.Net; + _topicName = "reader_topic_" + Utils.Net; } [Fact] - public async Task Simple() + public async Task StressTest_WhenReadingThenCommiting_ReturnMessages() { var topicClient = new TopicClient(_driver); var topicSettings = new CreateTopicSettings { Path = _topicName }; @@ -27,13 +27,40 @@ public async Task Simple() using var writer = new WriterBuilder(_driver, _topicName) { ProducerId = "producerId" }.Build(); - using var reader = new ReaderBuilder(_driver) + var reader = new ReaderBuilder(_driver) { ConsumerName = "Consumer", - SubscribeSettings = { new SubscribeSettings(_topicName) } + SubscribeSettings = { new SubscribeSettings(_topicName) }, + MemoryUsageMaxBytes = 200 }.Build(); - await writer.WriteAsync("Hello World!"); - Assert.Equal("Hello World!", (await reader.ReadAsync()).Data); + for (var i = 0; i < 1000; i++) + { + await writer.WriteAsync($"{i}: Hello World!"); + var message = await reader.ReadAsync(); + Assert.Equal($"{i}: Hello World!", message.Data); + await message.CommitAsync(); + } + + reader.Dispose(); + + var readerNext = new ReaderBuilder(_driver) + { + ConsumerName = "Consumer", + SubscribeSettings = { new SubscribeSettings(_topicName) }, + MemoryUsageMaxBytes = 1000 + }.Build(); + + for (var i = 1000; i < 2000; i++) + { + await writer.WriteAsync($"{i}: Hello World!"); + var message = await readerNext.ReadAsync(); + Assert.Equal($"{i}: Hello World!", message.Data); + await message.CommitAsync(); + } + + readerNext.Dispose(); + + await topicClient.DropTopic(new DropTopicSettings { Path = _topicName }); } } diff --git a/src/Ydb.Sdk/tests/Topic/ReaderUnitTests.cs b/src/Ydb.Sdk/tests/Topic/ReaderUnitTests.cs new file mode 100644 index 00000000..2e8213a8 --- /dev/null +++ b/src/Ydb.Sdk/tests/Topic/ReaderUnitTests.cs @@ -0,0 +1,119 @@ +using Google.Protobuf; +using Grpc.Core; +using Moq; +using Ydb.Sdk.Services.Topic.Reader; +using Ydb.Topic; + +namespace Ydb.Sdk.Tests.Topic; + +using ReaderStream = IBidirectionalStream; +using FromClient = StreamReadMessage.Types.FromClient; +using FromServer = StreamReadMessage.Types.FromServer; + +public class ReaderUnitTests +{ + private readonly Mock _mockIDriver = new(); + private readonly Mock _mockStream = new(); + + public ReaderUnitTests() + { + _mockIDriver.Setup(driver => driver.BidirectionalStreamCall( + It.IsAny>(), + It.IsAny()) + ).Returns(_mockStream.Object); + + _mockIDriver.Setup(driver => driver.LoggerFactory).Returns(Utils.GetLoggerFactory); + } + + // [Fact] + public async Task Initialize_WhenFailWriteMessage_ShouldRetryInitializeAndReadThenCommitMessage() + { + var tcs = new TaskCompletionSource(); + + _mockStream.SetupSequence(stream => stream.Write(It.IsAny())) + .ThrowsAsync(new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled))) + .Returns(Task.CompletedTask) + .Returns(Task.CompletedTask) + .Returns(Task.CompletedTask); + + _mockStream.SetupSequence(stream => stream.MoveNextAsync()) + .Returns(new ValueTask(true)) + .Returns(new ValueTask(true)) + .Returns(new ValueTask(true)) + .Returns(new ValueTask(true)) + .Returns(new ValueTask(tcs.Task)); + + _mockStream.SetupSequence(stream => stream.Current) + .Returns(new FromServer + { + Status = StatusIds.Types.StatusCode.Success, + InitResponse = new StreamReadMessage.Types.InitResponse { SessionId = "SessionId" } + }) + .Returns( + new FromServer + { + Status = StatusIds.Types.StatusCode.Success, + StartPartitionSessionRequest = new StreamReadMessage.Types.StartPartitionSessionRequest + { + CommittedOffset = 0, + PartitionOffsets = new OffsetsRange { End = 0, Start = 0 }, + PartitionSession = new StreamReadMessage.Types.PartitionSession + { Path = "/topic", PartitionId = 1, PartitionSessionId = 1 } + } + }) + .Returns( + new FromServer + { + ReadResponse = new StreamReadMessage.Types.ReadResponse + { + BytesSize = 50, PartitionData = + { + new StreamReadMessage.Types.ReadResponse.Types.PartitionData + { + Batches = + { + new StreamReadMessage.Types.ReadResponse.Types.Batch + { + MessageData = + { + new StreamReadMessage.Types.ReadResponse.Types.MessageData + { Data = ByteString.CopyFrom(BitConverter.GetBytes(100)) } + } + } + } + } + } + } + } + ) + .Returns( + new FromServer + { + CommitOffsetResponse = + new StreamReadMessage.Types.CommitOffsetResponse + { + PartitionsCommittedOffsets = + { + new StreamReadMessage.Types.CommitOffsetResponse.Types.PartitionCommittedOffset + { + PartitionSessionId = 1, + CommittedOffset = 50 + } + } + } + } + ); + + + using var reader = new ReaderBuilder(_mockIDriver.Object) + { + ConsumerName = "Consumer Tester", + MemoryUsageMaxBytes = 200, + SubscribeSettings = { new SubscribeSettings("/topic") } + }.Build(); + + await reader.ReadAsync(); + // await message.CommitAsync(); + // Assert.Equal(100, message.Data); + } +}