Skip to content

Commit

Permalink
Process resync partitions in RDS source (opensearch-project#5171)
Browse files Browse the repository at this point in the history
* Process resync partition

Signed-off-by: Hai Yan <[email protected]>

* Update tests

Signed-off-by: Hai Yan <[email protected]>

* Cleanup

Signed-off-by: Hai Yan <[email protected]>

* Fix tests after rebase

Signed-off-by: Hai Yan <[email protected]>

* Address comments

Signed-off-by: Hai Yan <[email protected]>

---------

Signed-off-by: Hai Yan <[email protected]>
  • Loading branch information
oeyh authored Nov 15, 2024
1 parent 6a46c33 commit 2e32d3d
Show file tree
Hide file tree
Showing 21 changed files with 1,060 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import org.opensearch.dataprepper.plugins.source.rds.leader.RdsApiStrategy;
import org.opensearch.dataprepper.plugins.source.rds.model.DbMetadata;
import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.resync.ResyncScheduler;
import org.opensearch.dataprepper.plugins.source.rds.schema.ConnectionManager;
import org.opensearch.dataprepper.plugins.source.rds.schema.QueryManager;
import org.opensearch.dataprepper.plugins.source.rds.schema.SchemaManager;
import org.opensearch.dataprepper.plugins.source.rds.stream.BinlogClientFactory;
import org.opensearch.dataprepper.plugins.source.rds.stream.StreamScheduler;
Expand Down Expand Up @@ -64,6 +66,7 @@ public class RdsService {
private ExportScheduler exportScheduler;
private DataFileScheduler dataFileScheduler;
private StreamScheduler streamScheduler;
private ResyncScheduler resyncScheduler;

public RdsService(final EnhancedSourceCoordinator sourceCoordinator,
final RdsSourceConfig sourceConfig,
Expand Down Expand Up @@ -129,6 +132,10 @@ public void start(Buffer<Record<Event>> buffer) {
streamScheduler = new StreamScheduler(
sourceCoordinator, sourceConfig, s3PathPrefix, binaryLogClientFactory, buffer, pluginMetrics, acknowledgementSetManager, pluginConfigObservable);
runnableList.add(streamScheduler);

resyncScheduler = new ResyncScheduler(
sourceCoordinator, sourceConfig, getQueryManager(sourceConfig, dbMetadata), s3PathPrefix, buffer, pluginMetrics, acknowledgementSetManager);
runnableList.add(resyncScheduler);
}

executor = Executors.newFixedThreadPool(runnableList.size());
Expand Down Expand Up @@ -158,14 +165,26 @@ public void shutdown() {

private SchemaManager getSchemaManager(final RdsSourceConfig sourceConfig, final DbMetadata dbMetadata) {
final ConnectionManager connectionManager = new ConnectionManager(
dbMetadata.getHostName(),
dbMetadata.getEndpoint(),
dbMetadata.getPort(),
sourceConfig.getAuthenticationConfig().getUsername(),
sourceConfig.getAuthenticationConfig().getPassword(),
sourceConfig.isTlsEnabled());
return new SchemaManager(connectionManager);
}

private QueryManager getQueryManager(final RdsSourceConfig sourceConfig, final DbMetadata dbMetadata) {
final String readerEndpoint = dbMetadata.getReaderEndpoint() != null ? dbMetadata.getReaderEndpoint() : dbMetadata.getEndpoint();
final int readerPort = dbMetadata.getReaderPort() == 0 ? dbMetadata.getPort() : dbMetadata.getReaderPort();
final ConnectionManager readerConnectionManager = new ConnectionManager(
readerEndpoint,
readerPort,
sourceConfig.getAuthenticationConfig().getUsername(),
sourceConfig.getAuthenticationConfig().getPassword(),
sourceConfig.isTlsEnabled());
return new QueryManager(readerConnectionManager);
}

private String getS3PathPrefix() {
final String s3UserPathPrefix;
if (sourceConfig.getS3Prefix() != null && !sourceConfig.getS3Prefix().isBlank()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.source.rds.coordination.partition;

import lombok.Getter;
import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.ResyncProgressState;
Expand All @@ -18,21 +19,23 @@ public class ResyncPartition extends EnhancedSourcePartition<ResyncProgressState
private final String database;
private final String table;
private final long timestamp;
private final PartitionKeyInfo partitionKeyInfo;
private final ResyncProgressState state;

public ResyncPartition(String database, String table, long timestamp, ResyncProgressState state) {
this.database = database;
this.table = table;
this.timestamp = timestamp;
partitionKeyInfo = new PartitionKeyInfo(database, table, timestamp);
this.state = state;
}

public ResyncPartition(SourcePartitionStoreItem sourcePartitionStoreItem) {
setSourcePartitionStoreItem(sourcePartitionStoreItem);
String[] keySplits = sourcePartitionStoreItem.getSourcePartitionKey().split("\\|");
database = keySplits[0];
table = keySplits[1];
timestamp = Long.parseLong(keySplits[2]);
partitionKeyInfo = PartitionKeyInfo.fromString(sourcePartitionStoreItem.getSourcePartitionKey());
database = partitionKeyInfo.getDatabase();
table = partitionKeyInfo.getTable();
timestamp = partitionKeyInfo.getTimestamp();
state = convertStringToPartitionProgressState(ResyncProgressState.class, sourcePartitionStoreItem.getPartitionProgressState());
}

Expand All @@ -43,7 +46,7 @@ public String getPartitionType() {

@Override
public String getPartitionKey() {
return database + "|" + table + "|" + timestamp;
return partitionKeyInfo.toString();
}

@Override
Expand All @@ -53,4 +56,34 @@ public Optional<ResyncProgressState> getProgressState() {
}
return Optional.empty();
}

public PartitionKeyInfo getPartitionKeyInfo() {
return partitionKeyInfo;
}

@Getter
public static class PartitionKeyInfo {
private final String database;
private final String table;
private final long timestamp;

private PartitionKeyInfo(String database, String table, long timestamp) {
this.database = database;
this.table = table;
this.timestamp = timestamp;
}

private static PartitionKeyInfo fromString(String partitionKey) {
String[] keySplits = partitionKey.split("\\|");
if (keySplits.length != 3) {
throw new IllegalArgumentException("Invalid partition key: " + partitionKey);
}
return new PartitionKeyInfo(keySplits[0], keySplits[1], Long.parseLong(keySplits[2]));
}

@Override
public String toString() {
return database + "|" + table + "|" + timestamp;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,13 @@ public DbMetadata describeDb(String dbIdentifier) {
try {
final DescribeDbClustersResponse response = rdsClient.describeDBClusters(request);
final DBCluster dbCluster = response.dbClusters().get(0);
return new DbMetadata(dbIdentifier, dbCluster.endpoint(), dbCluster.port());
return DbMetadata.builder()
.dbIdentifier(dbIdentifier)
.endpoint(dbCluster.endpoint())
.port(dbCluster.port())
.readerEndpoint(dbCluster.readerEndpoint())
.readerPort(dbCluster.port())
.build();
} catch (Exception e) {
throw new RuntimeException("Failed to describe DB " + dbIdentifier, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,28 @@ public InstanceApiStrategy(final RdsClient rdsClient) {

@Override
public DbMetadata describeDb(String dbIdentifier) {
final DescribeDbInstancesRequest request = DescribeDbInstancesRequest.builder()
.dbInstanceIdentifier(dbIdentifier)
.build();

try {
final DescribeDbInstancesRequest request = DescribeDbInstancesRequest.builder()
.dbInstanceIdentifier(dbIdentifier)
.build();

final DescribeDbInstancesResponse response = rdsClient.describeDBInstances(request);
final DBInstance dbInstance = response.dbInstances().get(0);
return new DbMetadata(dbIdentifier, dbInstance.endpoint().address(), dbInstance.endpoint().port());
DbMetadata.DbMetadataBuilder dbMetadataBuilder = DbMetadata.builder()
.dbIdentifier(dbIdentifier)
.endpoint(dbInstance.endpoint().address())
.port(dbInstance.endpoint().port());

if (dbInstance.hasReadReplicaDBInstanceIdentifiers()) {
final DescribeDbInstancesRequest readerInstanceRequest = DescribeDbInstancesRequest.builder()
.dbInstanceIdentifier(dbInstance.readReplicaDBInstanceIdentifiers().get(0))
.build();
final DescribeDbInstancesResponse readerInstanceResponse = rdsClient.describeDBInstances(readerInstanceRequest);
final DBInstance readerInstance = readerInstanceResponse.dbInstances().get(0);
dbMetadataBuilder.readerEndpoint(readerInstance.endpoint().address())
.readerPort(readerInstance.endpoint().port());
}
return dbMetadataBuilder.build();
} catch (Exception e) {
throw new RuntimeException("Failed to describe DB " + dbIdentifier, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,48 +5,49 @@

package org.opensearch.dataprepper.plugins.source.rds.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

@Getter
@AllArgsConstructor
@Builder
public class DbMetadata {

private static final String DB_IDENTIFIER_KEY = "dbIdentifier";
private static final String HOST_NAME_KEY = "hostName";
private static final String PORT_KEY = "port";
static final String DB_IDENTIFIER_KEY = "dbIdentifier";
static final String ENDPOINT_KEY = "endpoint";
static final String PORT_KEY = "port";
static final String READER_ENDPOINT_KEY = "readerEndpoint";
static final String READER_PORT_KEY = "readerPort";

private final String dbIdentifier;
private final String hostName;
private final String endpoint;
private final int port;

public DbMetadata(final String dbIdentifier, final String hostName, final int port) {
this.dbIdentifier = dbIdentifier;
this.hostName = hostName;
this.port = port;
}

public String getDbIdentifier() {
return dbIdentifier;
}

public String getHostName() {
return hostName;
}

public int getPort() {
return port;
}
private final String readerEndpoint;
private final int readerPort;

public Map<String, Object> toMap() {
return Map.of(
DB_IDENTIFIER_KEY, dbIdentifier,
HOST_NAME_KEY, hostName,
PORT_KEY, port
);
Map<String, Object> map = new HashMap<>();
map.put(DB_IDENTIFIER_KEY, dbIdentifier);
map.put(ENDPOINT_KEY, endpoint);
map.put(PORT_KEY, port);
map.put(READER_ENDPOINT_KEY, readerEndpoint);
map.put(READER_PORT_KEY, readerPort);

return Collections.unmodifiableMap(map);
}

public static DbMetadata fromMap(Map<String, Object> map) {
return new DbMetadata(
(String) map.get(DB_IDENTIFIER_KEY),
(String) map.get(HOST_NAME_KEY),
((Integer) map.get(PORT_KEY))
(String) map.get(ENDPOINT_KEY),
(Integer) map.get(PORT_KEY),
(String) map.get(READER_ENDPOINT_KEY),
(Integer) map.get(READER_PORT_KEY)
);
}
}
Loading

0 comments on commit 2e32d3d

Please sign in to comment.