Skip to content

Commit

Permalink
Fixed failing test case
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Aug 7, 2023
1 parent 7190bee commit 6e91aac
Showing 1 changed file with 4 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
Expand All @@ -23,6 +22,7 @@
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaKeyMode;
import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.acknowledgements.DefaultAcknowledgementSetManager;
import io.micrometer.core.instrument.Counter;
Expand Down Expand Up @@ -77,7 +77,7 @@ public class KafkaSourceCustomConsumerTest {
private TopicConfig topicConfig;

@Mock
private PluginMetrics pluginMetrics;
private KafkaTopicMetrics topicMetrics;

private KafkaSourceCustomConsumer consumer;

Expand All @@ -100,15 +100,14 @@ public class KafkaSourceCustomConsumerTest {
@BeforeEach
public void setUp() {
kafkaConsumer = mock(KafkaConsumer.class);
pluginMetrics = mock(PluginMetrics.class);
topicMetrics = mock(KafkaTopicMetrics.class);
counter = mock(Counter.class);
topicConfig = mock(TopicConfig.class);
when(topicConfig.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1));
when(topicConfig.getSerdeFormat()).thenReturn(MessageFormat.PLAINTEXT);
when(topicConfig.getAutoCommit()).thenReturn(false);
when(kafkaConsumer.committed(any(TopicPartition.class))).thenReturn(null);

when(pluginMetrics.counter(anyString())).thenReturn(counter);
doAnswer((i)-> {return null;}).when(counter).increment();
callbackExecutor = Executors.newFixedThreadPool(2);
acknowledgementSetManager = new DefaultAcknowledgementSetManager(callbackExecutor, Duration.ofMillis(2000));
Expand All @@ -122,7 +121,7 @@ public void setUp() {
public KafkaSourceCustomConsumer createObjectUnderTest(String schemaType, boolean acknowledgementsEnabled) {
when(sourceConfig.getAcknowledgementsEnabled()).thenReturn(acknowledgementsEnabled);
when(sourceConfig.getAcknowledgementsTimeout()).thenReturn(KafkaSourceConfig.DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT);
return new KafkaSourceCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topicConfig, schemaType, acknowledgementSetManager, pluginMetrics);
return new KafkaSourceCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topicConfig, schemaType, acknowledgementSetManager, topicMetrics);
}

private BlockingBuffer<Record<Event>> getBuffer() {
Expand Down

0 comments on commit 6e91aac

Please sign in to comment.