Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
Charley Skills committed Apr 13, 2024
1 parent 7b10cbc commit 8692e62
Show file tree
Hide file tree
Showing 6 changed files with 9 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ namespace Streamiz.Kafka.Net.State.Logging
internal class ChangeLoggingTimestampedWindowBytesStore :
ChangeLoggingWindowBytesStore
{
public ChangeLoggingTimestampedWindowBytesStore(IWindowStore<Bytes, byte[]> wrapped)
: base(wrapped)
{
}
public ChangeLoggingTimestampedWindowBytesStore(IWindowStore<Bytes, byte[]> wrapped, bool retainDuplicates)
: base(wrapped, retainDuplicates)
{ }

protected override void Publish(Bytes key, byte[] valueAndTs)
{
Expand Down
3 changes: 2 additions & 1 deletion core/State/Logging/ChangeLoggingWindowBytesStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ internal class ChangeLoggingWindowBytesStore :
private int seqnum = 0;
private readonly bool retainDuplicates;

public ChangeLoggingWindowBytesStore(IWindowStore<Bytes, byte[]> wrapped)
public ChangeLoggingWindowBytesStore(IWindowStore<Bytes, byte[]> wrapped, bool retainDuplicates)
: base(wrapped)
{
this.retainDuplicates = retainDuplicates;
}

protected virtual void Publish(Bytes key, byte[] value)
Expand Down
2 changes: 1 addition & 1 deletion core/State/TimestampedWindowStoreBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ private IWindowStore<Bytes, byte[]> WrapLogging(IWindowStore<Bytes, byte[]> inne
if (!LoggingEnabled)
return inner;

return new ChangeLoggingTimestampedWindowBytesStore(inner);
return new ChangeLoggingTimestampedWindowBytesStore(inner, supplier.RetainDuplicates);
}
}
}
2 changes: 1 addition & 1 deletion core/State/WindowStoreBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ private IWindowStore<Bytes, byte[]> WrapLogging(IWindowStore<Bytes, byte[]> inne
if (!LoggingEnabled)
return inner;

return new ChangeLoggingWindowBytesStore(inner);
return new ChangeLoggingWindowBytesStore(inner, supplier.RetainDuplicates);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void Begin()
context.UseRecordCollector(recordCollector);

var inmemorystore = new InMemoryWindowStore("test-store", TimeSpan.FromDays(1), TimeSpan.FromSeconds(1).Milliseconds, false);
store = new ChangeLoggingTimestampedWindowBytesStore(inmemorystore);
store = new ChangeLoggingTimestampedWindowBytesStore(inmemorystore, false);
store.Init(context, store);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void Begin()
context.UseRecordCollector(recordCollector);

var inmemorystore = new InMemoryWindowStore("test-store", TimeSpan.FromDays(1), TimeSpan.FromSeconds(1).Milliseconds, false);
store = new ChangeLoggingWindowBytesStore(inmemorystore);
store = new ChangeLoggingWindowBytesStore(inmemorystore, false);
store.Init(context, store);
}

Expand Down

0 comments on commit 8692e62

Please sign in to comment.