diff --git a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/service/version/MetaDataVersionPersistService.java b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/service/version/MetaDataVersionPersistService.java index e029370ffb6bd..7902ee961e9e3 100644 --- a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/service/version/MetaDataVersionPersistService.java +++ b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/service/version/MetaDataVersionPersistService.java @@ -20,9 +20,13 @@ import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.infra.metadata.version.MetaDataVersion; import org.apache.shardingsphere.metadata.persist.node.NewDatabaseMetaDataNode; +import org.apache.shardingsphere.mode.identifier.NodePathTransactionAware; +import org.apache.shardingsphere.mode.identifier.NodePathTransactionOperation; import org.apache.shardingsphere.mode.spi.PersistRepository; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; /** * Meta data version persist service. @@ -30,21 +34,47 @@ @RequiredArgsConstructor public final class MetaDataVersionPersistService implements MetaDataVersionBasedPersistService { - private static final String ACTIVE_VERSION = "active_version"; + private static final String ACTIVE_VERSION = "/active_version"; - private static final String VERSIONS = "versions"; + private static final String VERSIONS = "/versions/"; private final PersistRepository repository; - // TODO Need to use transaction operation @Override public void switchActiveVersion(final Collection metaDataVersions) { + if (repository instanceof NodePathTransactionAware) { + switchActiveVersionWithTransaction(metaDataVersions); + } else { + switchActiveVersionWithoutTransaction(metaDataVersions); + } + } + + private void switchActiveVersionWithTransaction(final Collection metaDataVersions) { + List nodePathTransactionOperations = buildNodePathTransactionOperations(metaDataVersions); + if (!nodePathTransactionOperations.isEmpty()) { + ((NodePathTransactionAware) repository).executeInTransaction(nodePathTransactionOperations); + } + } + + private List buildNodePathTransactionOperations(final Collection metaDataVersions) { + List result = new ArrayList<>(); + for (MetaDataVersion each : metaDataVersions) { + if (each.getNextActiveVersion().equals(each.getCurrentActiveVersion())) { + continue; + } + result.add(NodePathTransactionOperation.update(each.getKey() + ACTIVE_VERSION, each.getNextActiveVersion())); + result.add(NodePathTransactionOperation.delete(each.getKey() + VERSIONS + each.getCurrentActiveVersion())); + } + return result; + } + + private void switchActiveVersionWithoutTransaction(final Collection metaDataVersions) { for (MetaDataVersion each : metaDataVersions) { if (each.getNextActiveVersion().equals(each.getCurrentActiveVersion())) { continue; } - repository.persist(each.getKey() + "/" + ACTIVE_VERSION, each.getNextActiveVersion()); - repository.delete(String.join("/", each.getKey(), VERSIONS, each.getCurrentActiveVersion())); + repository.persist(each.getKey() + ACTIVE_VERSION, each.getNextActiveVersion()); + repository.delete(each.getKey() + VERSIONS + each.getCurrentActiveVersion()); } } diff --git a/mode/api/src/main/java/org/apache/shardingsphere/mode/identifier/NodePathTransactionAware.java b/mode/api/src/main/java/org/apache/shardingsphere/mode/identifier/NodePathTransactionAware.java new file mode 100644 index 0000000000000..4b4931c60b3c0 --- /dev/null +++ b/mode/api/src/main/java/org/apache/shardingsphere/mode/identifier/NodePathTransactionAware.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.identifier; + +import java.util.List; + +/** + * Node path transaction aware. + */ +public interface NodePathTransactionAware { + + /** + * Execute operations in transaction. + * + * @param nodePathTransactionOperations node path transaction operations + */ + void executeInTransaction(List nodePathTransactionOperations); +} diff --git a/mode/api/src/main/java/org/apache/shardingsphere/mode/identifier/NodePathTransactionOperation.java b/mode/api/src/main/java/org/apache/shardingsphere/mode/identifier/NodePathTransactionOperation.java new file mode 100644 index 0000000000000..dbe25f3840657 --- /dev/null +++ b/mode/api/src/main/java/org/apache/shardingsphere/mode/identifier/NodePathTransactionOperation.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.identifier; + +import lombok.AccessLevel; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +/** + * Node path transaction operation. + */ +@RequiredArgsConstructor(access = AccessLevel.PRIVATE) +@Getter +public final class NodePathTransactionOperation { + + public enum Type { + + ADD, + + UPDATE, + + DELETE + } + + private final Type type; + + private final String key; + + private final String value; + + /** + * Update. + * + * @param key key + * @param value value + * @return NodePathTransactionOperation node path transaction operation + */ + public static NodePathTransactionOperation update(final String key, final String value) { + return new NodePathTransactionOperation(Type.UPDATE, key, value); + } + + /** + * Delete. + * + * @param key key + * @return NodePathTransactionOperation node path transaction operation + */ + public static NodePathTransactionOperation delete(final String key) { + return new NodePathTransactionOperation(Type.DELETE, key, null); + } +} diff --git a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java index 44845d3f89a80..eeede26ef2c0a 100644 --- a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java +++ b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java @@ -23,6 +23,8 @@ import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.CuratorFrameworkFactory.Builder; import org.apache.curator.framework.api.ACLProvider; +import org.apache.curator.framework.api.transaction.CuratorOp; +import org.apache.curator.framework.api.transaction.TransactionOp; import org.apache.curator.framework.recipes.cache.CuratorCache; import org.apache.curator.framework.recipes.cache.CuratorCacheListener; import org.apache.curator.framework.recipes.cache.TreeCacheEvent; @@ -30,6 +32,7 @@ import org.apache.curator.utils.CloseableUtils; import org.apache.shardingsphere.infra.instance.InstanceContext; import org.apache.shardingsphere.infra.instance.InstanceContextAware; +import org.apache.shardingsphere.mode.identifier.NodePathTransactionAware; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration; import org.apache.shardingsphere.mode.repository.cluster.exception.ClusterPersistRepositoryException; @@ -37,6 +40,7 @@ import org.apache.shardingsphere.mode.event.DataChangedEvent.Type; import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener; import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder; +import org.apache.shardingsphere.mode.identifier.NodePathTransactionOperation; import org.apache.shardingsphere.mode.repository.cluster.zookeeper.handler.ZookeeperExceptionHandler; import org.apache.shardingsphere.mode.repository.cluster.zookeeper.listener.SessionConnectionListener; import org.apache.shardingsphere.mode.repository.cluster.zookeeper.props.ZookeeperProperties; @@ -52,13 +56,14 @@ import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.ArrayList; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; /** * Registry repository of ZooKeeper. */ -public final class ZookeeperRepository implements ClusterPersistRepository, InstanceContextAware { +public final class ZookeeperRepository implements ClusterPersistRepository, InstanceContextAware, NodePathTransactionAware { private final Map caches = new ConcurrentHashMap<>(); @@ -288,6 +293,39 @@ public void setInstanceContext(final InstanceContext instanceContext) { client.getConnectionStateListenable().addListener(new SessionConnectionListener(instanceContext, this)); } + @Override + public void executeInTransaction(final List nodePathTransactionOperations) { + try { + client.transaction().forOperations(buildCuratorOps(nodePathTransactionOperations)); + // CHECKSTYLE:OFF + } catch (final Exception ex) { + // CHECKSTYLE:ON + ZookeeperExceptionHandler.handleException(ex); + } + } + + private List buildCuratorOps(final List nodePathTransactionOperations) throws Exception { + List result = new ArrayList<>(nodePathTransactionOperations.size()); + TransactionOp transactionOp = client.transactionOp(); + for (NodePathTransactionOperation each : nodePathTransactionOperations) { + result.add(buildCuratorOp(each, transactionOp)); + } + return result; + } + + private CuratorOp buildCuratorOp(final NodePathTransactionOperation each, final TransactionOp transactionOp) throws Exception { + switch (each.getType()) { + case ADD: + return transactionOp.create().forPath(each.getKey(), each.getValue().getBytes(StandardCharsets.UTF_8)); + case UPDATE: + return transactionOp.setData().forPath(each.getKey(), each.getValue().getBytes(StandardCharsets.UTF_8)); + case DELETE: + return transactionOp.delete().forPath(each.getKey()); + default: + throw new UnsupportedOperationException(each.toString()); + } + } + @Override public String getType() { return "ZooKeeper";