Skip to content

Commit

Permalink
Misc improvemnts to opensearch source and source coordination
Browse files Browse the repository at this point in the history
  • Loading branch information
graytaylor0 committed Sep 12, 2023
1 parent a7b1d1b commit f3032a2
Show file tree
Hide file tree
Showing 10 changed files with 53 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,17 +91,17 @@ public Optional<SourcePartitionStoreItem> tryAcquireAvailablePartition(final Str
return acquiredAssignedItem;
}

final Optional<SourcePartitionStoreItem> acquiredClosedItem = dynamoDbClientWrapper.getAvailablePartition(
ownerId, ownershipTimeout, SourcePartitionStatus.CLOSED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.CLOSED), 1);
final Optional<SourcePartitionStoreItem> acquiredUnassignedItem = dynamoDbClientWrapper.getAvailablePartition(
ownerId, ownershipTimeout, SourcePartitionStatus.UNASSIGNED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED), 5);

if (acquiredClosedItem.isPresent()) {
return acquiredClosedItem;
if (acquiredUnassignedItem.isPresent()) {
return acquiredUnassignedItem;
}

return dynamoDbClientWrapper.getAvailablePartition(
ownerId, ownershipTimeout, SourcePartitionStatus.UNASSIGNED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED), 5);
ownerId, ownershipTimeout, SourcePartitionStatus.CLOSED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.CLOSED), 1);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,11 @@ void getAvailablePartition_with_acquired_CLOSED_partition_returns_the_partition(
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.ASSIGNED),
1))
.willReturn(Optional.empty());
given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout,
SourcePartitionStatus.UNASSIGNED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED),
5))
.willReturn(Optional.empty());
given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout,
SourcePartitionStatus.CLOSED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.CLOSED),
Expand Down Expand Up @@ -278,11 +283,6 @@ void getAvailablePartition_with_acquired_UNASSIGNED_partition_returns_the_partit
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.ASSIGNED),
1))
.willReturn(Optional.empty());
given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout,
SourcePartitionStatus.CLOSED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.CLOSED),
1))
.willReturn(Optional.empty());
given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout,
SourcePartitionStatus.UNASSIGNED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,6 @@ public void queuePartition(final InMemorySourcePartitionStoreItem inMemorySource
}

