Skip to content

Commit

Permalink
Add test for DataStreamPartitionCheckpoint
Browse files Browse the repository at this point in the history
Signed-off-by: Dinu John <[email protected]>
  • Loading branch information
dinujoh committed Mar 26, 2024
1 parent da70f17 commit 10181f7
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.GlobalState;
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.StreamPartition;
import org.opensearch.dataprepper.plugins.mongo.coordination.state.StreamProgressState;
import org.opensearch.dataprepper.plugins.mongo.model.ExportLoadStatus;
import org.opensearch.dataprepper.plugins.mongo.model.StreamLoadStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package org.opensearch.dataprepper.plugins.mongo.stream;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.StreamPartition;
import org.opensearch.dataprepper.plugins.mongo.coordination.state.StreamProgressState;

import java.time.Duration;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;

import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.mongo.stream.DataStreamPartitionCheckpoint.CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE;

@ExtendWith(MockitoExtension.class)
public class DataStreamPartitionCheckpointTest {
@Mock
private EnhancedSourceCoordinator enhancedSourceCoordinator;

@Mock
private StreamPartition streamPartition;

@Mock
private StreamProgressState streamProgressState;

@InjectMocks
private DataStreamPartitionCheckpoint dataStreamPartitionCheckpoint;

@Test
public void checkpoint() {
final int recordNumber = new Random().nextInt();
final String checkpointToken = UUID.randomUUID().toString();
when(streamPartition.getProgressState()).thenReturn(Optional.of(streamProgressState));
dataStreamPartitionCheckpoint.checkpoint(checkpointToken, recordNumber);
verify(enhancedSourceCoordinator).saveProgressStateForPartition(streamPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE);
verify(streamProgressState).setResumeToken(checkpointToken);
verify(streamProgressState).setLoadedRecords(recordNumber);
verify(streamProgressState).setLastUpdateTimestamp(anyLong());
}

@Test
public void updateStreamPartitionForAcknowledgmentWait() {
final int minutes = new Random().nextInt();
final Duration duration = Duration.ofMinutes(minutes);
dataStreamPartitionCheckpoint.updateStreamPartitionForAcknowledgmentWait(duration);
verify(enhancedSourceCoordinator).saveProgressStateForPartition(streamPartition, duration);
}
}

0 comments on commit 10181f7

Please sign in to comment.