Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename os source rate/job_count to interval/count, acquire UNASSIGNED partitions before CLOSED partitions #3327

Merged
merged 2 commits into from
Sep 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,17 +91,17 @@ public Optional<SourcePartitionStoreItem> tryAcquireAvailablePartition(final Str
return acquiredAssignedItem;
}

final Optional<SourcePartitionStoreItem> acquiredClosedItem = dynamoDbClientWrapper.getAvailablePartition(
ownerId, ownershipTimeout, SourcePartitionStatus.CLOSED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.CLOSED), 1);
final Optional<SourcePartitionStoreItem> acquiredUnassignedItem = dynamoDbClientWrapper.getAvailablePartition(
ownerId, ownershipTimeout, SourcePartitionStatus.UNASSIGNED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED), 5);

if (acquiredClosedItem.isPresent()) {
return acquiredClosedItem;
if (acquiredUnassignedItem.isPresent()) {
return acquiredUnassignedItem;
}

return dynamoDbClientWrapper.getAvailablePartition(
ownerId, ownershipTimeout, SourcePartitionStatus.UNASSIGNED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED), 5);
ownerId, ownershipTimeout, SourcePartitionStatus.CLOSED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.CLOSED), 1);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,11 @@ void getAvailablePartition_with_acquired_CLOSED_partition_returns_the_partition(
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.ASSIGNED),
1))
.willReturn(Optional.empty());
given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout,
SourcePartitionStatus.UNASSIGNED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED),
5))
.willReturn(Optional.empty());
given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout,
SourcePartitionStatus.CLOSED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.CLOSED),
Expand Down Expand Up @@ -278,11 +283,6 @@ void getAvailablePartition_with_acquired_UNASSIGNED_partition_returns_the_partit
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.ASSIGNED),
1))
.willReturn(Optional.empty());
given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout,
SourcePartitionStatus.CLOSED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.CLOSED),
1))
.willReturn(Optional.empty());
given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout,
SourcePartitionStatus.UNASSIGNED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,6 @@ public void queuePartition(final InMemorySourcePartitionStoreItem inMemorySource
}