public Optional<SourcePartitionStoreItem> getNextItem() {
final QueuedPartitionsItem nextClosedPartitionItem = closedPartitions.peek();

if (Objects.nonNull(nextClosedPartitionItem)) {
if (nextClosedPartitionItem.sortedTimestamp.isBefore(Instant.now()) && partitionLookup.containsKey(nextClosedPartitionItem.sourceIdentifier)) {
closedPartitions.remove();
return Optional.ofNullable(partitionLookup.get(nextClosedPartitionItem.sourceIdentifier).get(nextClosedPartitionItem.partitionKey));
}
}

final QueuedPartitionsItem nextUnassignedPartitionItem = unassignedPartitions.peek();

if (Objects.nonNull(nextUnassignedPartitionItem)) {
Expand All @@ -103,6 +94,15 @@ public Optional<SourcePartitionStoreItem> getNextItem() {
}
}

final QueuedPartitionsItem nextClosedPartitionItem = closedPartitions.peek();

if (Objects.nonNull(nextClosedPartitionItem)) {
if (nextClosedPartitionItem.sortedTimestamp.isBefore(Instant.now()) && partitionLookup.containsKey(nextClosedPartitionItem.sourceIdentifier)) {
closedPartitions.remove();
return Optional.ofNullable(partitionLookup.get(nextClosedPartitionItem.sourceIdentifier).get(nextClosedPartitionItem.partitionKey));
}
}

return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,11 @@ void queue_closed_then_unassigned_partitions_followed_by_get_returns_expected_pa

final Optional<SourcePartitionStoreItem> acquiredItem = objectUnderTest.getNextItem();
assertThat(acquiredItem.isPresent(), equalTo(true));
assertThat(acquiredItem.get(), equalTo(item));
assertThat(acquiredItem.get(), equalTo(thirdItem));

final Optional<SourcePartitionStoreItem> secondAcquiredItem = objectUnderTest.getNextItem();
assertThat(secondAcquiredItem.isPresent(), equalTo(true));
assertThat(secondAcquiredItem.get(), equalTo(thirdItem));
assertThat(secondAcquiredItem.get(), equalTo(item));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,25 @@

public class SchedulingParameterConfiguration {

@JsonProperty("rate")
private Duration rate = Duration.ofHours(8);
@JsonProperty("interval")
private Duration interval = Duration.ofHours(8);

@Min(1)
@JsonProperty("job_count")
private int jobCount = 1;
@JsonProperty("count")
private int count = 1;

@JsonProperty("start_time")
private String startTime = Instant.now().toString();

@JsonIgnore
private Instant startTimeInstant;

public Duration getRate() {
return rate;
public Duration getInterval() {
return interval;
}

public int getJobCount() {
return jobCount;
public int getCount() {
return count;
}

public Instant getStartTime() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ static Pair<AcknowledgementSet, CompletableFuture<Boolean>> createAcknowledgment
if (result == true) {
sourceCoordinator.closePartition(
indexPartition.getPartitionKey(),
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getRate(),
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getJobCount());
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getInterval(),
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getCount());
}
completableFuture.complete(result);
}, Duration.ofSeconds(ACKNOWLEDGEMENT_SET_TIMEOUT_SECONDS));
Expand All @@ -69,8 +69,8 @@ static void completeIndexPartition(final OpenSearchSourceConfiguration openSearc
} else {
sourceCoordinator.closePartition(
indexPartition.getPartitionKey(),
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getRate(),
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getJobCount());
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getInterval(),
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getCount());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ public class SchedulingParameterConfigurationTest {
@Test
void default_scheduling_configuration() {
final SchedulingParameterConfiguration schedulingParameterConfiguration = new SchedulingParameterConfiguration();
assertThat(schedulingParameterConfiguration.getJobCount(), equalTo(1));
assertThat(schedulingParameterConfiguration.getCount(), equalTo(1));
assertThat(schedulingParameterConfiguration.isStartTimeValid(), equalTo(true));
assertThat(schedulingParameterConfiguration.getStartTime().isBefore(Instant.now()), equalTo(true));
assertThat(schedulingParameterConfiguration.getRate(), equalTo(Duration.ofHours(8)));
assertThat(schedulingParameterConfiguration.getInterval(), equalTo(Duration.ofHours(8)));
}

@Test
Expand All @@ -37,7 +37,7 @@ void non_default_scheduling_configuration() throws JsonProcessingException {

final SchedulingParameterConfiguration schedulingParameterConfiguration = objectMapper.readValue(schedulingConfigurationYaml, SchedulingParameterConfiguration.class);

assertThat(schedulingParameterConfiguration.getJobCount(), equalTo(3));
assertThat(schedulingParameterConfiguration.getCount(), equalTo(3));
assertThat(schedulingParameterConfiguration.isStartTimeValid(), equalTo(true));
assertThat(schedulingParameterConfiguration.getStartTime(), equalTo(Instant.parse("2007-12-03T10:15:30.00Z")));
}
Expand All @@ -50,7 +50,7 @@ void invalid_start_time_configuration_test() throws JsonProcessingException {

final SchedulingParameterConfiguration schedulingParameterConfiguration = objectMapper.readValue(schedulingConfigurationYaml, SchedulingParameterConfiguration.class);

assertThat(schedulingParameterConfiguration.getJobCount(), equalTo(3));
assertThat(schedulingParameterConfiguration.getCount(), equalTo(3));
assertThat(schedulingParameterConfiguration.isStartTimeValid(), equalTo(false));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ void run_with_getNextPartition_with_non_empty_partition_processes_and_closes_tha
when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.of(sourcePartition)).thenReturn(Optional.empty());

final SchedulingParameterConfiguration schedulingParameterConfiguration = mock(SchedulingParameterConfiguration.class);
when(schedulingParameterConfiguration.getJobCount()).thenReturn(1);
when(schedulingParameterConfiguration.getRate()).thenReturn(Duration.ZERO);
when(schedulingParameterConfiguration.getCount()).thenReturn(1);
when(schedulingParameterConfiguration.getInterval()).thenReturn(Duration.ZERO);
when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration);

doNothing().when(sourceCoordinator).closePartition(partitionKey,
Expand Down Expand Up @@ -265,8 +265,8 @@ void run_with_getNextPartition_with_acknowledgments_processes_and_closes_that_pa
when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.of(sourcePartition)).thenReturn(Optional.empty());

final SchedulingParameterConfiguration schedulingParameterConfiguration = mock(SchedulingParameterConfiguration.class);
when(schedulingParameterConfiguration.getJobCount()).thenReturn(1);
when(schedulingParameterConfiguration.getRate()).thenReturn(Duration.ZERO);
when(schedulingParameterConfiguration.getCount()).thenReturn(1);
when(schedulingParameterConfiguration.getInterval()).thenReturn(Duration.ZERO);
when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration);

