Commit d2007bc

Add acknowledgments for the ddb source (opensearch-project#3575)

Signed-off-by: Taylor Gray <[email protected]>
graytaylor0 authored Nov 2, 2023
1 parent 7869eb7 commit d2007bc
Showing 33 changed files with 525 additions and 110 deletions.

EnhancedSourceCoordinator.java
@@ -46,9 +46,10 @@ public interface EnhancedSourceCoordinator {
*
* @param partition The partition to be updated.
* @param <T> The progress state class
* @param ownershipTimeoutRenewal The amount of time to extend ownership of the partition before another instance can acquire it.
* @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException when the partition was not updated successfully
*/
<T> void saveProgressStateForPartition(EnhancedSourcePartition<T> partition);
<T> void saveProgressStateForPartition(EnhancedSourcePartition<T> partition, Duration ownershipTimeoutRenewal);

/**
* This method is used to release the lease of a partition in the coordination store.
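A caller can now choose how far to extend the lease on each save. A minimal usage sketch (the coordinator, partition, and duration are illustrative; as the implementation below shows, passing null keeps the default lease timeout):

// Renew ownership for a custom window while saving progress:
coordinator.saveProgressStateForPartition(partition, Duration.ofMinutes(10));

// Keep the coordinator's default lease timeout:
coordinator.saveProgressStateForPartition(partition, null);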

DefaultAcknowledgementSet.java
@@ -139,7 +139,7 @@ public boolean release(final EventHandle eventHandle, final boolean result) {
callbackFuture = executor.submit(() -> callback.accept(this.result));
return true;
} else if (pendingAcknowledgments.size() == 0) {
LOG.warn("Acknowledgement set is not completed. Delaying callback until it is completed");
LOG.debug("Acknowledgement set is not completed. Delaying callback until it is completed");
}
}
} finally {

EnhancedLeaseBasedSourceCoordinator.java
@@ -127,7 +127,7 @@ public Optional<EnhancedSourcePartition> acquireAvailablePartition(String partitionType) {


@Override
public <T> void saveProgressStateForPartition(EnhancedSourcePartition<T> partition) {
public <T> void saveProgressStateForPartition(EnhancedSourcePartition<T> partition, final Duration ownershipTimeoutRenewal) {
String partitionType = partition.getPartitionType() == null ? DEFAULT_GLOBAL_STATE_PARTITION_TYPE : partition.getPartitionType();
LOG.debug("Try to save progress for partition {} (Type {})", partition.getPartitionKey(), partitionType);

@@ -140,7 +140,7 @@ public <T> void saveProgressStateForPartition(EnhancedSourcePartition<T> partition) {
final SourcePartitionStoreItem updateItem = partition.getSourcePartitionStoreItem();
// Also extend the timeout of the lease (ownership)
if (updateItem.getPartitionOwnershipTimeout() != null) {
updateItem.setPartitionOwnershipTimeout(Instant.now().plus(DEFAULT_LEASE_TIMEOUT));
updateItem.setPartitionOwnershipTimeout(Instant.now().plus(ownershipTimeoutRenewal == null ? DEFAULT_LEASE_TIMEOUT : ownershipTimeoutRenewal));
}
updateItem.setPartitionProgressState(partition.convertPartitionProgressStatetoString(partition.getProgressState()));


EnhancedLeaseBasedSourceCoordinatorTest.java
@@ -163,7 +163,7 @@ void test_saveProgressStateForPartition() {
Optional<EnhancedSourcePartition> sourcePartition = coordinator.acquireAvailablePartition(DEFAULT_PARTITION_TYPE);
assertThat(sourcePartition.isPresent(), equalTo(true));
TestEnhancedSourcePartition partition = (TestEnhancedSourcePartition) sourcePartition.get();
coordinator.saveProgressStateForPartition(partition);
coordinator.saveProgressStateForPartition(partition, null);

verify(sourceCoordinationStore).tryAcquireAvailablePartition(anyString(), anyString(), any(Duration.class));
verify(sourceCoordinationStore).tryUpdateSourcePartitionItem(any(SourcePartitionStoreItem.class));

ClientFactory.java
@@ -8,7 +8,10 @@
import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.AwsAuthenticationConfig;
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.ExportConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
import software.amazon.awssdk.services.s3.S3Client;
@@ -17,15 +20,19 @@ public class ClientFactory {

private final AwsCredentialsProvider awsCredentialsProvider;
private final AwsAuthenticationConfig awsAuthenticationConfig;
private final ExportConfig exportConfig;

public ClientFactory(AwsCredentialsSupplier awsCredentialsSupplier, AwsAuthenticationConfig awsAuthenticationConfig) {
public ClientFactory(final AwsCredentialsSupplier awsCredentialsSupplier,
final AwsAuthenticationConfig awsAuthenticationConfig,
final ExportConfig exportConfig) {
awsCredentialsProvider = awsCredentialsSupplier.getProvider(AwsCredentialsOptions.builder()
.withRegion(awsAuthenticationConfig.getAwsRegion())
.withStsRoleArn(awsAuthenticationConfig.getAwsStsRoleArn())
.withStsExternalId(awsAuthenticationConfig.getAwsStsExternalId())
.withStsHeaderOverrides(awsAuthenticationConfig.getAwsStsHeaderOverrides())
.build());
this.awsAuthenticationConfig = awsAuthenticationConfig;
this.exportConfig = exportConfig;
}


@@ -47,8 +54,20 @@ public DynamoDbClient buildDynamoDBClient() {

public S3Client buildS3Client() {
return S3Client.builder()
.region(getS3ClientRegion())
.credentialsProvider(awsCredentialsProvider)
.overrideConfiguration(ClientOverrideConfiguration.builder()
.retryPolicy(retryPolicy -> retryPolicy.numRetries(5).build())
.build())
.build();
}

private Region getS3ClientRegion() {
if (exportConfig != null && exportConfig.getAwsRegion() != null) {
return exportConfig.getAwsRegion();
}

return awsAuthenticationConfig.getAwsRegion();
}

}
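The consumer-builder shorthand above caps S3 retries at five attempts. For readers less used to the SDK v2 idiom, an equivalent explicit form (a sketch, not part of the commit):

import software.amazon.awssdk.core.retry.RetryPolicy;

ClientOverrideConfiguration overrideConfiguration = ClientOverrideConfiguration.builder()
        .retryPolicy(RetryPolicy.builder().numRetries(5).build())
        .build();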

DynamoDBService.java
@@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.source.dynamodb;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
@@ -59,6 +60,8 @@ public class DynamoDBService {
private final EnhancedSourceCoordinator coordinator;

private final DynamoDbClient dynamoDbClient;

private final DynamoDBSourceConfig dynamoDBSourceConfig;
//
private final DynamoDbStreamsClient dynamoDbStreamsClient;

@@ -70,13 +73,21 @@ public class DynamoDBService {

private final PluginMetrics pluginMetrics;

private final AcknowledgementSetManager acknowledgementSetManager;

static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60);
static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000;


public DynamoDBService(EnhancedSourceCoordinator coordinator, ClientFactory clientFactory, DynamoDBSourceConfig sourceConfig, PluginMetrics pluginMetrics) {
public DynamoDBService(final EnhancedSourceCoordinator coordinator,
final ClientFactory clientFactory,
final DynamoDBSourceConfig sourceConfig,
final PluginMetrics pluginMetrics,
final AcknowledgementSetManager acknowledgementSetManager) {
this.coordinator = coordinator;
this.pluginMetrics = pluginMetrics;
this.acknowledgementSetManager = acknowledgementSetManager;
this.dynamoDBSourceConfig = sourceConfig;

// Initialize AWS clients
dynamoDbClient = clientFactory.buildDynamoDBClient();
@@ -103,10 +114,10 @@ public void start(Buffer<Record<Event>> buffer) {
Runnable exportScheduler = new ExportScheduler(coordinator, dynamoDbClient, manifestFileReader, pluginMetrics);

DataFileLoaderFactory loaderFactory = new DataFileLoaderFactory(coordinator, s3Client, pluginMetrics, buffer);
Runnable fileLoaderScheduler = new DataFileScheduler(coordinator, loaderFactory, pluginMetrics);
Runnable fileLoaderScheduler = new DataFileScheduler(coordinator, loaderFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig);

ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, shardManager, buffer);
Runnable streamScheduler = new StreamScheduler(coordinator, consumerFactory, shardManager, pluginMetrics);
Runnable streamScheduler = new StreamScheduler(coordinator, consumerFactory, shardManager, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig);

// May consider starting or shutting down the schedulers on demand
// Currently, even after the exports are done, the related scheduler will not be shut down
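With the manager and config injected, each scheduler can tie checkpointing to end-to-end acknowledgment. A minimal sketch, assuming AcknowledgementSetManager exposes create(Consumer<Boolean>, Duration) (the callback body is illustrative):

final AcknowledgementSet acknowledgementSet = acknowledgementSetManager.create(
        success -> {
            // Illustrative: checkpoint the shard on success; on failure, let the
            // ownership timeout lapse so another worker can retry it.
        },
        dynamoDBSourceConfig.getShardAcknowledgmentTimeout());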

DynamoDBSource.java
@@ -7,6 +7,7 @@

import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
@@ -39,19 +40,26 @@ public class DynamoDBSource implements Source<Record<Event>>, UsesEnhancedSourceCoordination {

private final ClientFactory clientFactory;

private final AcknowledgementSetManager acknowledgementSetManager;

private EnhancedSourceCoordinator coordinator;

private DynamoDBService dynamoDBService;


@DataPrepperPluginConstructor
public DynamoDBSource(PluginMetrics pluginMetrics, final DynamoDBSourceConfig sourceConfig, final PluginFactory pluginFactory, final AwsCredentialsSupplier awsCredentialsSupplier) {
public DynamoDBSource(final PluginMetrics pluginMetrics,
final DynamoDBSourceConfig sourceConfig,
final PluginFactory pluginFactory,
final AwsCredentialsSupplier awsCredentialsSupplier,
final AcknowledgementSetManager acknowledgementSetManager) {
LOG.info("Create DynamoDB Source");
this.pluginMetrics = pluginMetrics;
this.sourceConfig = sourceConfig;
this.pluginFactory = pluginFactory;
this.acknowledgementSetManager = acknowledgementSetManager;

clientFactory = new ClientFactory(awsCredentialsSupplier, sourceConfig.getAwsAuthenticationConfig());
clientFactory = new ClientFactory(awsCredentialsSupplier, sourceConfig.getAwsAuthenticationConfig(), sourceConfig.getTableConfigs().get(0).getExportConfig());
}

@Override
@@ -61,7 +69,7 @@ public void start(Buffer<Record<Event>> buffer) {
coordinator.createPartition(new InitPartition());

// Create DynamoDB Service
dynamoDBService = new DynamoDBService(coordinator, clientFactory, sourceConfig, pluginMetrics);
dynamoDBService = new DynamoDBService(coordinator, clientFactory, sourceConfig, pluginMetrics, acknowledgementSetManager);
dynamoDBService.init();

LOG.info("Start DynamoDB service");

DynamoDBSourceConfig.java
@@ -7,10 +7,13 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.AwsAuthenticationConfig;
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.TableConfig;

import java.time.Duration;
import java.util.Collections;
import java.util.List;

/**
@@ -19,14 +22,22 @@
public class DynamoDBSourceConfig {

@JsonProperty("tables")
private List<TableConfig> tableConfigs;

private List<TableConfig> tableConfigs = Collections.emptyList();

@JsonProperty("aws")
@NotNull
@Valid
private AwsAuthenticationConfig awsAuthenticationConfig;

@JsonProperty("acknowledgments")
private boolean acknowledgments = false;

@JsonProperty("shard_acknowledgment_timeout")
private Duration shardAcknowledgmentTimeout = Duration.ofMinutes(3);

@JsonProperty("s3_data_file_acknowledgment_timeout")
private Duration dataFileAcknowledgmentTimeout = Duration.ofMinutes(5);

public DynamoDBSourceConfig() {
}

@@ -38,4 +49,19 @@ public List<TableConfig> getTableConfigs() {
public AwsAuthenticationConfig getAwsAuthenticationConfig() {
return awsAuthenticationConfig;
}

public boolean isAcknowledgmentsEnabled() {
return acknowledgments;
}

public Duration getShardAcknowledgmentTimeout() {
return shardAcknowledgmentTimeout;
}

public Duration getDataFileAcknowledgmentTimeout() {
return dataFileAcknowledgmentTimeout;
}

@AssertTrue(message = "Exactly one table must be configured for the DynamoDb source.")
boolean isExactlyOneTableConfigured() {
return tableConfigs.size() == 1;
}
}
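Together these fields surface as new options in the pipeline YAML. A sketch of deserializing such a config the way Data Prepper would; the table_arn and export keys are recalled from the plugin's TableConfig rather than shown in this diff, the required aws block is omitted for brevity, and the ISO-8601 durations assume Jackson's JavaTimeModule (Data Prepper's own duration parser also accepts shorter forms):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

String yaml =
        "tables:\n" +
        "  - table_arn: arn:aws:dynamodb:us-east-1:123456789012:table/my-table\n" +
        "    export:\n" +
        "      s3_bucket: export-bucket\n" +
        "      s3_region: us-west-2\n" +
        "acknowledgments: true\n" +
        "shard_acknowledgment_timeout: PT3M\n" +
        "s3_data_file_acknowledgment_timeout: PT5M\n";

DynamoDBSourceConfig config = new ObjectMapper(new YAMLFactory())
        .registerModule(new JavaTimeModule())
        .readValue(yaml, DynamoDBSourceConfig.class);
// config.isAcknowledgmentsEnabled() is now true, and the single table entry
// satisfies the new @AssertTrue check once bean validation runs.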

ExportConfig.java
@@ -7,20 +7,30 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotBlank;
import software.amazon.awssdk.regions.Region;

public class ExportConfig {

@JsonProperty("s3_bucket")
@NotBlank(message = "Bucket Name is required for export")
private String s3Bucket;

@JsonProperty("s3_prefix")
private String s3Prefix;

@JsonProperty("s3_region")
private String s3Region;

public String getS3Bucket() {
return s3Bucket;
}

public String getS3Prefix() {
return s3Prefix;
}

public Region getAwsRegion() {
return s3Region != null ? Region.of(s3Region) : null;
}

}
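Because the fields are populated by Jackson, the override is easy to exercise in isolation. A small sketch of the fallback semantics (values illustrative):

import com.fasterxml.jackson.databind.ObjectMapper;

ExportConfig export = new ObjectMapper().readValue(
        "{\"s3_bucket\": \"export-bucket\", \"s3_region\": \"us-west-2\"}",
        ExportConfig.class);
// getAwsRegion() yields Region.US_WEST_2 here; with s3_region omitted it
// returns null and ClientFactory falls back to the source's aws region.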

ExportRecordConverter.java
@@ -10,6 +10,7 @@
import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
@@ -58,13 +59,13 @@ String getEventType() {
return "EXPORT";
}

public void writeToBuffer(List<String> lines) {
public void writeToBuffer(final AcknowledgementSet acknowledgementSet, List<String> lines) {

int eventCount = 0;
for (String line : lines) {
Map data = (Map<String, Object>) convertToMap(line).get(ITEM_KEY);
try {
addToBuffer(data);
addToBuffer(acknowledgementSet, data);
eventCount++;
} catch (Exception e) {
// will this cause too many logs?

RecordConverter.java
@@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.source.dynamodb.converter;

import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.event.JacksonEvent;
@@ -70,7 +71,7 @@ void flushBuffer() throws Exception {
* @param eventName Event name
* @throws Exception Exception if failed to write to buffer.
*/
public void addToBuffer(Map<String, Object> data, Map<String, Object> keys, long eventCreationTimeMillis, String eventName) throws Exception {
public void addToBuffer(final AcknowledgementSet acknowledgementSet, Map<String, Object> data, Map<String, Object> keys, long eventCreationTimeMillis, String eventName) throws Exception {
Event event = JacksonEvent.builder()
.withEventType(getEventType())
.withData(data)
@@ -90,13 +91,16 @@ public void addToBuffer(Map<String, Object> data, Map<String, Object> keys, long eventCreationTimeMillis, String eventName) throws Exception {
} else {
eventMetadata.setAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE, partitionKey);
}
if (acknowledgementSet != null) {
acknowledgementSet.add(event);
}
bufferAccumulator.add(new Record<>(event));
}

public void addToBuffer(Map<String, Object> data) throws Exception {
public void addToBuffer(final AcknowledgementSet acknowledgementSet, Map<String, Object> data) throws Exception {
// Export data doesn't have an event timestamp
// Default to current timestamp when the event is added to buffer
addToBuffer(data, data, System.currentTimeMillis(), null);
addToBuffer(acknowledgementSet, data, data, System.currentTimeMillis(), null);
}

private String mapStreamEventNameToBulkAction(final String streamEventName) {
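Putting the export path together: a scheduler creates the set, the converter registers each event before buffering it, and the callback fires only once downstream sinks have released every event handle. A compressed sketch with illustrative wiring (the real flow lives in DataFileScheduler and the loader it drives; AcknowledgementSet#complete() is assumed from the "not completed" log message above):

void loadDataFile(final List<String> lines) throws Exception {
    final AcknowledgementSet acknowledgementSet = acknowledgementSetManager.create(
            success -> {
                // Illustrative: mark the data-file partition finished on success.
            },
            dynamoDBSourceConfig.getDataFileAcknowledgmentTimeout());

    // addToBuffer(...) adds each event to the set before buffering it.
    exportRecordConverter.writeToBuffer(acknowledgementSet, lines);

    // Assumption: completing the set lets the delayed callback described in the
    // release() change above run once all handles are released.
    acknowledgementSet.complete();
}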

StreamRecordConverter.java
@@ -11,6 +11,7 @@
import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
import org.slf4j.Logger;
@@ -53,7 +54,7 @@ String getEventType() {
}


public void writeToBuffer(List<Record> records) {
public void writeToBuffer(final AcknowledgementSet acknowledgementSet, List<Record> records) {

int eventCount = 0;
for (Record record : records) {
@@ -63,7 +64,7 @@ public void writeToBuffer(List<Record> records) {
Map<String, Object> keys = convertKeys(record.dynamodb().keys());

try {
addToBuffer(data, keys, record.dynamodb().approximateCreationDateTime().toEpochMilli(), record.eventNameAsString());
addToBuffer(acknowledgementSet, data, keys, record.dynamodb().approximateCreationDateTime().toEpochMilli(), record.eventNameAsString());
eventCount++;
} catch (Exception e) {
// will this cause too many logs?

DataFilePartition.java
@@ -23,7 +23,7 @@ public class DataFilePartition extends EnhancedSourcePartition<DataFileProgressState> {
private final String bucket;
private final String key;

private final DataFileProgressState state;
private DataFileProgressState state;

public DataFilePartition(SourcePartitionStoreItem sourcePartitionStoreItem) {

Expand Down
[Diff truncated: the remaining changed files are not shown.]
