From fc572249a77b43427ced41e78a3872d651e99904 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Mon, 30 Dec 2024 23:34:43 +0800 Subject: [PATCH] Use DataChangedEventHandler to instead of DispatchEventBuilder --- .../handler/DataChangedEventHandler.java | 55 +++++++++++++++++++ .../type/ClusterStateChangedHandler.java} | 18 +++--- .../DataChangedEventListenerRegistry.java | 10 ++++ .../type/GlobalMetaDataChangedHandler.java | 42 ++++++++++++++ ...vent.dispatch.builder.DispatchEventBuilder | 1 - ...t.dispatch.handler.DataChangedEventHandler | 18 ++++++ .../type/ClusterStateChangedHandlerTest.java} | 45 ++++++++------- 7 files changed, 158 insertions(+), 31 deletions(-) create mode 100644 mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/DataChangedEventHandler.java rename mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/{builder/type/ClusterStateDispatchEventBuilder.java => handler/type/ClusterStateChangedHandler.java} (73%) create mode 100644 mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/listener/type/GlobalMetaDataChangedHandler.java create mode 100644 mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.event.dispatch.handler.DataChangedEventHandler rename mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/{builder/type/ClusterStateDispatchEventBuilderTest.java => handler/type/ClusterStateChangedHandlerTest.java} (50%) diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/DataChangedEventHandler.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/DataChangedEventHandler.java new file mode 100644 index 0000000000000..1b14eaf2d0374 --- /dev/null +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/DataChangedEventHandler.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.handler; + +import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI; +import org.apache.shardingsphere.mode.event.DataChangedEvent; +import org.apache.shardingsphere.mode.event.DataChangedEvent.Type; +import org.apache.shardingsphere.mode.manager.ContextManager; + +import java.util.Collection; + +/** + * Data changed event handler. + */ +@SingletonSPI +public interface DataChangedEventHandler { + + /** + * Get subscribed key. + * + * @return subscribed key + */ + String getSubscribedKey(); + + /** + * Get subscribed types. + * + * @return subscribed types + */ + Collection getSubscribedTypes(); + + + /** + * Handle data changed event. + * + * @param contextManager context manager + * @param event data changed event + */ + void handle(ContextManager contextManager, DataChangedEvent event); +} diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/builder/type/ClusterStateDispatchEventBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/type/ClusterStateChangedHandler.java similarity index 73% rename from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/builder/type/ClusterStateDispatchEventBuilder.java rename to mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/type/ClusterStateChangedHandler.java index fdc7db6c7a369..5117361c659c7 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/builder/type/ClusterStateDispatchEventBuilder.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/type/ClusterStateChangedHandler.java @@ -15,24 +15,22 @@ * limitations under the License. */ -package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.builder.type; +package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.handler.type; -import org.apache.shardingsphere.mode.state.ClusterState; import org.apache.shardingsphere.metadata.persist.node.StatesNode; import org.apache.shardingsphere.mode.event.DataChangedEvent; import org.apache.shardingsphere.mode.event.DataChangedEvent.Type; -import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.DispatchEvent; -import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.cluster.ClusterStateEvent; -import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.builder.DispatchEventBuilder; +import org.apache.shardingsphere.mode.manager.ContextManager; +import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.handler.DataChangedEventHandler; +import org.apache.shardingsphere.mode.state.ClusterState; import java.util.Arrays; import java.util.Collection; -import java.util.Optional; /** - * Cluster state dispatch event builder. + * Cluster state changed handler. */ -public final class ClusterStateDispatchEventBuilder implements DispatchEventBuilder { +public final class ClusterStateChangedHandler implements DataChangedEventHandler { @Override public String getSubscribedKey() { @@ -45,8 +43,8 @@ public Collection getSubscribedTypes() { } @Override - public Optional build(final DataChangedEvent event) { - return event.getKey().equals(StatesNode.getClusterStateNodePath()) ? Optional.of(new ClusterStateEvent(getClusterState(event))) : Optional.empty(); + public void handle(final ContextManager contextManager, final DataChangedEvent event) { + contextManager.getStateContext().switchState(getClusterState(event)); } private ClusterState getClusterState(final DataChangedEvent event) { diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/listener/DataChangedEventListenerRegistry.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/listener/DataChangedEventListenerRegistry.java index 7daeb327d3471..9122a5006c93f 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/listener/DataChangedEventListenerRegistry.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/listener/DataChangedEventListenerRegistry.java @@ -22,7 +22,9 @@ import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode; import org.apache.shardingsphere.mode.manager.ContextManager; import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.builder.DispatchEventBuilder; +import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.handler.DataChangedEventHandler; import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.listener.type.DatabaseMetaDataChangedListener; +import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.listener.type.GlobalMetaDataChangedHandler; import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.listener.type.GlobalMetaDataChangedListener; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; @@ -33,6 +35,8 @@ */ public final class DataChangedEventListenerRegistry { + private final ContextManager contextManager; + private final ClusterPersistRepository repository; private final EventBusContext eventBusContext; @@ -40,6 +44,7 @@ public final class DataChangedEventListenerRegistry { private final Collection databaseNames; public DataChangedEventListenerRegistry(final ContextManager contextManager, final Collection databaseNames) { + this.contextManager = contextManager; repository = (ClusterPersistRepository) contextManager.getPersistServiceFacade().getRepository(); eventBusContext = contextManager.getComputeNodeInstanceContext().getEventBusContext(); this.databaseNames = databaseNames; @@ -51,6 +56,7 @@ public DataChangedEventListenerRegistry(final ContextManager contextManager, fin public void register() { databaseNames.forEach(this::registerDatabaseListeners); ShardingSphereServiceLoader.getServiceInstances(DispatchEventBuilder.class).forEach(this::registerGlobalListeners); + ShardingSphereServiceLoader.getServiceInstances(DataChangedEventHandler.class).forEach(this::registerGlobalHandlers); } private void registerDatabaseListeners(final String databaseName) { @@ -60,4 +66,8 @@ private void registerDatabaseListeners(final String databaseName) { private void registerGlobalListeners(final DispatchEventBuilder builder) { repository.watch(builder.getSubscribedKey(), new GlobalMetaDataChangedListener(eventBusContext, builder)); } + + private void registerGlobalHandlers(final DataChangedEventHandler handler) { + repository.watch(handler.getSubscribedKey(), new GlobalMetaDataChangedHandler(contextManager, handler)); + } } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/listener/type/GlobalMetaDataChangedHandler.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/listener/type/GlobalMetaDataChangedHandler.java new file mode 100644 index 0000000000000..d68a42ce5e6cd --- /dev/null +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/listener/type/GlobalMetaDataChangedHandler.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.listener.type; + +import lombok.RequiredArgsConstructor; +import org.apache.shardingsphere.mode.event.DataChangedEvent; +import org.apache.shardingsphere.mode.manager.ContextManager; +import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.handler.DataChangedEventHandler; +import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener; + +/** + * Global meta data changed handler. + */ +@RequiredArgsConstructor +public final class GlobalMetaDataChangedHandler implements DataChangedEventListener { + + private final ContextManager contextManager; + + private final DataChangedEventHandler handler; + + @Override + public void onChange(final DataChangedEvent event) { + if (handler.getSubscribedTypes().contains(event.getType())) { + handler.handle(contextManager, event); + } + } +} diff --git a/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.event.dispatch.builder.DispatchEventBuilder b/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.event.dispatch.builder.DispatchEventBuilder index 8b04a9c552eec..44fa3d000c61d 100644 --- a/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.event.dispatch.builder.DispatchEventBuilder +++ b/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.event.dispatch.builder.DispatchEventBuilder @@ -17,7 +17,6 @@ org.apache.shardingsphere.mode.manager.cluster.event.dispatch.builder.type.QualifiedDataSourceDispatchEventBuilder org.apache.shardingsphere.mode.manager.cluster.event.dispatch.builder.type.ComputeNodeStateDispatchEventBuilder -org.apache.shardingsphere.mode.manager.cluster.event.dispatch.builder.type.ClusterStateDispatchEventBuilder org.apache.shardingsphere.mode.manager.cluster.event.dispatch.builder.type.ShardingSphereDataDispatchEventBuilder org.apache.shardingsphere.mode.manager.cluster.event.dispatch.builder.type.GlobalRuleDispatchEventBuilder org.apache.shardingsphere.mode.manager.cluster.event.dispatch.builder.type.PropertiesDispatchEventBuilder diff --git a/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.event.dispatch.handler.DataChangedEventHandler b/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.event.dispatch.handler.DataChangedEventHandler new file mode 100644 index 0000000000000..fa495b843dda7 --- /dev/null +++ b/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.event.dispatch.handler.DataChangedEventHandler @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.shardingsphere.mode.manager.cluster.event.dispatch.handler.type.ClusterStateChangedHandler diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/builder/type/ClusterStateDispatchEventBuilderTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/type/ClusterStateChangedHandlerTest.java similarity index 50% rename from mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/builder/type/ClusterStateDispatchEventBuilderTest.java rename to mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/type/ClusterStateChangedHandlerTest.java index c23bffcea4458..c4162f648372c 100644 --- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/builder/type/ClusterStateDispatchEventBuilderTest.java +++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/type/ClusterStateChangedHandlerTest.java @@ -15,41 +15,46 @@ * limitations under the License. */ -package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.builder.type; +package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.handler.type; -import org.apache.shardingsphere.mode.state.ClusterState; +import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader; import org.apache.shardingsphere.mode.event.DataChangedEvent; import org.apache.shardingsphere.mode.event.DataChangedEvent.Type; -import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.DispatchEvent; -import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.cluster.ClusterStateEvent; +import org.apache.shardingsphere.mode.manager.ContextManager; +import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.handler.DataChangedEventHandler; +import org.apache.shardingsphere.mode.state.ClusterState; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; -import java.util.Optional; - -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.verify; -class ClusterStateDispatchEventBuilderTest { +@ExtendWith(MockitoExtension.class) +class ClusterStateChangedHandlerTest { - private final ClusterStateDispatchEventBuilder builder = new ClusterStateDispatchEventBuilder(); + private DataChangedEventHandler handler; - @Test - void assertGetSubscribedKey() { - assertThat(builder.getSubscribedKey(), is("/states/cluster_state")); + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private ContextManager contextManager; + + @BeforeEach + void setUp() { + handler = ShardingSphereServiceLoader.getServiceInstances(DataChangedEventHandler.class).stream() + .filter(each -> each.getSubscribedKey().equals("/states/cluster_state")).findFirst().orElse(null); } @Test void assertBuildEventWithValidClusterState() { - Optional actual = builder.build(new DataChangedEvent("/states/cluster_state", ClusterState.READ_ONLY.name(), Type.UPDATED)); - assertTrue(actual.isPresent()); - assertThat(((ClusterStateEvent) actual.get()).getClusterState(), is(ClusterState.READ_ONLY)); + handler.handle(contextManager, new DataChangedEvent("/states/cluster_state", ClusterState.READ_ONLY.name(), Type.UPDATED)); + verify(contextManager.getStateContext()).switchState(ClusterState.READ_ONLY); } @Test void assertBuildEventWithInvalidClusterState() { - Optional actual = builder.build(new DataChangedEvent("/states/cluster_state", "INVALID", Type.UPDATED)); - assertTrue(actual.isPresent()); - assertThat(((ClusterStateEvent) actual.get()).getClusterState(), is(ClusterState.OK)); + handler.handle(contextManager, new DataChangedEvent("/states/cluster_state", "INVALID", Type.UPDATED)); + verify(contextManager.getStateContext()).switchState(ClusterState.OK); } }