Fix consumer synchronization. Fix consumer to use user-specified groupId #3100
Conversation
Signed-off-by: Krishna Kondaka <[email protected]>
This looks good. I do have some comments, but nothing to block it.
@@ -176,12 +170,27 @@ public <T> void consumeRecords() throws Exception {
            }
        }
    } catch (AuthenticationException e) {
-       LOG.warn("Authentication Error while doing poll(). Will retry after 10 seconds", e);
+       LOG.warn("Access Denied while doing poll(). Will retry after 10 seconds", e);
The Apache documentation indicates that this is an authentication error and not necessarily related to access controls.
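For reference, a minimal sketch of the handler with the original wording kept; the 10-second sleep comes from the surrounding code in this file, and everything else mirrors the diff above:

} catch (AuthenticationException e) {
    // AuthenticationException signals a failed SASL/TLS handshake with the
    // broker, not an ACL/authorization failure, so "Authentication" is the
    // accurate word here.
    LOG.warn("Authentication Error while doing poll(). Will retry after 10 seconds", e);
    Thread.sleep(10000);
}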
        consumeRecords();
        commitOffsets();
    } catch (Exception exp) {
        LOG.error("Error while reading the records from the topic...", exp);
I think we should avoid ellipses in our logs. It's not a big deal, but they seem to indicate either that we trailed off or that we have more to say (beyond the exception).
Will remove ellipsis in the next PR.
@@ -95,6 +99,7 @@ public KafkaSourceCustomConsumer(final KafkaConsumer consumer,
        this.acknowledgementSetManager = acknowledgementSetManager;
        this.pluginMetrics = pluginMetrics;
        this.partitionCommitTrackerMap = new HashMap<>();
+       this.partitionsToReset = new HashSet<>();
You could use a concurrent set to avoid synchronization on this object.
this.partitionsToReset = Collections.synchronizedSet(new HashSet<>());
Would it be better performance-wise?
I doubt it, but it would ensure that somebody doesn't forget to synchronize calls as the file is maintained.
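For illustration, a sketch of the two thread-safe alternatives discussed here (field and type names are the ones used in this PR):

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.TopicPartition;

// Option 1: every call locks the wrapper object; iteration still
// needs an external synchronized block.
Set<TopicPartition> partitionsToReset = Collections.synchronizedSet(new HashSet<>());

// Option 2 (the suggestion above): a concurrent set backed by
// ConcurrentHashMap; add/remove/contains need no external locking,
// and iteration is weakly consistent rather than fail-fast.
// Set<TopicPartition> partitionsToReset = ConcurrentHashMap.newKeySet();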
    } catch (Exception e) {
        LOG.error("Failed to seek to last committed offset upon negative acknowledgement "+partition, e);
        synchronized(partitionsToReset) {
            partitionsToReset.add(partition);
This seems like a nice change. Did you do any performance testing or find any particular pitfalls with the original approach?
I haven't done any performance testing. But taking a lock every time the consumer is accessed is definitely not good.
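A minimal sketch of the pattern under discussion, using the names from the diff: KafkaConsumer is not thread-safe, so the acknowledgement callback only records the partition, and the polling thread performs the actual seek on its next iteration. The helper name markPartitionForReset is hypothetical:

// Runs on the acknowledgement callback thread: never touches the
// KafkaConsumer, only the (synchronized) set of partitions to rewind.
private void markPartitionForReset(final TopicPartition partition) {
    synchronized (partitionsToReset) {
        partitionsToReset.add(partition);
    }
}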
        Thread.sleep(10000);
    } catch (RecordDeserializationException e) {
        LOG.warn("Serialization error - topic {} partition {} offset {}, seeking past the error record",
Serialization -> Deserialization.
Also, increment a metric here once we add metrics.
    } catch (RecordDeserializationException e) {
        LOG.warn("Serialization error - topic {} partition {} offset {}, seeking past the error record",
                e.topicPartition().topic(), e.topicPartition().partition(), e.offset());
        consumer.seek(e.topicPartition(), e.offset()+1);
Is it really required to explicitly seek past this record? Can we not just log the exception, count a metric, and commit the offset as usual?
As per the documentation, we have to seek past the offset to continue reading. We cannot commit the offset unless we get acknowledgement that previously read records are flushed to the sink, right?
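A sketch of the handler as the author describes it. Per the Kafka javadoc, poll() keeps throwing RecordDeserializationException for the same record until the consumer seeks past it, which is why the explicit seek is needed. The counter is a hypothetical addition following the metrics comment above:

} catch (RecordDeserializationException e) {
    // poll() will keep failing on this record until we seek past it.
    LOG.warn("Deserialization error - topic {} partition {} offset {}, seeking past the error record",
            e.topicPartition().topic(), e.topicPartition().partition(), e.offset());
    deserializationErrorCounter.increment();  // hypothetical metric
    consumer.seek(e.topicPartition(), e.offset() + 1);
}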
@@ -214,8 +221,8 @@ public void run() {
    try {
        consumer.subscribe(Arrays.asList(topicName));
        while (!shutdownInProgress.get()) {
            resetOrCommitOffsets();
nit: might be better to have separate functions.
I thought about it. Resetting is just 3 or 4 lines of code, so it didn't feel worth making it a separate function, especially since it will be a no-op most of the time.
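For context, a sketch of what the combined method plausibly looks like, assuming the names used in this PR (the exact bodies are my assumption, not the PR's code):

private void resetOrCommitOffsets() {
    synchronized (partitionsToReset) {
        // The "reset" half: rewind each negatively-acknowledged partition
        // to its last committed offset, then forget it.
        for (final TopicPartition partition : partitionsToReset) {
            final OffsetAndMetadata committed = consumer.committed(partition);
            if (committed != null) {
                consumer.seek(partition, committed.offset());
            }
        }
        partitionsToReset.clear();
    }
    // The "commit" half: the pre-existing offset-commit logic.
    commitOffsets();
}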
@@ -214,17 +212,15 @@ public static String getBootStrapServersForMsk(final AwsIamAuthConfig awsIamAuth
        retryable = false;
        try {
            result = kafkaClient.getBootstrapBrokers(request);
-       } catch (InternalServerErrorException | ConflictException | ForbiddenException | UnauthorizedException | StsException e) {
+       } catch (KafkaException | StsException e) {
            LOG.debug("Failed to get bootstrap server information from MSK. Retrying...", e);
Should this be LOG.info? Also, make it explicit, e.g. "will retry with exponential backoff" or "will retry after N seconds". Do we need to log the entire backtrace? Just e.getMessage() may be enough.
I am sure that every time we included just the message, the entire stack trace was later needed. Hopefully, these are not common scenarios.
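To make the suggestion concrete, a sketch of the retry loop with an explicit backoff in the log line; the backoff numbers are illustrative and not from this PR, and this assumes the enclosing method may throw InterruptedException:

int attempt = 0;
boolean retryable = true;
GetBootstrapBrokersResponse result = null;
while (retryable) {
    retryable = false;
    try {
        result = kafkaClient.getBootstrapBrokers(request);
    } catch (KafkaException | StsException e) {
        // Illustrative exponential backoff, capped at 30 seconds.
        final long backoffMs = Math.min(30_000L, 1_000L << Math.min(attempt, 5));
        // Message-only at INFO; full stack trace kept at DEBUG.
        LOG.info("Failed to get bootstrap servers from MSK, will retry in {} ms: {}", backoffMs, e.getMessage());
        LOG.debug("Full stack trace for MSK getBootstrapBrokers failure", e);
        retryable = true;
        attempt++;
        Thread.sleep(backoffMs);
    }
}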
Description
This PR contains the following fixes:
-- Fixes a consumer synchronization issue by avoiding synchronization on the consumer
-- Fixes LOG messages to use {} placeholders
Issues Resolved
[List any issues this PR will resolve]
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.