Skip to content

Commit

Permalink
Use DataChangedEventHandler to instead of DispatchEventBuilder
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Dec 30, 2024
1 parent 8e52107 commit fc57224
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 31 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.handler;

import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
import org.apache.shardingsphere.mode.manager.ContextManager;

import java.util.Collection;

/**
* Data changed event handler.
*/
@SingletonSPI
public interface DataChangedEventHandler {

/**
* Get subscribed key.
*
* @return subscribed key
*/
String getSubscribedKey();

/**
* Get subscribed types.
*
* @return subscribed types
*/
Collection<Type> getSubscribedTypes();


/**
* Handle data changed event.
*
* @param contextManager context manager
* @param event data changed event
*/
void handle(ContextManager contextManager, DataChangedEvent event);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,22 @@
* limitations under the License.
*/

package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.builder.type;
package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.handler.type;

import org.apache.shardingsphere.mode.state.ClusterState;
import org.apache.shardingsphere.metadata.persist.node.StatesNode;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.DispatchEvent;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.cluster.ClusterStateEvent;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.builder.DispatchEventBuilder;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.handler.DataChangedEventHandler;
import org.apache.shardingsphere.mode.state.ClusterState;

import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;

/**
* Cluster state dispatch event builder.
* Cluster state changed handler.
*/
public final class ClusterStateDispatchEventBuilder implements DispatchEventBuilder<DispatchEvent> {
public final class ClusterStateChangedHandler implements DataChangedEventHandler {

@Override
public String getSubscribedKey() {
Expand All @@ -45,8 +43,8 @@ public Collection<Type> getSubscribedTypes() {
}

@Override
public Optional<DispatchEvent> build(final DataChangedEvent event) {
return event.getKey().equals(StatesNode.getClusterStateNodePath()) ? Optional.of(new ClusterStateEvent(getClusterState(event))) : Optional.empty();
public void handle(final ContextManager contextManager, final DataChangedEvent event) {
contextManager.getStateContext().switchState(getClusterState(event));
}

private ClusterState getClusterState(final DataChangedEvent event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.builder.DispatchEventBuilder;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.handler.DataChangedEventHandler;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.listener.type.DatabaseMetaDataChangedListener;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.listener.type.GlobalMetaDataChangedHandler;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.listener.type.GlobalMetaDataChangedListener;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;

Expand All @@ -33,13 +35,16 @@
*/
public final class DataChangedEventListenerRegistry {

private final ContextManager contextManager;

private final ClusterPersistRepository repository;

private final EventBusContext eventBusContext;

private final Collection<String> databaseNames;

public DataChangedEventListenerRegistry(final ContextManager contextManager, final Collection<String> databaseNames) {
this.contextManager = contextManager;
repository = (ClusterPersistRepository) contextManager.getPersistServiceFacade().getRepository();
eventBusContext = contextManager.getComputeNodeInstanceContext().getEventBusContext();
this.databaseNames = databaseNames;
Expand All @@ -51,6 +56,7 @@ public DataChangedEventListenerRegistry(final ContextManager contextManager, fin
public void register() {
databaseNames.forEach(this::registerDatabaseListeners);
ShardingSphereServiceLoader.getServiceInstances(DispatchEventBuilder.class).forEach(this::registerGlobalListeners);
ShardingSphereServiceLoader.getServiceInstances(DataChangedEventHandler.class).forEach(this::registerGlobalHandlers);
}

private void registerDatabaseListeners(final String databaseName) {
Expand All @@ -60,4 +66,8 @@ private void registerDatabaseListeners(final String databaseName) {
private void registerGlobalListeners(final DispatchEventBuilder<?> builder) {
repository.watch(builder.getSubscribedKey(), new GlobalMetaDataChangedListener(eventBusContext, builder));
}

private void registerGlobalHandlers(final DataChangedEventHandler handler) {
repository.watch(handler.getSubscribedKey(), new GlobalMetaDataChangedHandler(contextManager, handler));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.listener.type;

import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.handler.DataChangedEventHandler;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;

/**
* Global meta data changed handler.
*/
@RequiredArgsConstructor
public final class GlobalMetaDataChangedHandler implements DataChangedEventListener {

private final ContextManager contextManager;

private final DataChangedEventHandler handler;

@Override
public void onChange(final DataChangedEvent event) {
if (handler.getSubscribedTypes().contains(event.getType())) {
handler.handle(contextManager, event);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

org.apache.shardingsphere.mode.manager.cluster.event.dispatch.builder.type.QualifiedDataSourceDispatchEventBuilder
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.builder.type.ComputeNodeStateDispatchEventBuilder
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.builder.type.ClusterStateDispatchEventBuilder
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.builder.type.ShardingSphereDataDispatchEventBuilder
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.builder.type.GlobalRuleDispatchEventBuilder
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.builder.type.PropertiesDispatchEventBuilder
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

org.apache.shardingsphere.mode.manager.cluster.event.dispatch.handler.type.ClusterStateChangedHandler
Original file line number Diff line number Diff line change
Expand Up @@ -15,41 +15,46 @@
* limitations under the License.
*/

package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.builder.type;
package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.handler.type;

import org.apache.shardingsphere.mode.state.ClusterState;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.DispatchEvent;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.cluster.ClusterStateEvent;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.handler.DataChangedEventHandler;
import org.apache.shardingsphere.mode.state.ClusterState;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.Optional;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.verify;

class ClusterStateDispatchEventBuilderTest {
@ExtendWith(MockitoExtension.class)
class ClusterStateChangedHandlerTest {

private final ClusterStateDispatchEventBuilder builder = new ClusterStateDispatchEventBuilder();
private DataChangedEventHandler handler;

@Test
void assertGetSubscribedKey() {
assertThat(builder.getSubscribedKey(), is("/states/cluster_state"));
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private ContextManager contextManager;

@BeforeEach
void setUp() {
handler = ShardingSphereServiceLoader.getServiceInstances(DataChangedEventHandler.class).stream()
.filter(each -> each.getSubscribedKey().equals("/states/cluster_state")).findFirst().orElse(null);
}

@Test
void assertBuildEventWithValidClusterState() {
Optional<DispatchEvent> actual = builder.build(new DataChangedEvent("/states/cluster_state", ClusterState.READ_ONLY.name(), Type.UPDATED));
assertTrue(actual.isPresent());
assertThat(((ClusterStateEvent) actual.get()).getClusterState(), is(ClusterState.READ_ONLY));
handler.handle(contextManager, new DataChangedEvent("/states/cluster_state", ClusterState.READ_ONLY.name(), Type.UPDATED));
verify(contextManager.getStateContext()).switchState(ClusterState.READ_ONLY);
}

@Test
void assertBuildEventWithInvalidClusterState() {
Optional<DispatchEvent> actual = builder.build(new DataChangedEvent("/states/cluster_state", "INVALID", Type.UPDATED));
assertTrue(actual.isPresent());
assertThat(((ClusterStateEvent) actual.get()).getClusterState(), is(ClusterState.OK));
handler.handle(contextManager, new DataChangedEvent("/states/cluster_state", "INVALID", Type.UPDATED));
verify(contextManager.getStateContext()).switchState(ClusterState.OK);
}
}

0 comments on commit fc57224

Please sign in to comment.