Skip to content

Commit

Permalink
dekaf: Properly replace acks with Kafka control messages
Browse files Browse the repository at this point in the history
These are messages that exist in the topics (i.e take up a particular offset location), but don't get emitted to consumers. There are two options for emitting control messages to be ignored by the consumer library: real `COMMIT` messages, and messages with an unsupported (!= 0) version. I went with the latter to avoid any potential future changes that causes the client library to do something other than skip a `COMMIT` message.
  • Loading branch information
jshearer committed Sep 17, 2024
1 parent 1a309e7 commit 6a65c17
Showing 1 changed file with 36 additions and 17 deletions.
53 changes: 36 additions & 17 deletions crates/dekaf/src/read.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::{Collection, Partition};
use anyhow::bail;
use bytes::BufMut;
use doc::AsNode;
use futures::StreamExt;
use gazette::journal::{ReadJsonLine, ReadJsonLines};
Expand Down Expand Up @@ -171,7 +172,27 @@ impl Read {

// Encode the key.
let key = if is_control {
None
// From https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
// Also from https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit
// Control messages will always have a non-null key, which is used to
// indicate the type of control message type with the following schema:
// ControlMessageKey => Version ControlMessageType
// Version => int16
// ControlMessageType => int16
// Skip control messages with version != 0:
// if (ctrl_data.Version != 0) {
// rd_kafka_buf_skip_to(rkbuf, message_end);
// return RD_KAFKA_RESP_ERR_NO_ERROR; /* Continue with next msg */
// }
// https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka_msgset_reader.c#L777-L824

// Control Message keys are always 4 bytes:
// Version: Any value != 0: i16
buf.put_i16(9999);
// ControlMessageType: unused: i16
buf.put_i16(9999);
records_bytes += 4;
Some(buf.split().freeze())
} else {
tmp.push(0);
tmp.extend(self.key_schema_id.to_be_bytes());
Expand Down Expand Up @@ -214,22 +235,20 @@ impl Read {
// as offset for efficient record batch packing.
let kafka_offset = next_offset - 1;

if !is_control {
records.push(Record {
control: false,
headers: Default::default(),
key,
offset: kafka_offset,
partition_leader_epoch: 1,
producer_epoch: 1,
producer_id: producer.as_i64(),
sequence: kafka_offset as i32,
timestamp: unix_seconds as i64 * 1000 + unix_nanos as i64 / 1_000_000, // Map into millis.
timestamp_type: TimestampType::LogAppend,
transactional: false,
value,
});
}
records.push(Record {
control: is_control,
headers: Default::default(),
key,
offset: kafka_offset,
partition_leader_epoch: 1,
producer_epoch: 1,
producer_id: producer.as_i64(),
sequence: kafka_offset as i32,
timestamp: unix_seconds as i64 * 1000 + unix_nanos as i64 / 1_000_000, // Map into millis.
timestamp_type: TimestampType::LogAppend,
transactional: false,
value,
});
}

let opts = RecordEncodeOptions {
Expand Down

0 comments on commit 6a65c17

Please sign in to comment.