Skip to content

Commit

Permalink
Use MetaDataChangedHandler to instead of MetaDataChangedSubscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Jan 1, 2025
1 parent 1506ada commit f1c37bd
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ private void handleTableChanged(final String databaseName, final String schemaNa
private void handleTableCreatedOrAltered(final String databaseName, final String schemaName, final DataChangedEvent event) {
String tableName = TableMetaDataNode.getTableNameByActiveVersionNode(event.getKey()).orElseThrow(() -> new IllegalStateException("Table name not found."));
Preconditions.checkArgument(event.getValue().equals(
contextManager.getPersistServiceFacade().getMetaDataPersistService().getMetaDataVersionPersistService().getActiveVersionByFullPath(event.getKey())),
contextManager.getPersistServiceFacade().getMetaDataPersistService().getMetaDataVersionPersistService().getActiveVersionByFullPath(event.getKey())),
"Invalid active version: %s of key: %s", event.getValue(), event.getKey());
ShardingSphereTable table = contextManager.getPersistServiceFacade().getMetaDataPersistService().getDatabaseMetaDataFacade().getTable().load(databaseName, schemaName, tableName);
contextManager.getMetaDataContextManager().getSchemaMetaDataManager().alterSchema(databaseName, schemaName, table, null);
Expand Down Expand Up @@ -138,7 +138,7 @@ private void handleViewChanged(final String databaseName, final String schemaNam
private void handleViewCreatedOrAltered(final String databaseName, final String schemaName, final DataChangedEvent event) {
String viewName = ViewMetaDataNode.getViewNameByActiveVersionNode(event.getKey()).orElseThrow(() -> new IllegalStateException("View name not found."));
Preconditions.checkArgument(event.getValue().equals(
contextManager.getPersistServiceFacade().getMetaDataPersistService().getMetaDataVersionPersistService().getActiveVersionByFullPath(event.getKey())),
contextManager.getPersistServiceFacade().getMetaDataPersistService().getMetaDataVersionPersistService().getActiveVersionByFullPath(event.getKey())),
"Invalid active version: %s of key: %s", event.getValue(), event.getKey());
ShardingSphereView view = contextManager.getPersistServiceFacade().getMetaDataPersistService().getDatabaseMetaDataFacade().getView().load(databaseName, schemaName, viewName);
contextManager.getMetaDataContextManager().getSchemaMetaDataManager().alterSchema(databaseName, schemaName, null, view);
Expand Down Expand Up @@ -177,15 +177,15 @@ private void handleStorageUnitChanged(final String databaseName, final DataChang

private void handleStorageUnitRegistered(final String databaseName, final String dataSourceUnitName, final DataChangedEvent event) {
Preconditions.checkArgument(event.getValue().equals(
contextManager.getPersistServiceFacade().getMetaDataPersistService().getMetaDataVersionPersistService().getActiveVersionByFullPath(event.getKey())),
contextManager.getPersistServiceFacade().getMetaDataPersistService().getMetaDataVersionPersistService().getActiveVersionByFullPath(event.getKey())),
"Invalid active version: %s of key: %s", event.getValue(), event.getKey());
DataSourcePoolProperties dataSourcePoolProps = contextManager.getPersistServiceFacade().getMetaDataPersistService().getDataSourceUnitService().load(databaseName, dataSourceUnitName);
contextManager.getMetaDataContextManager().getStorageUnitManager().registerStorageUnit(databaseName, Collections.singletonMap(dataSourceUnitName, dataSourcePoolProps));
}

private void handleStorageUnitAltered(final String databaseName, final String dataSourceUnitName, final DataChangedEvent event) {
Preconditions.checkArgument(event.getValue().equals(
contextManager.getPersistServiceFacade().getMetaDataPersistService().getMetaDataVersionPersistService().getActiveVersionByFullPath(event.getKey())),
contextManager.getPersistServiceFacade().getMetaDataPersistService().getMetaDataVersionPersistService().getActiveVersionByFullPath(event.getKey())),
"Invalid active version: %s of key: %s", event.getValue(), event.getKey());
DataSourcePoolProperties dataSourcePoolProps = contextManager.getPersistServiceFacade().getMetaDataPersistService().getDataSourceUnitService().load(databaseName, dataSourceUnitName);
contextManager.getMetaDataContextManager().getStorageUnitManager().alterStorageUnit(databaseName, Collections.singletonMap(dataSourceUnitName, dataSourcePoolProps));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type.CacheEvictedSubscriber;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type.RuleItemChangedSubscriber;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type.StorageUnitEventSubscriber;

import java.util.Arrays;
import java.util.Collection;
Expand All @@ -36,8 +35,6 @@ public final class ClusterDispatchEventSubscriberRegistry {
private final Collection<EventSubscriber> subscribers;

public ClusterDispatchEventSubscriberRegistry(final ContextManager contextManager) {
subscribers = Arrays.asList(new RuleItemChangedSubscriber(contextManager),
new CacheEvictedSubscriber(),
new StorageUnitEventSubscriber(contextManager));
subscribers = Arrays.asList(new RuleItemChangedSubscriber(contextManager), new CacheEvictedSubscriber());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.handler.database;

import org.apache.shardingsphere.infra.config.props.temporary.TemporaryConfigurationPropertyKey;
import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereView;
Expand All @@ -34,6 +35,7 @@
import org.mockito.quality.Strictness;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -120,4 +122,27 @@ void assertHandleViewDropped() {
handler.handle("foo_db", new DataChangedEvent("/metadata/foo_db/schemas/foo_schema/views/foo_view", "", Type.DELETED));
verify(contextManager.getMetaDataContextManager().getSchemaMetaDataManager()).alterSchema("foo_db", "foo_schema", null, "foo_view");
}

@Test
void assertHandleStorageUnitRegistered() {
when(contextManager.getPersistServiceFacade().getMetaDataPersistService().getMetaDataVersionPersistService().getActiveVersionByFullPath("key")).thenReturn("value");
when(contextManager.getPersistServiceFacade().getMetaDataPersistService().getDataSourceUnitService().load("foo_db", "foo_unit")).thenReturn(mock(DataSourcePoolProperties.class));
handler.handle("foo_db", new DataChangedEvent("/metadata/foo_db/data_sources/units/foo_unit/active_version/0", "0", Type.ADDED));
verify(contextManager.getMetaDataContextManager().getStorageUnitManager()).registerStorageUnit(eq("foo_db"), any());
}

@Test
void assertHandleStorageUnitAltered() {
when(contextManager.getPersistServiceFacade().getMetaDataPersistService().getMetaDataVersionPersistService().getActiveVersionByFullPath("key")).thenReturn("value");
when(contextManager.getPersistServiceFacade().getMetaDataPersistService().getDataSourceUnitService().load("foo_db", "foo_unit")).thenReturn(mock(DataSourcePoolProperties.class));
handler.handle("foo_db", new DataChangedEvent("/metadata/foo_db/data_sources/units/foo_unit/active_version/0", "0", Type.UPDATED));
verify(contextManager.getMetaDataContextManager().getStorageUnitManager()).alterStorageUnit(eq("foo_db"), any());
}

@Test
void assertHandleStorageUnitUnregistered() {
when(contextManager.getMetaDataContexts().getMetaData().containsDatabase("foo_db")).thenReturn(true);
handler.handle("foo_db", new DataChangedEvent("/metadata/foo_db/data_sources/units/foo_unit", "", Type.DELETED));
verify(contextManager.getMetaDataContextManager().getStorageUnitManager()).unregisterStorageUnit("foo_db", "foo_unit");
}
}

This file was deleted.

0 comments on commit f1c37bd

Please sign in to comment.