Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to set timestamp for message at publish time #60

Open
SrikarSaggurthi opened this issue Jul 30, 2020 · 1 comment
Open

Allow to set timestamp for message at publish time #60

SrikarSaggurthi opened this issue Jul 30, 2020 · 1 comment

Comments

@SrikarSaggurthi
Copy link

Is your feature request related to a problem? Please describe.
It would be helpful if we can set timestamp of the message, it helps to query offsets by time possible resulting in simplified recovery logic.

Describe the solution you'd like
Allow for .kfk.pub to take timestamp as a parameter which can underneath call librdkafka's rd_kafka_producev() API.

Additional resource
confluentinc/librdkafka#1016

@mshimizu-kx
Copy link
Contributor

mshimizu-kx commented Mar 18, 2021

H Srikar,

I added the functionality for .kafka.publishWithHeaders . The example code shows how to specify the timestamp:

  .kafka.publish[topic;.kafka.PARTITION_UA; "Hello from producer";""];
  .kafka.publishWithHeaders[producer; .z.p; topic; .kafka.PARTITION_UA; "locusts"; ""; `header1`header2!("firmament"; "divided")];

You can use .z.p but Kafka only accepts milliseconds precision, so actually it is not different from "wallclock time (UTC)" mentioned in the reference whch is enabled by (api.version.request; true).

kfk_cfg:(!) . flip(
  (`metadata.broker.list;`localhost:9092);
  (`statistics.interval.ms;`10000);
  (`queue.buffering.max.ms;`1);
  (`fetch.wait.max.ms;`10);
  (`api.version.request; `true)
  );
...
omit
...
q)-10#data2
mtype          topic client partition offset msgtime                       data                       key      headers                                                  rcvtime     
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
_PARTITION_EOF test2 0      0         108                                  "Broker: No more messages" `byte$() (`symbol$())!""                                          17:20:06.417
               test2 0      0         108    2021.03.18D17:20:10.391000000 "Hello from producer"      `byte$() (`symbol$())!""                                          17:20:10.394
               test2 0      0         109    2021.03.18D17:20:10.391000000 "locusts"                  `byte$() `header1`header2!(0x6669726d616d656e74;0x64697669646564) 17:20:10.394
_PARTITION_EOF test2 0      0         110                                  "Broker: No more messages" `byte$() (`symbol$())!""                                          17:20:10.409
               test2 0      0         110    2021.03.18D17:20:14.390000000 "Hello from producer"      `byte$() (`symbol$())!""                                          17:20:14.394
               test2 0      0         111    2021.03.18D17:20:14.390000000 "locusts"                  `byte$() `header1`header2!(0x6669726d616d656e74;0x64697669646564) 17:20:14.394
_PARTITION_EOF test2 0      0         112                                  "Broker: No more messages" `byte$() (`symbol$())!""                                          17:20:14.406
               test2 0      0         112    2021.03.18D17:20:18.389000000 "Hello from producer"      `byte$() (`symbol$())!""                                          17:20:18.395
               test2 0      0         113    2021.03.18D17:20:18.390000000 "locusts"                  `byte$() `header1`header2!(0x6669726d616d656e74;0x64697669646564) 17:20:18.395
_PARTITION_EOF test2 0      0         114                                  "Broker: No more messages" `byte$() (`symbol$())!""                                          17:20:18.408

From this result, we don't think it is worth introducing this additional parameter.

Thanks,
Masato

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants