Skip to content

Commit

Permalink
No checkpoint when the partition is empty
Browse files Browse the repository at this point in the history
  • Loading branch information
LGouellec committed Nov 22, 2024
1 parent 1ecdfaa commit cbdb60f
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 20 deletions.
5 changes: 3 additions & 2 deletions core/Processors/GlobalStreamThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void Initialize()

globalConsumer.Assign(mappedPartitions);
foreach(var tpo in mappedPartitions)
globalConsumer.StoreOffset(tpo);
globalConsumer.Seek(tpo);

lastFlush = DateTime.Now;
}
Expand Down Expand Up @@ -95,7 +95,7 @@ public void Close()
log.LogError(e, "Failed to close global consumer due to the following error:");
}

globalStateMaintainer.FlushState();
globalStateMaintainer.FlushState(true);
globalStateMaintainer.Close();
}
}
Expand Down Expand Up @@ -157,6 +157,7 @@ private void Run()
try
{
stateConsumer.Close();
metricsRegistry.RemoveThreadSensors(threadClientId);
}
catch (Exception e)
{
Expand Down
18 changes: 12 additions & 6 deletions core/Processors/Internal/GlobalStateManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void Checkpoint()
{
offsetCheckpointManager.Write(context.Id,
ChangelogOffsets.Where(kv =>
!globalNonPersistentStateStores.Contains(kv.Key.Topic)).ToDictionary());
!globalNonPersistentStateStores.Contains(kv.Key.Topic) && kv.Value != Offset.Beginning).ToDictionary());
}
catch (Exception e)
{
Expand Down Expand Up @@ -225,14 +225,20 @@ private void RestoreState(
{
foreach (var topicPartition in topicPartitions)
{
long offset, checkpoint, highWM;

if (ChangelogOffsets.ContainsKey(topicPartition))
long offset, checkpoint, highWM, checkpointSeek;

if (ChangelogOffsets.ContainsKey(topicPartition))
{
checkpoint = ChangelogOffsets[topicPartition];
checkpointSeek = checkpoint + 1;
}
else
{
checkpoint = Offset.Beginning.Value;

globalConsumer.Assign((new TopicPartitionOffset(topicPartition, new Offset(checkpoint))).ToSingle());
checkpointSeek = checkpoint;
}

globalConsumer.Assign((new TopicPartitionOffset(topicPartition, new Offset(checkpointSeek))).ToSingle());
offset = checkpoint;
var lowWM = offsetWatermarks[topicPartition].Item1;
highWM = offsetWatermarks[topicPartition].Item2;
Expand Down
15 changes: 10 additions & 5 deletions core/Processors/Internal/GlobalStateUpdateTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using Streamiz.Kafka.Net.Crosscutting;
using Streamiz.Kafka.Net.State.Internal;
using Streamiz.Kafka.Net.Stream.Internal;

namespace Streamiz.Kafka.Net.Processors.Internal
Expand All @@ -28,11 +29,15 @@ public void Close()
globalStateManager.Close();
}

public void FlushState()
public void FlushState(bool force = false)
{
globalStateManager.Flush();
globalStateManager.UpdateChangelogOffsets(offsets);
globalStateManager.Checkpoint();
var changelogOffsets = globalStateManager.ChangelogOffsets;
if (StateManagerTools.CheckpointNeed(force, changelogOffsets, offsets))
{
globalStateManager.Flush();
globalStateManager.UpdateChangelogOffsets(offsets);
globalStateManager.Checkpoint();
}
}

public IDictionary<TopicPartition, long> Initialize()
Expand Down Expand Up @@ -61,7 +66,7 @@ public void Update(ConsumeResult<byte[], byte[]> record)
processor.Process(record);
log.LogDebug("Completed processing one record [{RecordInfo}]", recordInfo);

offsets.AddOrUpdate(record.TopicPartition, record.Offset + 1);
offsets.AddOrUpdate(record.TopicPartition, record.Offset);
}

private void InitTopology()
Expand Down
2 changes: 1 addition & 1 deletion core/Processors/Internal/IGlobalStateMaintainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ internal interface IGlobalStateMaintainer
{
public void Update(ConsumeResult<byte[], byte[]> record);

public void FlushState();
public void FlushState(bool force = false);

public void Close();

Expand Down
10 changes: 6 additions & 4 deletions launcher/sample-stream/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ private static Topology BuildTopology()
{
var builder = new StreamBuilder();

TimeSpan _windowSizeMs = TimeSpan.FromSeconds(5);
builder.GlobalTable("input", RocksDb.As<string, string>("store"));

/*TimeSpan _windowSizeMs = TimeSpan.FromSeconds(5);
var materializer
= InMemoryWindows
Expand All @@ -55,12 +57,12 @@ var materializer
.GroupByKey()
.WindowedBy(TumblingWindowOptions.Of(TimeSpan.FromMinutes(1)))
.Count(materializer)
/*.Suppress(SuppressedBuilder.UntilWindowClose<Windowed<string>, long>(TimeSpan.Zero,
.Suppress(SuppressedBuilder.UntilWindowClose<Windowed<string>, long>(TimeSpan.Zero,
StrictBufferConfig.Unbounded())
.WithKeySerdes(new TimeWindowedSerDes<string>(new StringSerDes(), (long)TimeSpan.FromMinutes(1).TotalMilliseconds)))
*/.ToStream()
.ToStream()
.Map((k,v, r) => new KeyValuePair<string,long>(k.Key, v))
.To<StringSerDes, Int64SerDes>("output2");
.To<StringSerDes, Int64SerDes>("output2");*/

return builder.Build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,16 @@ public void ShouldFlushState()

globalStateUpdateTask.FlushState();

stateManagerMock.Verify(x => x.Flush(), Times.Never);
}

// Checks the forced-flush path: after Initialize(), calling FlushState(true)
// must result in exactly one Flush() call on the state manager mock.
[Test]
public void ShouldForceFlushState()
{
globalStateUpdateTask.Initialize();

// Pass force = true to request the flush explicitly.
globalStateUpdateTask.FlushState(true);

// Exactly one Flush() call is expected on the mocked state manager.
stateManagerMock.Verify(x => x.Flush(), Times.Once);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public void ShouldNotFlushTooSoon()
globalStreamThread.Start();

// No flush is expected yet, because the thread should wait 100ms before flushing
globalStateMaintainerMock.Verify(x => x.FlushState(), Times.Never);
globalStateMaintainerMock.Verify(x => x.FlushState(false), Times.Never);
globalStreamThread.Dispose();
}

Expand All @@ -236,7 +236,7 @@ public void ShouldFlush()

Thread.Sleep(50);
// We have waited longer than CommitIntervalMs, so the thread should already have flushed at least once
globalStateMaintainerMock.Verify(x => x.FlushState());
globalStateMaintainerMock.Verify(x => x.FlushState(false));
globalStreamThread.Dispose();
}
}
Expand Down

0 comments on commit cbdb60f

Please sign in to comment.