Skip to content

Commit

Permalink
Fix KafkaSource CustomerConsumerTest to use await() instead of Thread…
Browse files Browse the repository at this point in the history
….sleep()

Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Sep 18, 2023
1 parent 6322389 commit 1ada77b
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ public KafkaSourceCustomConsumer(final KafkaConsumer consumer,
this.lastCommitTime = System.currentTimeMillis();
}

KafkaTopicMetrics getTopicMetrics() {
return topicMetrics;
}

public void updateOffsetsToCommit(final TopicPartition partition, final OffsetAndMetadata offsetAndMetadata, Range<Long> offsetRange) {
long min = offsetRange.getMinimum();
long max = offsetRange.getMaximum();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,13 @@
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import java.time.Duration;

import static org.awaitility.Awaitility.await;

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
public class KafkaSourceCustomConsumerTest {
Expand Down Expand Up @@ -95,23 +98,35 @@ public class KafkaSourceCustomConsumerTest {
private final int testPartition = 0;
private final int testJsonPartition = 1;
private Counter counter;
@Mock
private Counter posCounter;
@Mock
private Counter negCounter;
private Duration delayTime;
private double posCount;
private double negCount;

@BeforeEach
public void setUp() {
delayTime = Duration.ofMillis(10);
kafkaConsumer = mock(KafkaConsumer.class);
topicMetrics = mock(KafkaTopicMetrics.class);
counter = mock(Counter.class);
posCounter = mock(Counter.class);
negCounter = mock(Counter.class);
topicConfig = mock(TopicConfig.class);
when(topicMetrics.getNumberOfPositiveAcknowledgements()).thenReturn(counter);
when(topicMetrics.getNumberOfNegativeAcknowledgements()).thenReturn(counter);
when(topicMetrics.getNumberOfNegativeAcknowledgements()).thenReturn(counter);
when(topicMetrics.getNumberOfPositiveAcknowledgements()).thenReturn(posCounter);
when(topicMetrics.getNumberOfNegativeAcknowledgements()).thenReturn(negCounter);
when(topicMetrics.getNumberOfRecordsCommitted()).thenReturn(counter);
when(topicConfig.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1));
when(topicConfig.getSerdeFormat()).thenReturn(MessageFormat.PLAINTEXT);
when(topicConfig.getAutoCommit()).thenReturn(false);
when(kafkaConsumer.committed(any(TopicPartition.class))).thenReturn(null);

doAnswer((i)-> {return null;}).when(counter).increment();
doAnswer((i)-> {posCount += 1.0; return null;}).when(posCounter).increment();
doAnswer((i)-> {negCount += 1.0; return null;}).when(negCounter).increment();
doAnswer((i)-> {return posCount;}).when(posCounter).count();
doAnswer((i)-> {return negCount;}).when(negCounter).count();
callbackExecutor = Executors.newFixedThreadPool(2);
acknowledgementSetManager = new DefaultAcknowledgementSetManager(callbackExecutor, Duration.ofMillis(2000));

Expand Down Expand Up @@ -201,9 +216,10 @@ public void testPlainTextConsumeRecordsWithAcknowledgements() throws Interrupted
event.getEventHandle().release(true);
}
// Wait for acknowledgement callback function to run
try {
Thread.sleep(10000);
} catch (Exception e){}
await().atMost(delayTime.plusMillis(5000))
.until(() -> {
return consumer.getTopicMetrics().getNumberOfPositiveAcknowledgements().count() == 1.0;
});

consumer.processAcknowledgedOffsets();
offsetsToCommit = consumer.getOffsetsToCommit();
Expand Down Expand Up @@ -245,9 +261,10 @@ public void testPlainTextConsumeRecordsWithNegativeAcknowledgements() throws Int
event.getEventHandle().release(false);
}
// Wait for acknowledgement callback function to run
try {
Thread.sleep(10000);
} catch (Exception e){}
await().atMost(delayTime.plusMillis(5000))
.until(() -> {
return consumer.getTopicMetrics().getNumberOfNegativeAcknowledgements().count() == 1.0;
});

consumer.processAcknowledgedOffsets();
offsetsToCommit = consumer.getOffsetsToCommit();
Expand Down

0 comments on commit 1ada77b

Please sign in to comment.