Skip to content

Commit

Permalink
Rebased to latest
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Sep 18, 2023
1 parent 6286388 commit 36ce54a
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ public KafkaSourceCustomConsumer(final KafkaConsumer consumer,
this.errLogRateLimiter = new LogRateLimiter(2, System.currentTimeMillis());
}

KafkaTopicMetrics getTopicMetrics() {
return topicMetrics;
}

private long getCurrentTimeNanos() {
Instant now = Instant.now();
return now.getEpochSecond()*1000000000+now.getNano();
Expand Down Expand Up @@ -501,4 +505,4 @@ final String getTopicPartitionOffset(final Map<TopicPartition, Long> offsetMap,
final Long offset = offsetMap.get(topicPartition);
return Objects.isNull(offset) ? "-" : offset.toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.awaitility.Awaitility.await;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasEntry;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -97,24 +99,42 @@ public class KafkaSourceCustomConsumerTest {
private final int testPartition = 0;
private final int testJsonPartition = 1;
private Counter counter;
@Mock
private Counter posCounter;
@Mock
private Counter negCounter;
private Duration delayTime;
private double posCount;
private double negCount;

@BeforeEach
public void setUp() {
delayTime = Duration.ofMillis(10);
kafkaConsumer = mock(KafkaConsumer.class);
topicMetrics = mock(KafkaTopicMetrics.class);
counter = mock(Counter.class);
posCounter = mock(Counter.class);
negCounter = mock(Counter.class);
topicConfig = mock(TopicConfig.class);
when(topicMetrics.getNumberOfPositiveAcknowledgements()).thenReturn(counter);
when(topicMetrics.getNumberOfNegativeAcknowledgements()).thenReturn(counter);
when(topicMetrics.getNumberOfNegativeAcknowledgements()).thenReturn(counter);
when(topicMetrics.getNumberOfPositiveAcknowledgements()).thenReturn(posCounter);
when(topicMetrics.getNumberOfNegativeAcknowledgements()).thenReturn(negCounter);
when(topicMetrics.getNumberOfRecordsCommitted()).thenReturn(counter);
when(topicMetrics.getNumberOfDeserializationErrors()).thenReturn(counter);
when(topicConfig.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1));
when(topicConfig.getSerdeFormat()).thenReturn(MessageFormat.PLAINTEXT);
when(topicConfig.getAutoCommit()).thenReturn(false);
when(kafkaConsumer.committed(any(TopicPartition.class))).thenReturn(null);

doAnswer((i)-> {return null;}).when(counter).increment();
doAnswer((i)-> {
posCount += 1.0;
return null;
}).when(posCounter).increment();
doAnswer((i)-> {
negCount += 1.0;
return null;
}).when(negCounter).increment();
doAnswer((i)-> {return posCount;}).when(posCounter).count();
doAnswer((i)-> {return negCount;}).when(negCounter).count();
callbackExecutor = Executors.newFixedThreadPool(2);
acknowledgementSetManager = new DefaultAcknowledgementSetManager(callbackExecutor, Duration.ofMillis(2000));

Expand Down Expand Up @@ -207,9 +227,10 @@ public void testPlainTextConsumeRecordsWithAcknowledgements() throws Interrupted
event.getEventHandle().release(true);
}
// Wait for acknowledgement callback function to run
try {
Thread.sleep(100);
} catch (Exception e){}
await().atMost(delayTime.plusMillis(5000))
.until(() -> {
return consumer.getTopicMetrics().getNumberOfPositiveAcknowledgements().count() == 1.0;
});

consumer.processAcknowledgedOffsets();
offsetsToCommit = consumer.getOffsetsToCommit();
Expand Down Expand Up @@ -254,9 +275,10 @@ public void testPlainTextConsumeRecordsWithNegativeAcknowledgements() throws Int
event.getEventHandle().release(false);
}
// Wait for acknowledgement callback function to run
try {
Thread.sleep(100);
} catch (Exception e){}
await().atMost(delayTime.plusMillis(5000))
.until(() -> {
return consumer.getTopicMetrics().getNumberOfNegativeAcknowledgements().count() == 1.0;
});

consumer.processAcknowledgedOffsets();
offsetsToCommit = consumer.getOffsetsToCommit();
Expand Down

0 comments on commit 36ce54a

Please sign in to comment.