Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Topic record timestamp extractor #329

Merged
merged 2 commits into from
Jun 20, 2024

Conversation

ppilev
Copy link
Contributor

@ppilev ppilev commented Jun 14, 2024

Provides control over timestamp value of records published to Kafka output topic.

Some context below:
Q: How this functionality is different than ITimestampExtractor applied to a source stream or IKStream<K, V>.WithRecordTimestamp applied to some existing stream?

A: In simple scenarios either method can be used to provide timestamps for the records sent to the output topic.
For example ITimestampExtractor provides the initial value for RecordContext.Timestamp, whereas IKStream<K, V>.WithRecordTimestamp can override that value later.
However in some scenarios you might need more grained control over what timestamp is applied to the output topic records and change that timestamp without affecting the RecordContext.Timestamp of underlying stream.

Example with pseudo code which WON'T WORK:

var joinedStream = stream1.WithRecordTimestamp(m1 => m1.Timestamp) LEFT JOIN stream2.WithRecordTimestamp(m2 => m2.Timestamp);
joinedStream.WithRecordTimestamp(utcNow).To("output-topic")

This basically will break your join because applying joinedStream.WithRecordTimestamp(utcNow) will override the timestamp in RecordContext.Timestamp and value stored by KStreamJoinWindowProcessor into the window store will be utcNow as all topology processors are executed first:

        public override void Process(K key, V value)
        {
            if(key != null)
            {
                Forward(key, value);
                window.Put(key, value, Context.Timestamp);
            }
        }

The solution I came up with is to bring the desired timestamp value directly to SinkProcessor without touching the value in RecordContext.Timestamp at all.

Example with pseudo code which WORKS with changes proposed by this PR:

var joinedStream = stream1.WithRecordTimestamp(m1 => m1.Timestamp) LEFT JOIN stream2.WithRecordTimestamp(m2 => m2.Timestamp);
joinedStream.To("output-topic", extractor => utcNow)

@ppilev
Copy link
Contributor Author

ppilev commented Jun 14, 2024

hey @LGouellec

I know the spare time is a luxury but could you please review it when you've got some.
what's the chance to have that change being available with the upcoming 1.6.0 release?

regards,
Plamen

@LGouellec
Copy link
Owner

Hi @ppilev ,

I will take a look before end of next week ! Good chance to merge before 1.6

@LGouellec LGouellec merged commit 54608e4 into LGouellec:develop Jun 20, 2024
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants