Skip to content

Commit

Permalink
Refactor ShardingSphereStatisticsRefreshEngine
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangML committed Jan 22, 2025
1 parent 6ec325e commit d61cbb7
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;

import java.sql.SQLException;
Expand All @@ -28,6 +29,7 @@
/**
* PostgreSQL table statistics collector.
*/
@SingletonSPI
public interface PostgreSQLTableStatisticsCollector extends TypedSPI {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;

import java.sql.SQLException;
Expand All @@ -28,6 +29,7 @@
/**
* ShardingSphere table statistics collector.
*/
@SingletonSPI
public interface ShardingSphereTableStatisticsCollector extends TypedSPI {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ private void collectAndRefresh() {
if (lockContext.tryLock(lockDefinition, 5000L)) {
try {
ShardingSphereMetaData metaData = contextManager.getMetaDataContexts().getMetaData();
ShardingSphereStatistics changedStatistics = new ShardingSphereStatistics();
ShardingSphereStatistics collectedStatistics = new ShardingSphereStatistics();
for (ShardingSphereDatabase each : metaData.getAllDatabases()) {
collectForDatabase(each, metaData, changedStatistics);
collectForDatabase(each, metaData, collectedStatistics);
}
compareAndUpdate(changedStatistics);
compareAndUpdate(collectedStatistics);
} finally {
lockContext.unlock(lockDefinition);
}
Expand Down Expand Up @@ -135,61 +135,93 @@ private void collectForTable(final DatabaseType protocolType, final String datab
statistics.putDatabase(databaseName, databaseData);
}

private void compareAndUpdate(final ShardingSphereStatistics changedStatistics) {
private void compareAndUpdate(final ShardingSphereStatistics collectedStatistics) {
ShardingSphereMetaData metaData = contextManager.getMetaDataContexts().getMetaData();
ShardingSphereStatistics statistics = contextManager.getMetaDataContexts().getStatistics();
for (Entry<String, ShardingSphereDatabaseData> entry : changedStatistics.getDatabaseData().entrySet()) {
compareAndUpdateForDatabase(entry.getKey(), statistics.getDatabase(entry.getKey()), entry.getValue(), statistics, metaData.getDatabase(entry.getKey()));
ShardingSphereStatistics existedStatistics = contextManager.getMetaDataContexts().getStatistics();
for (Entry<String, ShardingSphereDatabaseData> entry : collectedStatistics.getDatabaseData().entrySet()) {
if (existedStatistics.containsDatabase(entry.getKey())) {
compareAndUpdateDatabase(metaData.getDatabase(entry.getKey()), existedStatistics.getDatabase(entry.getKey()), entry.getValue());
} else {
updateDatabase(entry.getKey(), entry.getValue());
existedStatistics.putDatabase(entry.getKey(), entry.getValue());
}
}
for (String each : new ArrayList<>(statistics.getDatabaseData().keySet())) {
if (!changedStatistics.containsDatabase(each)) {
statistics.dropDatabase(each);
for (String each : new ArrayList<>(existedStatistics.getDatabaseData().keySet())) {
if (!collectedStatistics.containsDatabase(each)) {
existedStatistics.dropDatabase(each);
contextManager.getPersistServiceFacade().getMetaDataPersistService().getShardingSphereDataPersistService().delete(each);
}
}
}

private void compareAndUpdateForDatabase(final String databaseName, final ShardingSphereDatabaseData databaseData, final ShardingSphereDatabaseData changedDatabaseData,
final ShardingSphereStatistics statistics, final ShardingSphereDatabase database) {
for (Entry<String, ShardingSphereSchemaData> entry : changedDatabaseData.getSchemaData().entrySet()) {
compareAndUpdateForSchema(databaseName, entry.getKey(), databaseData.getSchema(entry.getKey()), entry.getValue(), statistics, database.getSchema(entry.getKey()));
private void compareAndUpdateDatabase(final ShardingSphereDatabase database, final ShardingSphereDatabaseData currentDatabaseData,
final ShardingSphereDatabaseData collectedDatabaseData) {
for (Entry<String, ShardingSphereSchemaData> entry : collectedDatabaseData.getSchemaData().entrySet()) {
if (currentDatabaseData.containsSchema(entry.getKey())) {
compareAndUpdateSchema(database.getName(), entry.getKey(), currentDatabaseData.getSchema(entry.getKey()), entry.getValue());
} else {
updateSchema(database.getName(), entry.getKey(), entry.getValue());
currentDatabaseData.putSchema(entry.getKey(), entry.getValue());
}
}
}

private void compareAndUpdateForSchema(final String databaseName, final String schemaName, final ShardingSphereSchemaData schemaData,
final ShardingSphereSchemaData changedSchemaData, final ShardingSphereStatistics statistics, final ShardingSphereSchema schema) {
for (Entry<String, ShardingSphereTableData> entry : changedSchemaData.getTableData().entrySet()) {
compareAndUpdateForTable(databaseName, schemaName, schemaData.getTable(entry.getKey()), entry.getValue(), statistics, schema.getTable(entry.getKey()));
private void compareAndUpdateSchema(final String databaseName, final String schemaName,
final ShardingSphereSchemaData currentSchemaData, final ShardingSphereSchemaData collectedSchemaData) {
for (Entry<String, ShardingSphereTableData> entry : collectedSchemaData.getTableData().entrySet()) {
if (currentSchemaData.containsTable(entry.getKey())) {
compareAndUpdateTable(databaseName, schemaName, currentSchemaData.getTable(entry.getKey()), entry.getValue());
} else {
updateTable(databaseName, schemaName, entry.getKey(), entry.getValue());
currentSchemaData.putTable(entry.getKey(), entry.getValue());
}
}
}

private void compareAndUpdateForTable(final String databaseName, final String schemaName, final ShardingSphereTableData tableData,
final ShardingSphereTableData changedTableData, final ShardingSphereStatistics statistics, final ShardingSphereTable table) {
if (!tableData.equals(changedTableData)) {
statistics.getDatabase(databaseName).getSchema(schemaName).putTable(changedTableData.getName(), changedTableData);
AlteredShardingSphereDatabaseData alteredShardingSphereDatabaseData = createAlteredShardingSphereDatabaseData(databaseName, schemaName, tableData, changedTableData, table);
contextManager.getPersistServiceFacade().getMetaDataPersistService().getShardingSphereDataPersistService().update(alteredShardingSphereDatabaseData);
private void compareAndUpdateTable(final String databaseName, final String schemaName,
final ShardingSphereTableData currentTableData, final ShardingSphereTableData collectedTableData) {
if (currentTableData.equals(collectedTableData)) {
return;
}
}

private AlteredShardingSphereDatabaseData createAlteredShardingSphereDatabaseData(final String databaseName, final String schemaName, final ShardingSphereTableData tableData,
final ShardingSphereTableData changedTableData, final ShardingSphereTable table) {
AlteredShardingSphereDatabaseData result = new AlteredShardingSphereDatabaseData(databaseName, schemaName, tableData.getName());
Map<String, ShardingSphereRowData> tableDataMap = tableData.getRows().stream().collect(Collectors.toMap(ShardingSphereRowData::getUniqueKey, Function.identity()));
Map<String, ShardingSphereRowData> changedTableDataMap = changedTableData.getRows().stream().collect(Collectors.toMap(ShardingSphereRowData::getUniqueKey, Function.identity()));
YamlShardingSphereRowDataSwapper swapper = new YamlShardingSphereRowDataSwapper(new ArrayList<>(table.getAllColumns()));
for (Entry<String, ShardingSphereRowData> entry : changedTableDataMap.entrySet()) {
if (!tableDataMap.containsKey(entry.getKey())) {
result.getAddedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
} else if (!tableDataMap.get(entry.getKey()).equals(entry.getValue())) {
result.getUpdatedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
ShardingSphereTable shardingSphereTable = contextManager.getMetaDataContexts().getMetaData().getDatabase(databaseName).getSchema(schemaName).getTable(currentTableData.getName());
AlteredShardingSphereDatabaseData alteredShardingSphereDatabaseData = new AlteredShardingSphereDatabaseData(databaseName, schemaName, currentTableData.getName());
Map<String, ShardingSphereRowData> currentTableRowData = currentTableData.getRows().stream().collect(Collectors.toMap(ShardingSphereRowData::getUniqueKey, Function.identity()));
Map<String, ShardingSphereRowData> collectedTableRowData = collectedTableData.getRows().stream().collect(Collectors.toMap(ShardingSphereRowData::getUniqueKey, Function.identity()));
YamlShardingSphereRowDataSwapper swapper = new YamlShardingSphereRowDataSwapper(new ArrayList<>(shardingSphereTable.getAllColumns()));
for (Entry<String, ShardingSphereRowData> entry : collectedTableRowData.entrySet()) {
if (!currentTableRowData.containsKey(entry.getKey())) {
alteredShardingSphereDatabaseData.getAddedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
} else if (!currentTableRowData.get(entry.getKey()).equals(entry.getValue())) {
alteredShardingSphereDatabaseData.getUpdatedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
}
}
for (Entry<String, ShardingSphereRowData> entry : tableDataMap.entrySet()) {
if (!changedTableDataMap.containsKey(entry.getKey())) {
result.getDeletedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
for (Entry<String, ShardingSphereRowData> entry : currentTableRowData.entrySet()) {
if (!collectedTableRowData.containsKey(entry.getKey())) {
alteredShardingSphereDatabaseData.getDeletedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
}
}
return result;
contextManager.getPersistServiceFacade().getMetaDataPersistService().getShardingSphereDataPersistService().update(alteredShardingSphereDatabaseData);
}

private void updateDatabase(final String databaseName, final ShardingSphereDatabaseData databaseData) {
for (Entry<String, ShardingSphereSchemaData> entry : databaseData.getSchemaData().entrySet()) {
updateSchema(databaseName, entry.getKey(), entry.getValue());
}
}

private void updateSchema(final String databaseName, final String schemaName, final ShardingSphereSchemaData schemaData) {
for (Entry<String, ShardingSphereTableData> entry : schemaData.getTableData().entrySet()) {
updateTable(databaseName, schemaName, entry.getKey(), entry.getValue());
}
}

private void updateTable(final String databaseName, final String schemaName, final String tableName, final ShardingSphereTableData tableData) {
ShardingSphereTable shardingSphereTable = contextManager.getMetaDataContexts().getMetaData().getDatabase(databaseName).getSchema(schemaName).getTable(tableName);
AlteredShardingSphereDatabaseData alteredShardingSphereDatabaseData = new AlteredShardingSphereDatabaseData(databaseName, schemaName, tableName);
YamlShardingSphereRowDataSwapper swapper = new YamlShardingSphereRowDataSwapper(new ArrayList<>(shardingSphereTable.getAllColumns()));
for (ShardingSphereRowData each : tableData.getRows()) {
alteredShardingSphereDatabaseData.getAddedRows().add(swapper.swapToYamlConfiguration(each));
}
contextManager.getPersistServiceFacade().getMetaDataPersistService().getShardingSphereDataPersistService().update(alteredShardingSphereDatabaseData);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,6 @@ public Optional<ShardingSphereTableData> collect(final String databaseName, fina

@Override
public String getType() {
return "test_table";
return "foo_database";
}
}
Loading

0 comments on commit d61cbb7

Please sign in to comment.