Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Hai Yan <[email protected]>
  • Loading branch information
oeyh committed Nov 15, 2024
1 parent 9aea7db commit d40d45c
Show file tree
Hide file tree
Showing 8 changed files with 269 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.source.rds.coordination.partition;

import lombok.Getter;
import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.ResyncProgressState;
Expand All @@ -18,21 +19,23 @@ public class ResyncPartition extends EnhancedSourcePartition<ResyncProgressState
private final String database;
private final String table;
private final long timestamp;
private final PartitionKeyInfo partitionKeyInfo;
private final ResyncProgressState state;

public ResyncPartition(String database, String table, long timestamp, ResyncProgressState state) {
this.database = database;
this.table = table;
this.timestamp = timestamp;
partitionKeyInfo = new PartitionKeyInfo(database, table, timestamp);
this.state = state;
}

public ResyncPartition(SourcePartitionStoreItem sourcePartitionStoreItem) {
setSourcePartitionStoreItem(sourcePartitionStoreItem);
String[] keySplits = sourcePartitionStoreItem.getSourcePartitionKey().split("\\|");
database = keySplits[0];
table = keySplits[1];
timestamp = Long.parseLong(keySplits[2]);
partitionKeyInfo = PartitionKeyInfo.fromString(sourcePartitionStoreItem.getSourcePartitionKey());
database = partitionKeyInfo.getDatabase();
table = partitionKeyInfo.getTable();
timestamp = partitionKeyInfo.getTimestamp();
state = convertStringToPartitionProgressState(ResyncProgressState.class, sourcePartitionStoreItem.getPartitionProgressState());
}

Expand All @@ -43,7 +46,7 @@ public String getPartitionType() {

@Override
public String getPartitionKey() {
return database + "|" + table + "|" + timestamp;
return partitionKeyInfo.toString();
}

@Override
Expand All @@ -53,4 +56,34 @@ public Optional<ResyncProgressState> getProgressState() {
}
return Optional.empty();
}

public PartitionKeyInfo getPartitionKeyInfo() {
return partitionKeyInfo;
}