doNothing().when(sourceCoordinator).closePartition(partitionKey,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_pit_
when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.of(sourcePartition)).thenReturn(Optional.empty());

final SchedulingParameterConfiguration schedulingParameterConfiguration = mock(SchedulingParameterConfiguration.class);
when(schedulingParameterConfiguration.getJobCount()).thenReturn(1);
when(schedulingParameterConfiguration.getRate()).thenReturn(Duration.ZERO);
when(schedulingParameterConfiguration.getCount()).thenReturn(1);
when(schedulingParameterConfiguration.getInterval()).thenReturn(Duration.ZERO);
when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration);

doNothing().when(sourceCoordinator).closePartition(partitionKey,
Expand Down Expand Up @@ -268,8 +268,8 @@ void run_with_acknowledgments_enabled_creates_and_deletes_pit_and_closes_that_pa
when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.of(sourcePartition)).thenReturn(Optional.empty());

final SchedulingParameterConfiguration schedulingParameterConfiguration = mock(SchedulingParameterConfiguration.class);
when(schedulingParameterConfiguration.getJobCount()).thenReturn(1);
when(schedulingParameterConfiguration.getRate()).thenReturn(Duration.ZERO);
when(schedulingParameterConfiguration.getCount()).thenReturn(1);
when(schedulingParameterConfiguration.getInterval()).thenReturn(Duration.ZERO);
when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration);

doNothing().when(sourceCoordinator).closePartition(partitionKey,
Expand Down Expand Up @@ -354,8 +354,8 @@ void run_with_getNextPartition_with_valid_existing_point_in_time_does_not_create
when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.of(sourcePartition)).thenReturn(Optional.empty());

final SchedulingParameterConfiguration schedulingParameterConfiguration = mock(SchedulingParameterConfiguration.class);
when(schedulingParameterConfiguration.getJobCount()).thenReturn(1);
when(schedulingParameterConfiguration.getRate()).thenReturn(Duration.ZERO);
when(schedulingParameterConfiguration.getCount()).thenReturn(1);
when(schedulingParameterConfiguration.getInterval()).thenReturn(Duration.ZERO);
when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration);

doNothing().when(sourceCoordinator).closePartition(partitionKey,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_scro
when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.of(sourcePartition)).thenReturn(Optional.empty());

final SchedulingParameterConfiguration schedulingParameterConfiguration = mock(SchedulingParameterConfiguration.class);
when(schedulingParameterConfiguration.getJobCount()).thenReturn(1);
when(schedulingParameterConfiguration.getRate()).thenReturn(Duration.ZERO);
when(schedulingParameterConfiguration.getCount()).thenReturn(1);
when(schedulingParameterConfiguration.getInterval()).thenReturn(Duration.ZERO);
when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration);

doNothing().when(sourceCoordinator).closePartition(partitionKey,
Expand Down Expand Up @@ -262,8 +262,8 @@ void run_with_getNextPartition_with_acknowledgments_creates_and_deletes_scroll_a
when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.of(sourcePartition)).thenReturn(Optional.empty());

final SchedulingParameterConfiguration schedulingParameterConfiguration = mock(SchedulingParameterConfiguration.class);
when(schedulingParameterConfiguration.getJobCount()).thenReturn(1);
when(schedulingParameterConfiguration.getRate()).thenReturn(Duration.ZERO);
when(schedulingParameterConfiguration.getCount()).thenReturn(1);
when(schedulingParameterConfiguration.getInterval()).thenReturn(Duration.ZERO);
when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration);

doNothing().when(sourceCoordinator).closePartition(partitionKey,
Expand Down

0 comments on commit f3032a2

Please sign in to comment.