Skip to content

Commit

Permalink
Add logs to debug double consumption during checkpoint recovery from …
Browse files Browse the repository at this point in the history
…native kafka
  • Loading branch information
jeffxiang committed Jan 13, 2025
1 parent 3b00c3d commit 7d20d6c
Showing 1 changed file with 3 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,15 @@ public byte[] serialize(PscTopicUriPartitionSplit split) throws IOException {
out.writeLong(split.getStartingOffset());
out.writeLong(split.getStoppingOffset().orElse(PscTopicUriPartitionSplit.NO_STOPPING_OFFSET));
out.flush();
LOG.info("Serializing split with version: " + getVersion() + " for topicUri-partition: " + split.getTopicUri() + "-" + split.getPartition());
return baos.toByteArray();
}
}

@Override
public PscTopicUriPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
LOG.info("Deserializing split with version: " + version);
Thread.dumpStack();
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
DataInputStream in = new DataInputStream(bais)) {
String topicUri = in.readUTF();
Expand All @@ -90,6 +92,7 @@ public PscTopicUriPartitionSplit deserialize(int version, byte[] serialized) thr
int partition = in.readInt();
long offset = in.readLong();
long stoppingOffset = in.readLong();
LOG.info("Deserialized split for topicUri-partition: " + topicUri + "-" + partition + " for offset=" + offset + " and stopping offset=" + stoppingOffset);
return new PscTopicUriPartitionSplit(
new TopicUriPartition(topicUri, partition), offset, stoppingOffset);
}
Expand Down

0 comments on commit 7d20d6c

Please sign in to comment.