Skip to content

Commit

Permalink
feat: supported Commit() & ReadAsync() in TopicReader (#265)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillKurdyukov authored Feb 6, 2025
1 parent 943f8a0 commit ab8fb03
Show file tree
Hide file tree
Showing 10 changed files with 525 additions and 373 deletions.
7 changes: 1 addition & 6 deletions src/Ydb.Sdk/src/Services/Topic/Exceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,13 @@ public class ReaderException : Exception
{
public ReaderException(string message) : base(message)
{
Status = new Status(StatusCode.Unspecified);
}

public ReaderException(string message, Status status) : base(message + ": " + status)
{
Status = status;
}

public ReaderException(string message, Driver.TransportException e) : base(message, e)
public ReaderException(string message, Exception inner) : base(message, inner)
{
Status = e.Status;
}

public Status Status { get; }
}
2 changes: 1 addition & 1 deletion src/Ydb.Sdk/src/Services/Topic/IReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ public interface IReader<TValue> : IDisposable
{
public ValueTask<Message<TValue>> ReadAsync(CancellationToken cancellationToken = default);

public ValueTask<BatchMessage<TValue>> ReadBatchAsync(CancellationToken cancellationToken = default);
public ValueTask<BatchMessages<TValue>> ReadBatchAsync(CancellationToken cancellationToken = default);
}
111 changes: 111 additions & 0 deletions src/Ydb.Sdk/src/Services/Topic/Reader/InternalBatchMessages.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
using System.Collections.Immutable;
using System.Diagnostics.CodeAnalysis;
using Ydb.Topic;

namespace Ydb.Sdk.Services.Topic.Reader;

internal class InternalBatchMessages<TValue>
{
private readonly StreamReadMessage.Types.ReadResponse.Types.Batch _batch;
private readonly PartitionSession _partitionSession;
private readonly IDeserializer<TValue> _deserializer;
private readonly ReaderSession<TValue> _readerSession;
private readonly long _approximatelyBatchSize;

private int _startMessageDataIndex;

private int OriginalMessageCount => _batch.MessageData.Count;
private bool IsActive => _startMessageDataIndex < OriginalMessageCount && _readerSession.IsActive;

public InternalBatchMessages(
StreamReadMessage.Types.ReadResponse.Types.Batch batch,
PartitionSession partitionsSession,
ReaderSession<TValue> readerSession,
long approximatelyBatchSize,
IDeserializer<TValue> deserializer)
{
_batch = batch;
_partitionSession = partitionsSession;
_readerSession = readerSession;
_deserializer = deserializer;
_approximatelyBatchSize = approximatelyBatchSize;
}

internal bool TryDequeueMessage([MaybeNullWhen(false)] out Message<TValue> message)
{
if (!IsActive)
{
message = default;
return false;
}

var index = _startMessageDataIndex++;
var approximatelyMessageBytesSize = Utils
.CalculateApproximatelyBytesSize(_approximatelyBatchSize, OriginalMessageCount, index);
var messageData = _batch.MessageData[index];

TValue value;
try
{
value = _deserializer.Deserialize(messageData.Data.ToByteArray());
}
catch (Exception e)
{
throw new ReaderException("Error when deserializing message data", e);
}

_readerSession.TryReadRequestBytes(approximatelyMessageBytesSize);
var nextCommitedOffset = messageData.Offset + 1;

message = new Message<TValue>(
data: value,
topic: _partitionSession.TopicPath,
partitionId: _partitionSession.PartitionId,
partitionSessionId: _partitionSession.PartitionSessionId,
producerId: _batch.ProducerId,
createdAt: messageData.CreatedAt.ToDateTime(),
metadata: messageData.MetadataItems.Select(item => new Metadata(item.Key, item.Value.ToByteArray()))
.ToImmutableArray(),
offsetsRange: new OffsetsRange
{ Start = _partitionSession.PrevEndOffsetMessage, End = nextCommitedOffset },
readerSession: _readerSession
);
_partitionSession.PrevEndOffsetMessage = nextCommitedOffset;

return true;
}

internal bool TryPublicBatch([MaybeNullWhen(false)] out BatchMessages<TValue> batchMessages)
{
if (!IsActive)
{
batchMessages = default;
return false;
}

var nextCommitedOffset = _batch.MessageData.Last().Offset + 1;
var offsetsRangeBatch = new OffsetsRange
{ Start = _partitionSession.PrevEndOffsetMessage, End = nextCommitedOffset };
_partitionSession.PrevEndOffsetMessage = nextCommitedOffset;

var messages = new List<Message<TValue>>();
while (TryDequeueMessage(out var message))
{
messages.Add(message);
}

batchMessages = new BatchMessages<TValue>(
batch: messages,
readerSession: _readerSession,
offsetsRange: offsetsRangeBatch,
partitionSessionId: _partitionSession.PartitionSessionId
);

return true;
}
}

internal record CommitSending(
OffsetsRange OffsetsRange,
TaskCompletionSource TcsCommit
);
91 changes: 0 additions & 91 deletions src/Ydb.Sdk/src/Services/Topic/Reader/InternalMessage.cs

This file was deleted.

62 changes: 22 additions & 40 deletions src/Ydb.Sdk/src/Services/Topic/Reader/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,20 @@ namespace Ydb.Sdk.Services.Topic.Reader;

public class Message<TValue>
{
private readonly long _partitionSessionId;
private readonly OffsetsRange _offsetsRange;
private readonly ReaderSession _readerSession;
private readonly long _approximatelyBytesSize;
private readonly ReaderSession<TValue> _readerSession;

internal Message(
TValue data,
string topic,
long partitionId,
long partitionSessionId,
string producerId,
DateTime createdAt,
ImmutableArray<Metadata> metadata,
OffsetsRange offsetsRange,
ReaderSession readerSession,
long approximatelyBytesSize)
ReaderSession<TValue> readerSession)
{
Data = data;
Topic = topic;
Expand All @@ -27,9 +27,9 @@ internal Message(
CreatedAt = createdAt;
Metadata = metadata;

_partitionSessionId = partitionSessionId;
_offsetsRange = offsetsRange;
_readerSession = readerSession;
_approximatelyBytesSize = approximatelyBytesSize;
}

public TValue Data { get; }
Expand All @@ -45,56 +45,38 @@ internal Message(

public DateTime CreatedAt { get; }

public ImmutableArray<Metadata> Metadata { get; }

internal long Start => _offsetsRange.Start;
internal long End => _offsetsRange.End;
public IReadOnlyCollection<Metadata> Metadata { get; }

public Task CommitAsync()
{
return _readerSession.CommitOffsetRange(_offsetsRange, PartitionId, _approximatelyBytesSize);
return _readerSession.CommitOffsetRange(_offsetsRange, _partitionSessionId);
}
}

public class BatchMessage<TValue>
public class BatchMessages<TValue>
{
private readonly ReaderSession _readerSession;
private readonly long _approximatelyBatchSize;
private readonly ReaderSession<TValue> _readerSession;
private readonly OffsetsRange _offsetsRange;
private readonly long _partitionSessionId;

public ImmutableArray<Message<TValue>> Batch { get; }
public IReadOnlyCollection<Message<TValue>> Batch { get; }

internal BatchMessage(
ImmutableArray<Message<TValue>> batch,
ReaderSession readerSession,
long approximatelyBatchSize)
internal BatchMessages(
IReadOnlyCollection<Message<TValue>> batch,
ReaderSession<TValue> readerSession,
OffsetsRange offsetsRange,
long partitionSessionId)
{
Batch = batch;
_readerSession = readerSession;
_approximatelyBatchSize = approximatelyBatchSize;
_offsetsRange = offsetsRange;
_partitionSessionId = partitionSessionId;
}

public Task CommitBatchAsync()
{
if (Batch.Length == 0)
{
return Task.CompletedTask;
}

var offsetsRange = new OffsetsRange { Start = Batch.First().Start, End = Batch.Last().End };

return _readerSession.CommitOffsetRange(offsetsRange, Batch.First().PartitionId, _approximatelyBatchSize);
return Batch.Count == 0
? Task.CompletedTask
: _readerSession.CommitOffsetRange(_offsetsRange, _partitionSessionId);
}
}

public class TopicPartitionOffset
{
public TopicPartitionOffset(long offset, long partitionId)
{
Offset = offset;
PartitionId = partitionId;
}

public long Offset { get; }

public long PartitionId { get; }
}
Loading

0 comments on commit ab8fb03

Please sign in to comment.