@Getter
public static class PartitionKeyInfo {
private final String database;
private final String table;
private final long timestamp;

private PartitionKeyInfo(String database, String table, long timestamp) {
this.database = database;
this.table = table;
this.timestamp = timestamp;
}

private static PartitionKeyInfo fromString(String partitionKey) {
String[] keySplits = partitionKey.split("\\|");
if (keySplits.length != 3) {
throw new IllegalArgumentException("Invalid partition key: " + partitionKey);
}
return new PartitionKeyInfo(keySplits[0], keySplits[1], Long.parseLong(keySplits[2]));
}

@Override
public String toString() {
return database + "|" + table + "|" + timestamp;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig;
import org.opensearch.dataprepper.plugins.source.rds.converter.RecordConverter;
import org.opensearch.dataprepper.plugins.source.rds.converter.StreamRecordConverter;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.GlobalState;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ResyncPartition;
import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.schema.QueryManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -68,7 +70,7 @@ public ResyncScheduler(final EnhancedSourceCoordinator sourceCoordinator,

@Override
public void run() {
LOG.debug("Start running Stream Scheduler");
LOG.info("Start running Resync Scheduler");
ResyncPartition resyncPartition = null;
while (!shutdownRequested && !Thread.currentThread().isInterrupted()) {
try {
Expand Down Expand Up @@ -124,7 +126,7 @@ private void processResyncPartition(ResyncPartition resyncPartition) {
}

final ResyncWorker resyncWorker = ResyncWorker.create(
resyncPartition, sourceConfig, queryManager, buffer, recordConverter, acknowledgementSet);
resyncPartition, sourceConfig, queryManager, buffer, recordConverter, acknowledgementSet, getDBTableMetadata());

CompletableFuture.runAsync(resyncWorker, resyncExecutor)
.whenComplete((v, ex) -> {
Expand All @@ -140,4 +142,10 @@ private void processResyncPartition(ResyncPartition resyncPartition) {
}
});
}

private DbTableMetadata getDBTableMetadata() {
final Optional<EnhancedSourcePartition> globalStatePartition = sourceCoordinator.getPartition(sourceConfig.getDbIdentifier());
final GlobalState globalState = (GlobalState) globalStatePartition.get();
return DbTableMetadata.fromMap(globalState.getProgressState().get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,24 @@
import org.opensearch.dataprepper.plugins.source.rds.converter.RecordConverter;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ResyncPartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.ResyncProgressState;
import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHelper;
import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType;
import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.schema.QueryManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata.DOT_DELIMITER;

public class ResyncWorker implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ResyncWorker.class);
private static final String QUERY_NULL_FORMAT_STRING = "SELECT * FROM %s WHERE %s IS NULL";
private static final String QUERY_NOT_NULL_FORMAT_STRING = "SELECT * FROM %s WHERE %s='%s'";

static final String DATA_PREPPER_EVENT_TYPE = "event";
static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60);
Expand All @@ -37,70 +45,65 @@ public class ResyncWorker implements Runnable {
private final Buffer<Record<Event>> buffer;
private final RecordConverter recordConverter;
private final AcknowledgementSet acknowledgementSet;
private final DbTableMetadata dbTableMetadata;

ResyncWorker(ResyncPartition resyncPartition,
RdsSourceConfig sourceConfig,
QueryManager queryManager,
Buffer<Record<Event>> buffer,
RecordConverter recordConverter,
AcknowledgementSet acknowledgementSet) {
AcknowledgementSet acknowledgementSet,
DbTableMetadata dbTableMetadata) {
this.resyncPartition = resyncPartition;
this.sourceConfig = sourceConfig;
this.queryManager = queryManager;
this.buffer = buffer;
this.recordConverter = recordConverter;
this.acknowledgementSet = acknowledgementSet;
this.dbTableMetadata = dbTableMetadata;
}

public static ResyncWorker create(ResyncPartition resyncPartition,
RdsSourceConfig sourceConfig,
QueryManager queryManager,
Buffer<Record<Event>> buffer,
RecordConverter recordConverter,
AcknowledgementSet acknowledgementSet) {
return new ResyncWorker(resyncPartition, sourceConfig, queryManager, buffer, recordConverter, acknowledgementSet);
AcknowledgementSet acknowledgementSet,
DbTableMetadata dbTableMetadata) {
return new ResyncWorker(resyncPartition, sourceConfig, queryManager, buffer, recordConverter, acknowledgementSet, dbTableMetadata);
}

public void run() {
String[] keySplits = resyncPartition.getPartitionKey().split("\\|");
final String database = keySplits[0];
final String table = keySplits[1];
final long eventTimestampMillis = Long.parseLong(keySplits[2]);

if (resyncPartition.getProgressState().isEmpty()) {
final String errorMessage = "ResyncPartition " + resyncPartition.getPartitionKey() + " doesn't contain progress state.";
throw new RuntimeException(errorMessage);
}
final ResyncProgressState progressState = resyncPartition.getProgressState().get();
ResyncPartition.PartitionKeyInfo partitionKeyInfo = resyncPartition.getPartitionKeyInfo();
final String database = partitionKeyInfo.getDatabase();
final String table = partitionKeyInfo.getTable();
final long eventTimestampMillis = partitionKeyInfo.getTimestamp();

final ResyncProgressState progressState = validateAndGetProgressState();
final String foreignKeyName = progressState.getForeignKeyName();
final Object updatedValue = progressState.getUpdatedValue();
final List<String> primaryKeys = progressState.getPrimaryKeys();

LOG.debug("Will perform resync on table: {}.{}, with foreign key name: {}, and updated value: {}", database, table, foreignKeyName, updatedValue);
String queryStatement;
if (updatedValue == null) {
queryStatement = String.format("SELECT * FROM %s WHERE %s IS NULL", database + "." + table, foreignKeyName);
} else {
queryStatement = String.format("SELECT * FROM %s WHERE %s='%s'", database + "." + table, foreignKeyName, updatedValue);
}
LOG.debug("Query statement: {}", queryStatement);
final List<Map<String, Object>> rows = executeQuery(database, table, foreignKeyName, updatedValue);

List<Map<String, Object>> rows = queryManager.selectRows(queryStatement);
LOG.debug("Found {} rows to resync", rows.size());
processRows(rows, database, table, primaryKeys, eventTimestampMillis);
}

private void processRows(List<Map<String, Object>> rows, String database, String table, List<String> primaryKeys, long eventTimestampMillis) {
BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);

for (Map<String, Object> row : rows) {
final Event dataPrepperEvent = JacksonEvent.builder()
.withEventType(DATA_PREPPER_EVENT_TYPE)
.withData(row)
.withData(mapDataType(row, database + DOT_DELIMITER + table))
.build();

final Event pipelineEvent = recordConverter.convert(
dataPrepperEvent,
database,
table,
OpenSearchBulkActions.INDEX,
progressState.getPrimaryKeys(),
primaryKeys,
eventTimestampMillis,
eventTimestampMillis,
null);
Expand Down Expand Up @@ -128,4 +131,37 @@ public void run() {
LOG.error("Failed to flush buffer", e);
}
}

private ResyncProgressState validateAndGetProgressState() {
return resyncPartition.getProgressState()
.orElseThrow(() -> new IllegalStateException(
"ResyncPartition " + resyncPartition.getPartitionKey() + " doesn't contain progress state."));
}

private List<Map<String, Object>> executeQuery(String database, String table, String foreignKeyName, Object updatedValue) {
LOG.debug("Will perform resync on table: {}.{}, with foreign key name: {}, and updated value: {}", database, table, foreignKeyName, updatedValue);
final String fullTableName = database + DOT_DELIMITER + table;
String queryStatement;
if (updatedValue == null) {
queryStatement = String.format(QUERY_NULL_FORMAT_STRING, fullTableName, foreignKeyName);
} else {
queryStatement = String.format(QUERY_NOT_NULL_FORMAT_STRING, fullTableName, foreignKeyName, updatedValue);
}
LOG.debug("Query statement: {}", queryStatement);

List<Map<String, Object>> rows = queryManager.selectRows(queryStatement);
LOG.debug("Found {} rows to resync", rows.size());
return rows;
}

private Map<String, Object> mapDataType(final Map<String, Object> rowData, final String fullTableName) {
Map<String, String> columnDataTypeMap = dbTableMetadata.getTableColumnDataTypeMap().get(fullTableName);
Map<String, Object> rowDataAfterMapping = new HashMap<>();
for (Map.Entry<String, Object> entry : rowData.entrySet()) {
final Object data = DataTypeHelper.getDataByColumnType(MySQLDataType.byDataType(columnDataTypeMap.get(entry.getKey())), entry.getKey(),
entry.getValue(), null);
rowDataAfterMapping.put(entry.getKey(), data);
}
return rowDataAfterMapping;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,35 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class QueryManager {

private static final Logger LOG = LoggerFactory.getLogger(QueryManager.class);

static final int NUM_OF_RETRIES = 3;
static final int BACKOFF_IN_MILLIS = 500;

private final ConnectionManager connectionManager;

public QueryManager(ConnectionManager connectionManager) {
this.connectionManager = connectionManager;
}

public List<Map<String, Object>> selectRows(String query) {
return executeWithRetry(this::doSelectRows, query);
}

private List<Map<String, Object>> doSelectRows(String query) {
List<Map<String, Object>> result = new ArrayList<>();
try (final Connection connection = connectionManager.getConnection()) {
final Statement statement = connection.createStatement();
try (ResultSet resultSet = statement.executeQuery(query)) {
return convertResultSetToList(resultSet);
}
} catch (Exception e) {
LOG.error("Failed to execute query {}, retrying", query, e);
return result;
LOG.error("Failed to execute query {}, retrying", query);
throw new RuntimeException(e);
}
}

Expand All @@ -53,4 +61,25 @@ private List<Map<String, Object>> convertResultSetToList(ResultSet resultSet) th
}
return result;
}

private <T, R> R executeWithRetry(Function<T, R> function, T query) {
int retry = 0;
while (retry <= NUM_OF_RETRIES) {
try {
return function.apply(query);
} catch (Exception e) {
applyBackoff();
}
retry++;
}
throw new RuntimeException("Failed to execute query after " + NUM_OF_RETRIES + " retries");
}

private void applyBackoff() {
try {
Thread.sleep(BACKOFF_IN_MILLIS);
} catch (final InterruptedException e){
Thread.currentThread().interrupt();
}
}
}
Loading

0 comments on commit d40d45c

Please sign in to comment.