Skip to content

Commit

Permalink
Add PersistCoordinatorFacade
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Dec 29, 2024
1 parent 8ea3b5b commit 4013623
Show file tree
Hide file tree
Showing 22 changed files with 122 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.shardingsphere.mode.manager.standalone.workerid.StandaloneWorkerIdGenerator;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
import org.apache.shardingsphere.mode.persist.coordinator.PersistCoordinatorFacade;
import org.apache.shardingsphere.mode.spi.PersistRepository;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.test.mock.AutoMockExtension;
Expand Down Expand Up @@ -75,6 +74,6 @@ private ContextManager mockContextManager() {
ComputeNodeInstanceContext computeNodeInstanceContext = new ComputeNodeInstanceContext(
new ComputeNodeInstance(mock(InstanceMetaData.class)), new ModeConfiguration("Standalone", null), new EventBusContext());
computeNodeInstanceContext.init(new StandaloneWorkerIdGenerator(), mock(LockContext.class));
return new ContextManager(metaDataContexts, computeNodeInstanceContext, mock(PersistRepository.class), mock(PersistCoordinatorFacade.class));
return new ContextManager(metaDataContexts, computeNodeInstanceContext, mock(PersistRepository.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ public final class ContextManager implements AutoCloseable {

private final PersistCoordinatorFacade persistCoordinatorFacade;

public ContextManager(final MetaDataContexts metaDataContexts, final ComputeNodeInstanceContext computeNodeInstanceContext,
final PersistRepository repository, final PersistCoordinatorFacade persistCoordinatorFacade) {
public ContextManager(final MetaDataContexts metaDataContexts, final ComputeNodeInstanceContext computeNodeInstanceContext, final PersistRepository repository) {
this.metaDataContexts = new AtomicReference<>(metaDataContexts);
this.computeNodeInstanceContext = computeNodeInstanceContext;
metaDataContextManager = new MetaDataContextManager(this.metaDataContexts, computeNodeInstanceContext, repository);
Expand All @@ -78,7 +77,7 @@ public ContextManager(final MetaDataContexts metaDataContexts, final ComputeNode
for (ContextManagerLifecycleListener each : ShardingSphereServiceLoader.getServiceInstances(ContextManagerLifecycleListener.class)) {
each.onInitialized(this);
}
this.persistCoordinatorFacade = persistCoordinatorFacade;
persistCoordinatorFacade = new PersistCoordinatorFacade(repository, computeNodeInstanceContext.getModeConfiguration());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,21 @@

package org.apache.shardingsphere.mode.persist.coordinator;

import lombok.Getter;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.mode.spi.PersistRepository;

/**
* Persist coordinator facade.
*/
public interface PersistCoordinatorFacade {
@Getter
public final class PersistCoordinatorFacade {

private final ProcessPersistCoordinator processPersistCoordinator;

/**
* Get process persist coordinator.
*
* @return process persist coordinator
*/
ProcessPersistCoordinator getProcessPersistCoordinator();
public PersistCoordinatorFacade(final PersistRepository repository, final ModeConfiguration modeConfig) {
PersistCoordinatorFacadeBuilder builder = TypedSPILoader.getService(PersistCoordinatorFacadeBuilder.class, modeConfig.getType());
processPersistCoordinator = builder.buildProcessPersistCoordinator(repository);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
public interface PersistCoordinatorFacadeBuilder extends TypedSPI {

/**
* Build persist coordinator facade.
* Build process persist coordinator.
*
* @param repository persist repository
* @return persist coordinator facade
* @return built process persist coordinator
*/
PersistCoordinatorFacade build(PersistRepository repository);
ProcessPersistCoordinator buildProcessPersistCoordinator(PersistRepository repository);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.mode.fixture;

import org.apache.shardingsphere.mode.persist.coordinator.PersistCoordinatorFacadeBuilder;
import org.apache.shardingsphere.mode.persist.coordinator.ProcessPersistCoordinator;
import org.apache.shardingsphere.mode.spi.PersistRepository;

public final class PersistCoordinatorFacadeBuilderFixture implements PersistCoordinatorFacadeBuilder {

@Override
public ProcessPersistCoordinator buildProcessPersistCoordinator(final PersistRepository repository) {
return null;
}

@Override
public Object getType() {
return "foo_type";
}

@Override
public boolean isDefault() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ void setUp() throws SQLException {
when(metaDataContexts.getMetaData().getAllDatabases()).thenReturn(Collections.singleton(database));
when(computeNodeInstanceContext.getInstance()).thenReturn(new ComputeNodeInstance(new ProxyInstanceMetaData("foo_id", 3307), Collections.emptyList()));
when(computeNodeInstanceContext.getModeConfiguration()).thenReturn(mock(ModeConfiguration.class));
contextManager = new ContextManager(metaDataContexts, computeNodeInstanceContext, mock(PersistRepository.class), null);
contextManager = new ContextManager(metaDataContexts, computeNodeInstanceContext, mock(PersistRepository.class));
}

private ShardingSphereDatabase mockDatabase() throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

org.apache.shardingsphere.mode.fixture.PersistCoordinatorFacadeBuilderFixture
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@
import org.apache.shardingsphere.mode.manager.cluster.workerid.ClusterWorkerIdGenerator;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
import org.apache.shardingsphere.mode.persist.coordinator.PersistCoordinatorFacadeBuilder;
import org.apache.shardingsphere.mode.persist.coordinator.PersistCoordinatorFacade;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;

Expand All @@ -65,8 +63,7 @@ public ContextManager build(final ContextManagerBuilderParameter param, final Ev
computeNodeInstanceContext.init(new ClusterWorkerIdGenerator(repository, param.getInstanceMetaData().getId()), lockContext);
MetaDataPersistService metaDataPersistService = new MetaDataPersistService(repository);
MetaDataContexts metaDataContexts = MetaDataContextsFactory.create(metaDataPersistService, param, computeNodeInstanceContext);
PersistCoordinatorFacade persistCoordinatorFacade = TypedSPILoader.getService(PersistCoordinatorFacadeBuilder.class, "Cluster").build(repository);
ContextManager result = new ContextManager(metaDataContexts, computeNodeInstanceContext, repository, persistCoordinatorFacade);
ContextManager result = new ContextManager(metaDataContexts, computeNodeInstanceContext, repository);
registerOnline(computeNodeInstanceContext, param, result, repository);
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.shardingsphere.mode.manager.cluster.persist.coordinator;

import org.apache.shardingsphere.mode.persist.coordinator.PersistCoordinatorFacadeBuilder;
import org.apache.shardingsphere.mode.persist.coordinator.PersistCoordinatorFacade;
import org.apache.shardingsphere.mode.persist.coordinator.ProcessPersistCoordinator;
import org.apache.shardingsphere.mode.spi.PersistRepository;

/**
Expand All @@ -27,8 +27,8 @@
public final class ClusterPersistCoordinatorFacadeBuilder implements PersistCoordinatorFacadeBuilder {

@Override
public PersistCoordinatorFacade build(final PersistRepository repository) {
return new ClusterPersistCoordinatorFacade(repository);
public ProcessPersistCoordinator buildProcessPersistCoordinator(final PersistRepository repository) {
return new ClusterProcessPersistCoordinator(repository);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public ContextManager build(final ContextManagerBuilderParameter param, final Ev
ComputeNodeInstanceContext computeNodeInstanceContext = new ComputeNodeInstanceContext(new ComputeNodeInstance(param.getInstanceMetaData()), param.getModeConfiguration(), eventBusContext);
computeNodeInstanceContext.init(new StandaloneWorkerIdGenerator(), new StandaloneLockContext());
MetaDataContexts metaDataContexts = MetaDataContextsFactory.create(persistService, param, computeNodeInstanceContext);
return new ContextManager(metaDataContexts, computeNodeInstanceContext, repository, null);
return new ContextManager(metaDataContexts, computeNodeInstanceContext, repository);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,24 @@
* limitations under the License.
*/

package org.apache.shardingsphere.mode.manager.cluster.persist.coordinator;
package org.apache.shardingsphere.mode.manager.standalone.persist.coordinator;

import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.mode.persist.coordinator.PersistCoordinatorFacade;
import org.apache.shardingsphere.mode.persist.coordinator.PersistCoordinatorFacadeBuilder;
import org.apache.shardingsphere.mode.persist.coordinator.ProcessPersistCoordinator;
import org.apache.shardingsphere.mode.spi.PersistRepository;

/**
* Cluster persist coordinator facade.
* Standalone persist coordinator facade builder.
*/
@RequiredArgsConstructor
public final class ClusterPersistCoordinatorFacade implements PersistCoordinatorFacade {
public final class StandalonePersistCoordinatorFacadeBuilder implements PersistCoordinatorFacadeBuilder {

private final PersistRepository repository;
@Override
public ProcessPersistCoordinator buildProcessPersistCoordinator(final PersistRepository repository) {
return null;
}

@Override
public ProcessPersistCoordinator getProcessPersistCoordinator() {
return new ClusterProcessPersistCoordinator(repository);
public Object getType() {
return "Standalone";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

org.apache.shardingsphere.mode.manager.standalone.persist.coordinator.StandalonePersistCoordinatorFacadeBuilder
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.shardingsphere.metadata.persist.MetaDataPersistService;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
import org.apache.shardingsphere.mode.persist.coordinator.PersistCoordinatorFacade;
import org.apache.shardingsphere.mode.spi.PersistRepository;
import org.apache.shardingsphere.proxy.backend.connector.jdbc.connection.ConnectionPostProcessor;
import org.apache.shardingsphere.proxy.backend.connector.jdbc.datasource.JDBCBackendDataSource;
Expand Down Expand Up @@ -128,7 +127,7 @@ private ContextManager mockContextManager() {
ComputeNodeInstanceContext computeNodeInstanceContext = mock(ComputeNodeInstanceContext.class);
when(computeNodeInstanceContext.getModeConfiguration()).thenReturn(mock(ModeConfiguration.class));
return new ContextManager(
MetaDataContextsFactory.create(mock(MetaDataPersistService.class), metaData), computeNodeInstanceContext, mock(PersistRepository.class), mock(PersistCoordinatorFacade.class));
MetaDataContextsFactory.create(mock(MetaDataPersistService.class), metaData), computeNodeInstanceContext, mock(PersistRepository.class));
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.shardingsphere.metadata.persist.MetaDataPersistService;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
import org.apache.shardingsphere.mode.persist.coordinator.PersistCoordinatorFacade;
import org.apache.shardingsphere.mode.spi.PersistRepository;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
Expand Down Expand Up @@ -109,8 +108,7 @@ void setUp() {
when(metaData.getGlobalRuleMetaData()).thenReturn(new RuleMetaData(Arrays.asList(mock(SQLFederationRule.class), transactionRule)));
ComputeNodeInstanceContext computeNodeInstanceContext = mock(ComputeNodeInstanceContext.class);
when(computeNodeInstanceContext.getModeConfiguration()).thenReturn(mock(ModeConfiguration.class));
ContextManager contextManager = new ContextManager(MetaDataContextsFactory.create(mock(MetaDataPersistService.class), metaData), computeNodeInstanceContext,
mock(PersistRepository.class), mock(PersistCoordinatorFacade.class));
ContextManager contextManager = new ContextManager(MetaDataContextsFactory.create(mock(MetaDataPersistService.class), metaData), computeNodeInstanceContext, mock(PersistRepository.class));
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,12 @@
import org.apache.shardingsphere.infra.metadata.database.resource.ResourceMetaData;
import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.mode.persist.coordinator.PersistCoordinatorFacade;
import org.apache.shardingsphere.mode.state.ClusterState;
import org.apache.shardingsphere.metadata.persist.MetaDataPersistService;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
import org.apache.shardingsphere.mode.spi.PersistRepository;
import org.apache.shardingsphere.mode.state.ClusterState;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -72,7 +71,7 @@ void restorePreviousContextManager() {
@Test
void assertInit() {
MetaDataContexts metaDataContexts = MetaDataContextsFactory.create(mock(MetaDataPersistService.class), new ShardingSphereMetaData());
ProxyContext.init(new ContextManager(metaDataContexts, mock(ComputeNodeInstanceContext.class, RETURNS_DEEP_STUBS), mock(PersistRepository.class), mock(PersistCoordinatorFacade.class)));
ProxyContext.init(new ContextManager(metaDataContexts, mock(ComputeNodeInstanceContext.class, RETURNS_DEEP_STUBS), mock(PersistRepository.class)));
assertThat(ProxyContext.getInstance().getContextManager().getStateContext(), is(ProxyContext.getInstance().getContextManager().getStateContext()));
assertThat(ProxyContext.getInstance().getContextManager().getStateContext().getState(), is(ClusterState.OK));
assertThat(ProxyContext.getInstance().getContextManager().getMetaDataContexts(), is(ProxyContext.getInstance().getContextManager().getMetaDataContexts()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,6 @@ private ContextManager mockContextManager() {
when(ProxyContext.getInstance().getContextManager().getDatabase("foo_db")).thenReturn(database);
ShardingSphereMetaData metaData = new ShardingSphereMetaData(Collections.singleton(database), mock(), mock(), new ConfigurationProperties(new Properties()));
ComputeNodeInstanceContext computeNodeInstanceContext = mock(ComputeNodeInstanceContext.class, RETURNS_DEEP_STUBS);
return new ContextManager(MetaDataContextsFactory.create(mock(), metaData), computeNodeInstanceContext, mock(), mock());
return new ContextManager(MetaDataContextsFactory.create(mock(), metaData), computeNodeInstanceContext, mock());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
import org.apache.shardingsphere.mode.persist.coordinator.PersistCoordinatorFacade;
import org.apache.shardingsphere.mode.spi.PersistRepository;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
Expand Down Expand Up @@ -77,7 +76,7 @@ void assertExecuteWithUnknownDatabase() {
when(connectionSession.getUsedDatabaseName()).thenReturn("unknown");
ComputeNodeInstanceContext computeNodeInstanceContext = mock(ComputeNodeInstanceContext.class);
when(computeNodeInstanceContext.getModeConfiguration()).thenReturn(mock(ModeConfiguration.class));
ContextManager contextManager = new ContextManager(metaDataContexts, computeNodeInstanceContext, mock(PersistRepository.class), mock(PersistCoordinatorFacade.class));
ContextManager contextManager = new ContextManager(metaDataContexts, computeNodeInstanceContext, mock(PersistRepository.class));
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
assertThrows(UnknownDatabaseException.class, () -> new DistSQLQueryBackendHandler(mock(ExportDatabaseConfigurationStatement.class), connectionSession).execute());
}
Expand Down
Loading

0 comments on commit 4013623

Please sign in to comment.