Skip to content

Commit

Permalink
Fix Negative acknowledgement handling and other minor issues (opensea…
Browse files Browse the repository at this point in the history
…rch-project#3082)

* Fix Negative acknowledgement handling and other minor issues

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed check style errors

Signed-off-by: Krishna Kondaka <[email protected]>

* Cleanup of unused files and config

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka and Krishna Kondaka authored Aug 1, 2023
1 parent f236af9 commit 3ab7831
Show file tree
Hide file tree
Showing 13 changed files with 112 additions and 399 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void setup() {
when(jsonTopic.getName()).thenReturn(testTopic);
when(jsonTopic.getGroupId()).thenReturn(testGroup);
when(jsonTopic.getWorkers()).thenReturn(1);
when(jsonTopic.getSessionTimeOut()).thenReturn(15000);
when(jsonTopic.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15));
when(jsonTopic.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(3));
when(jsonTopic.getAutoCommit()).thenReturn(false);
when(jsonTopic.getSerdeFormat()).thenReturn(MessageFormat.JSON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public void setup() {
when(plainTextTopic.getName()).thenReturn(testTopic);
when(plainTextTopic.getGroupId()).thenReturn(testGroup);
when(plainTextTopic.getWorkers()).thenReturn(1);
when(plainTextTopic.getSessionTimeOut()).thenReturn(15000);
when(plainTextTopic.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15));
when(plainTextTopic.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(3));
when(plainTextTopic.getAutoCommit()).thenReturn(false);
when(plainTextTopic.getSerdeFormat()).thenReturn(MessageFormat.PLAINTEXT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,15 +169,14 @@ public void setup() {
when(avroTopic.getAutoCommit()).thenReturn(false);
when(avroTopic.getAutoOffsetReset()).thenReturn("earliest");
when(avroTopic.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1));
when(avroTopic.getSessionTimeOut()).thenReturn(15000);
when(avroTopic.getSessionTimeOut()).thenReturn(Duration.ofSeconds(5));
when(avroTopic.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(3));
when(jsonTopic.getName()).thenReturn(testTopic);
when(jsonTopic.getGroupId()).thenReturn(testGroup);
when(jsonTopic.getWorkers()).thenReturn(1);
when(jsonTopic.getAutoCommit()).thenReturn(false);
when(jsonTopic.getAutoOffsetReset()).thenReturn("earliest");
when(jsonTopic.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1));
when(jsonTopic.getSessionTimeOut()).thenReturn(15000);
when(jsonTopic.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(3));
bootstrapServers = System.getProperty("tests.kafka.bootstrap_servers");
testRegistryName = System.getProperty("tests.kafka.glue_registry_name");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,6 @@ public class PlainTextAuthConfig {
@JsonProperty("password")
private String password;

@JsonProperty("security_protocol")
private String securityProtocol;

public String getSecurityProtocol() {
return securityProtocol;
}

public String getUsername() {
return username;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
*/
public class TopicConfig {
private static final String AUTO_COMMIT = "false";
private static final Duration AUTOCOMMIT_INTERVAL = Duration.ofSeconds(5);
private static final Integer SESSION_TIMEOUT = 45000;
private static final Duration DEFAULT_COMMIT_INTERVAL = Duration.ofSeconds(5);
private static final Duration DEFAULT_SESSION_TIMEOUT = Duration.ofSeconds(45);
private static final int MAX_RETRY_ATTEMPT = Integer.MAX_VALUE;
private static final String AUTO_OFFSET_RESET = "earliest";
static final String DEFAULT_AUTO_OFFSET_RESET = "latest";
static final Duration THREAD_WAITING_TIME = Duration.ofSeconds(5);
private static final Duration MAX_RECORD_FETCH_TIME = Duration.ofSeconds(4);
private static final Duration BUFFER_DEFAULT_TIMEOUT = Duration.ofSeconds(5);
Expand All @@ -33,8 +33,8 @@ public class TopicConfig {
private static final Duration RETRY_BACKOFF = Duration.ofSeconds(100);
private static final Duration MAX_POLL_INTERVAL = Duration.ofSeconds(300000);
private static final Integer CONSUMER_MAX_POLL_RECORDS = 500;
private static final Integer NUM_OF_WORKERS = 5;
private static final Duration HEART_BEAT_INTERVAL_DURATION = Duration.ofSeconds(3);
static final Integer DEFAULT_NUM_OF_WORKERS = 2;
static final Duration HEART_BEAT_INTERVAL_DURATION = Duration.ofSeconds(5);

@JsonProperty("name")
@NotNull
Expand All @@ -49,7 +49,7 @@ public class TopicConfig {
@JsonProperty("workers")
@Valid
@Size(min = 1, max = 200, message = "Number of worker threads should lies between 1 and 200")
private Integer workers = NUM_OF_WORKERS;
private Integer workers = DEFAULT_NUM_OF_WORKERS;

@JsonProperty("max_retry_attempts")
@Valid
Expand All @@ -67,18 +67,18 @@ public class TopicConfig {
@JsonProperty("auto_commit")
private Boolean autoCommit = false;

@JsonProperty("auto_commit_interval")
@JsonProperty("commit_interval")
@Valid
@Size(min = 1)
private Duration autoCommitInterval = AUTOCOMMIT_INTERVAL;
private Duration commitInterval = DEFAULT_COMMIT_INTERVAL;

@JsonProperty("session_timeout")
@Valid
@Size(min = 1)
private Integer sessionTimeOut = SESSION_TIMEOUT;
private Duration sessionTimeOut = DEFAULT_SESSION_TIMEOUT;

@JsonProperty("auto_offset_reset")
private String autoOffsetReset = AUTO_OFFSET_RESET;
private String autoOffsetReset = DEFAULT_AUTO_OFFSET_RESET;

@JsonProperty("group_name")
@Valid
Expand Down Expand Up @@ -148,15 +148,15 @@ public Boolean getAutoCommit() {
return autoCommit;
}

public Duration getAutoCommitInterval() {
return autoCommitInterval;
public Duration getCommitInterval() {
return commitInterval;
}

public void setAutoCommitInterval(Duration autoCommitInterval) {
this.autoCommitInterval = autoCommitInterval;
public void setCommitInterval(Duration commitInterval) {
this.commitInterval = commitInterval;
}

public Integer getSessionTimeOut() {
public Duration getSessionTimeOut() {
return sessionTimeOut;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void updateOffsetsToCommit(final TopicPartition partition, final OffsetAn
if (Objects.isNull(offsetAndMetadata)) {
return;
}
synchronized (this) {
synchronized (offsetsToCommit) {
offsetsToCommit.put(partition, offsetAndMetadata);
}
}
Expand All @@ -118,18 +118,34 @@ private AcknowledgementSet createAcknowledgementSet(Map<TopicPartition, Range<Lo
if (result == true) {
positiveAcknowledgementSetCounter.increment();
offsets.forEach((partition, offsetRange) -> {
int partitionId = partition.partition();
if (!partitionCommitTrackerMap.containsKey(partitionId)) {
OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(partition);
Long committedOffset = Objects.nonNull(committedOffsetAndMetadata) ? committedOffsetAndMetadata.offset() : null;

partitionCommitTrackerMap.put(partitionId, new TopicPartitionCommitTracker(partition, committedOffset));
try {
int partitionId = partition.partition();
if (!partitionCommitTrackerMap.containsKey(partitionId)) {
OffsetAndMetadata committedOffsetAndMetadata = null;
synchronized(consumer) {
committedOffsetAndMetadata = consumer.committed(partition);
}
Long committedOffset = Objects.nonNull(committedOffsetAndMetadata) ? committedOffsetAndMetadata.offset() : null;
partitionCommitTrackerMap.put(partitionId, new TopicPartitionCommitTracker(partition, committedOffset));
}
OffsetAndMetadata offsetAndMetadata = partitionCommitTrackerMap.get(partitionId).addCompletedOffsets(offsetRange);
updateOffsetsToCommit(partition, offsetAndMetadata);
} catch (Exception e) {
LOG.error("Failed to seek to last committed offset upon positive acknowledgement "+partition, e);
}
OffsetAndMetadata offsetAndMetadata = partitionCommitTrackerMap.get(partitionId).addCompletedOffsets(offsetRange);
updateOffsetsToCommit(partition, offsetAndMetadata);
});
} else {
positiveAcknowledgementSetCounter.increment();
negativeAcknowledgementSetCounter.increment();
offsets.forEach((partition, offsetRange) -> {
try {
synchronized(consumer) {
OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(partition);
consumer.seek(partition, committedOffsetAndMetadata);
}
} catch (Exception e) {
LOG.error("Failed to seek to last committed offset upon negative acknowledgement "+partition, e);
}
});
}
}, acknowledgementsTimeout);
return acknowledgementSet;
Expand All @@ -141,9 +157,11 @@ private AcknowledgementSet createAcknowledgementSet(Map<TopicPartition, Range<Lo

public <T> void consumeRecords() throws Exception {
try {
ConsumerRecords<String, T> records =
consumer.poll(topicConfig.getThreadWaitingTime().toMillis()/2);
if (!records.isEmpty() && records.count() > 0) {
ConsumerRecords<String, T> records = null;
synchronized(consumer) {
records = consumer.poll(topicConfig.getThreadWaitingTime().toMillis()/2);
}
if (Objects.nonNull(records) && !records.isEmpty() && records.count() > 0) {
Map<TopicPartition, Range<Long>> offsets = new HashMap<>();
AcknowledgementSet acknowledgementSet = null;
if (acknowledgementsEnabled) {
Expand All @@ -168,15 +186,17 @@ private void commitOffsets() {
return;
}
long currentTimeMillis = System.currentTimeMillis();
if ((currentTimeMillis - lastCommitTime) < COMMIT_OFFSET_INTERVAL_MS) {
if ((currentTimeMillis - lastCommitTime) < topicConfig.getCommitInterval().toMillis()) {
return;
}
synchronized (this) {
synchronized (offsetsToCommit) {
if (offsetsToCommit.isEmpty()) {
return;
}
try {
consumer.commitSync();
synchronized(consumer) {
consumer.commitSync();
}
offsetsToCommit.clear();
lastCommitTime = currentTimeMillis;
} catch (CommitFailedException e) {
Expand Down Expand Up @@ -286,8 +306,10 @@ public void shutdownConsumer(){
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for (TopicPartition topicPartition : partitions) {
Long committedOffset = consumer.committed(topicPartition).offset();
consumer.seek(topicPartition, committedOffset);
synchronized(consumer) {
Long committedOffset = consumer.committed(topicPartition).offset();
consumer.seek(topicPartition, committedOffset);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public KafkaSource(final KafkaSourceConfig sourceConfig,
@Override
public void start(Buffer<Record<Event>> buffer) {
sourceConfig.getTopics().forEach(topic -> {
consumerGroupID = getGroupId(topic.getName());
consumerGroupID = topic.getGroupId();
Properties consumerProperties = getConsumerProperties(topic);
MessageFormat schema = MessageFormat.getByMessageFormatByName(schemaType);
try {
Expand Down Expand Up @@ -175,10 +175,6 @@ public void stop() {
LOG.info("Consumer shutdown successfully...");
}

private String getGroupId(String name) {
return pipelineName + "::" + name;
}

private long calculateLongestThreadWaitingTime() {
List<TopicConfig> topicsList = sourceConfig.getTopics();
return topicsList.stream().
Expand Down Expand Up @@ -368,13 +364,13 @@ private void setConsumerTopicProperties(Properties properties, TopicConfig topic
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
topicConfig.getAutoCommit());
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
topicConfig.getAutoCommitInterval().toSecondsPart());
((Long)topicConfig.getCommitInterval().toMillis()).intValue());
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
topicConfig.getAutoOffsetReset());
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
topicConfig.getConsumerMaxPollRecords());
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, topicConfig.getSessionTimeOut());
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, topicConfig.getHeartBeatInterval().toSecondsPart());
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, ((Long)topicConfig.getSessionTimeOut().toMillis()).intValue());
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, ((Long)topicConfig.getHeartBeatInterval().toMillis()).intValue());
properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, topicConfig.getFetchMaxBytes().intValue());
properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, topicConfig.getFetchMaxWait());
}
Expand Down
Loading

0 comments on commit 3ab7831

Please sign in to comment.