public Optional<SourcePartitionStoreItem> getNextItem() {
final QueuedPartitionsItem nextClosedPartitionItem = closedPartitions.peek();

if (Objects.nonNull(nextClosedPartitionItem)) {
if (nextClosedPartitionItem.sortedTimestamp.isBefore(Instant.now()) && partitionLookup.containsKey(nextClosedPartitionItem.sourceIdentifier)) {
closedPartitions.remove();
return Optional.ofNullable(partitionLookup.get(nextClosedPartitionItem.sourceIdentifier).get(nextClosedPartitionItem.partitionKey));
}
}

final QueuedPartitionsItem nextUnassignedPartitionItem = unassignedPartitions.peek();

if (Objects.nonNull(nextUnassignedPartitionItem)) {
Expand All @@ -103,6 +94,15 @@ public Optional<SourcePartitionStoreItem> getNextItem() {
}
}

final QueuedPartitionsItem nextClosedPartitionItem = closedPartitions.peek();

if (Objects.nonNull(nextClosedPartitionItem)) {
if (nextClosedPartitionItem.sortedTimestamp.isBefore(Instant.now()) && partitionLookup.containsKey(nextClosedPartitionItem.sourceIdentifier)) {
closedPartitions.remove();
return Optional.ofNullable(partitionLookup.get(nextClosedPartitionItem.sourceIdentifier).get(nextClosedPartitionItem.partitionKey));
}
}

return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,11 @@ void queue_closed_then_unassigned_partitions_followed_by_get_returns_expected_pa

final Optional<SourcePartitionStoreItem> acquiredItem = objectUnderTest.getNextItem();
assertThat(acquiredItem.isPresent(), equalTo(true));
assertThat(acquiredItem.get(), equalTo(item));
assertThat(acquiredItem.get(), equalTo(thirdItem));

final Optional<SourcePartitionStoreItem> secondAcquiredItem = objectUnderTest.getNextItem();
assertThat(secondAcquiredItem.isPresent(), equalTo(true));
assertThat(secondAcquiredItem.get(), equalTo(thirdItem));
assertThat(secondAcquiredItem.get(), equalTo(item));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,25 @@

public class SchedulingParameterConfiguration {

@JsonProperty("rate")
private Duration rate = Duration.ofHours(8);
@JsonProperty("interval")
private Duration interval = Duration.ofHours(8);

@Min(1)
@JsonProperty("job_count")
private int jobCount = 1;
@JsonProperty("index_read_count")
private int indexReadCount = 1;

@JsonProperty("start_time")
private String startTime = Instant.now().toString();

@JsonIgnore
private Instant startTimeInstant;

public Duration getRate() {
return rate;
public Duration getInterval() {
return interval;
}

public int getJobCount() {
return jobCount;
public int getIndexReadCount() {
return indexReadCount;
}

public Instant getStartTime() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,12 @@

package org.opensearch.dataprepper.plugins.source.opensearch.configuration;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.validation.constraints.AssertTrue;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class SearchConfiguration {

private static final ObjectMapper objectMapper = new ObjectMapper();
Expand All @@ -27,32 +22,11 @@ public class SearchConfiguration {
@JsonProperty("batch_size")
private Integer batchSize = 1000;

@JsonProperty("query")
private String queryString = "{ \"query\": { \"match_all\": {} }}";

@JsonIgnore
private Map<String, Object> queryMap;

public SearchContextType getSearchContextType() {
return searchContextType;
}

public Integer getBatchSize() {
return batchSize;
}

public Map<String, Object> getQuery() {
return queryMap;
}

@AssertTrue(message = "query is not a valid json string")
boolean isQueryValid() {
try {
queryMap = objectMapper.readValue(queryString, new TypeReference<>() {});
return true;
} catch (final Exception e) {
LOG.error("Invalid query json string: ", e);
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ static Pair<AcknowledgementSet, CompletableFuture<Boolean>> createAcknowledgment
if (result == true) {
sourceCoordinator.closePartition(
indexPartition.getPartitionKey(),
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getRate(),
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getJobCount());
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getInterval(),
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getIndexReadCount());
}
completableFuture.complete(result);
}, Duration.ofSeconds(ACKNOWLEDGEMENT_SET_TIMEOUT_SECONDS));
Expand All @@ -69,8 +69,8 @@ static void completeIndexPartition(final OpenSearchSourceConfiguration openSearc
} else {
sourceCoordinator.closePartition(
indexPartition.getPartitionKey(),
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getRate(),
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getJobCount());
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getInterval(),
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getIndexReadCount());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,9 @@ void open_search_source_username_password_only() throws JsonProcessingException
" - index_name_regex: \"regex\"\n" +
" - index_name_regex: \"regex-two\"\n" +
"scheduling:\n" +
" job_count: 3\n" +
" index_read_count: 3\n" +
"search_options:\n" +
" batch_size: 1000\n" +
" query: \"test\"\n";
" batch_size: 1000\n";
final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class);

assertThat(sourceConfiguration.getSearchConfiguration(), notNullValue());
Expand Down Expand Up @@ -67,10 +66,9 @@ void open_search_disabled_authentication() throws JsonProcessingException {
" - index_name_regex: \"regex\"\n" +
" - index_name_regex: \"regex-two\"\n" +
"scheduling:\n" +
" job_count: 3\n" +
" index_read_count: 3\n" +
"search_options:\n" +
" batch_size: 1000\n" +
" query: \"test\"\n";
" batch_size: 1000\n";
final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class);

assertThat(sourceConfiguration.getSearchConfiguration(), notNullValue());
Expand Down Expand Up @@ -100,10 +98,9 @@ void opensearch_source_aws_only() throws JsonProcessingException {
" region: \"us-east-1\"\n" +
" sts_role_arn: \"arn:aws:iam::123456789012:role/aos-role\"\n" +
"scheduling:\n" +
" job_count: 3\n" +
" index_read_count: 3\n" +
"search_options:\n" +
" batch_size: 1000\n" +
" query: \"test\"\n";
" batch_size: 1000\n";

final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class);

Expand Down Expand Up @@ -131,10 +128,9 @@ void opensearch_source_aws_sts_external_id() throws JsonProcessingException {
" sts_role_arn: \"arn:aws:iam::123456789012:role/aos-role\"\n" +
" sts_external_id: \"some-random-id\"\n" +
"scheduling:\n" +
" job_count: 3\n" +
" index_read_count: 3\n" +
"search_options:\n" +
" batch_size: 1000\n" +
" query: \"test\"\n";
" batch_size: 1000\n";

final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class);

Expand Down Expand Up @@ -166,10 +162,9 @@ void using_both_aws_config_and_username_password_is_invalid() throws JsonProcess
" region: \"us-east-1\"\n" +
" sts_role_arn: \"arn:aws:iam::123456789012:role/aos-role\"\n" +
"scheduling:\n" +
" job_count: 3\n" +
" index_read_count: 3\n" +
"search_options:\n" +
" batch_size: 1000\n" +
" query: \"test\"\n";
" batch_size: 1000\n";

final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class);

Expand All @@ -188,10 +183,9 @@ void one_of_username_password_or_aws_config_or_authDisabled_is_required() throws
" - index_name_regex: \"regex\"\n" +
" - index_name_regex: \"regex-two\"\n" +
"scheduling:\n" +
" job_count: 3\n" +
" index_read_count: 3\n" +
"search_options:\n" +
" batch_size: 1000\n" +
" query: \"test\"\n";
" batch_size: 1000\n";

final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,34 +23,34 @@ public class SchedulingParameterConfigurationTest {
@Test
void default_scheduling_configuration() {
final SchedulingParameterConfiguration schedulingParameterConfiguration = new SchedulingParameterConfiguration();
assertThat(schedulingParameterConfiguration.getJobCount(), equalTo(1));
assertThat(schedulingParameterConfiguration.getIndexReadCount(), equalTo(1));
assertThat(schedulingParameterConfiguration.isStartTimeValid(), equalTo(true));
assertThat(schedulingParameterConfiguration.getStartTime().isBefore(Instant.now()), equalTo(true));
assertThat(schedulingParameterConfiguration.getRate(), equalTo(Duration.ofHours(8)));
assertThat(schedulingParameterConfiguration.getInterval(), equalTo(Duration.ofHours(8)));
}

@Test
void non_default_scheduling_configuration() throws JsonProcessingException {
final String schedulingConfigurationYaml =
" job_count: 3\n" +
" index_read_count: 3\n" +
" start_time: \"2007-12-03T10:15:30.00Z\"\n";

final SchedulingParameterConfiguration schedulingParameterConfiguration = objectMapper.readValue(schedulingConfigurationYaml, SchedulingParameterConfiguration.class);

assertThat(schedulingParameterConfiguration.getJobCount(), equalTo(3));
assertThat(schedulingParameterConfiguration.getIndexReadCount(), equalTo(3));
assertThat(schedulingParameterConfiguration.isStartTimeValid(), equalTo(true));
assertThat(schedulingParameterConfiguration.getStartTime(), equalTo(Instant.parse("2007-12-03T10:15:30.00Z")));
}

@Test
void invalid_start_time_configuration_test() throws JsonProcessingException {
final String schedulingConfigurationYaml =
" job_count: 3\n" +
" index_read_count: 3\n" +
" start_time: \"2007-12-03T10:15:30\"\n";

final SchedulingParameterConfiguration schedulingParameterConfiguration = objectMapper.readValue(schedulingConfigurationYaml, SchedulingParameterConfiguration.class);

assertThat(schedulingParameterConfiguration.getJobCount(), equalTo(3));
assertThat(schedulingParameterConfiguration.getIndexReadCount(), equalTo(3));
assertThat(schedulingParameterConfiguration.isStartTimeValid(), equalTo(false));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import java.util.Map;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;

public class SearchConfigurationTest {
Expand All @@ -25,32 +24,25 @@ public class SearchConfigurationTest {
void default_search_configuration() {
final SearchConfiguration searchConfiguration = new SearchConfiguration();

assertThat(searchConfiguration.getQuery(), equalTo(null));
assertThat(searchConfiguration.getBatchSize(), equalTo(1000));
}

@Test
void non_default_search_configuration() {
final Map<String, Object> pluginSettings = new HashMap<>();
pluginSettings.put("batch_size", 2000);
pluginSettings.put("query", "{\"query\": {\"match_all\": {} }}");

final SearchConfiguration searchConfiguration = objectMapper.convertValue(pluginSettings, SearchConfiguration.class);
assertThat(searchConfiguration.getBatchSize(),equalTo(2000));
assertThat(searchConfiguration.isQueryValid(), equalTo(true));
assertThat(searchConfiguration.getQuery(), notNullValue());
assertThat(searchConfiguration.getQuery().containsKey("query"), equalTo(true));
}

@Test
void query_is_not_valid_json_string() {

final Map<String, Object> pluginSettings = new HashMap<>();
pluginSettings.put("batch_size", 1000);
pluginSettings.put("query", "\\{query: \"my_query\"}");

final SearchConfiguration searchConfiguration = objectMapper.convertValue(pluginSettings, SearchConfiguration.class);
assertThat(searchConfiguration.getBatchSize(),equalTo(1000));
assertThat(searchConfiguration.isQueryValid(), equalTo(false));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ void run_with_getNextPartition_with_non_empty_partition_processes_and_closes_tha
when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.of(sourcePartition)).thenReturn(Optional.empty());

final SchedulingParameterConfiguration schedulingParameterConfiguration = mock(SchedulingParameterConfiguration.class);
when(schedulingParameterConfiguration.getJobCount()).thenReturn(1);
when(schedulingParameterConfiguration.getRate()).thenReturn(Duration.ZERO);
when(schedulingParameterConfiguration.getIndexReadCount()).thenReturn(1);
when(schedulingParameterConfiguration.getInterval()).thenReturn(Duration.ZERO);
when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration);

doNothing().when(sourceCoordinator).closePartition(partitionKey,
Expand Down Expand Up @@ -265,8 +265,8 @@ void run_with_getNextPartition_with_acknowledgments_processes_and_closes_that_pa
when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.of(sourcePartition)).thenReturn(Optional.empty());

final SchedulingParameterConfiguration schedulingParameterConfiguration = mock(SchedulingParameterConfiguration.class);
when(schedulingParameterConfiguration.getJobCount()).thenReturn(1);
when(schedulingParameterConfiguration.getRate()).thenReturn(Duration.ZERO);
when(schedulingParameterConfiguration.getIndexReadCount()).thenReturn(1);
when(schedulingParameterConfiguration.getInterval()).thenReturn(Duration.ZERO);
when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration);

doNothing().when(sourceCoordinator).closePartition(partitionKey,
Expand Down
Loading
Loading