Skip to content

Commit

Permalink
Switch meta data active version in transaction (#30167)
Browse files Browse the repository at this point in the history
* Switch meta data active version in transaction

* Fix checkstyle

* Update delete node path

* Enhance logic

* Fix build op list
  • Loading branch information
zhaojinchao95 authored Feb 18, 2024
1 parent cb931d4 commit 33ec6a8
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,61 @@
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.metadata.version.MetaDataVersion;
import org.apache.shardingsphere.metadata.persist.node.NewDatabaseMetaDataNode;
import org.apache.shardingsphere.mode.identifier.NodePathTransactionAware;
import org.apache.shardingsphere.mode.identifier.NodePathTransactionOperation;
import org.apache.shardingsphere.mode.spi.PersistRepository;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/**
* Meta data version persist service.
*/
@RequiredArgsConstructor
public final class MetaDataVersionPersistService implements MetaDataVersionBasedPersistService {

private static final String ACTIVE_VERSION = "active_version";
private static final String ACTIVE_VERSION = "/active_version";

private static final String VERSIONS = "versions";
private static final String VERSIONS = "/versions/";

private final PersistRepository repository;

// TODO Need to use transaction operation
@Override
public void switchActiveVersion(final Collection<MetaDataVersion> metaDataVersions) {
if (repository instanceof NodePathTransactionAware) {
switchActiveVersionWithTransaction(metaDataVersions);
} else {
switchActiveVersionWithoutTransaction(metaDataVersions);
}
}

private void switchActiveVersionWithTransaction(final Collection<MetaDataVersion> metaDataVersions) {
List<NodePathTransactionOperation> nodePathTransactionOperations = buildNodePathTransactionOperations(metaDataVersions);
if (!nodePathTransactionOperations.isEmpty()) {
((NodePathTransactionAware) repository).executeInTransaction(nodePathTransactionOperations);
}
}

private List<NodePathTransactionOperation> buildNodePathTransactionOperations(final Collection<MetaDataVersion> metaDataVersions) {
List<NodePathTransactionOperation> result = new ArrayList<>();
for (MetaDataVersion each : metaDataVersions) {
if (each.getNextActiveVersion().equals(each.getCurrentActiveVersion())) {
continue;
}
result.add(NodePathTransactionOperation.update(each.getKey() + ACTIVE_VERSION, each.getNextActiveVersion()));
result.add(NodePathTransactionOperation.delete(each.getKey() + VERSIONS + each.getCurrentActiveVersion()));
}
return result;
}

private void switchActiveVersionWithoutTransaction(final Collection<MetaDataVersion> metaDataVersions) {
for (MetaDataVersion each : metaDataVersions) {
if (each.getNextActiveVersion().equals(each.getCurrentActiveVersion())) {
continue;
}
repository.persist(each.getKey() + "/" + ACTIVE_VERSION, each.getNextActiveVersion());
repository.delete(String.join("/", each.getKey(), VERSIONS, each.getCurrentActiveVersion()));
repository.persist(each.getKey() + ACTIVE_VERSION, each.getNextActiveVersion());
repository.delete(each.getKey() + VERSIONS + each.getCurrentActiveVersion());
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.mode.identifier;

import java.util.List;

/**
* Node path transaction aware.
*/
public interface NodePathTransactionAware {

/**
* Execute operations in transaction.
*
* @param nodePathTransactionOperations node path transaction operations
*/
void executeInTransaction(List<NodePathTransactionOperation> nodePathTransactionOperations);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.mode.identifier;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

/**
* Node path transaction operation.
*/
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
@Getter
public final class NodePathTransactionOperation {

public enum Type {

ADD,

UPDATE,

DELETE
}

private final Type type;

private final String key;

private final String value;

/**
* Update.
*
* @param key key
* @param value value
* @return NodePathTransactionOperation node path transaction operation
*/
public static NodePathTransactionOperation update(final String key, final String value) {
return new NodePathTransactionOperation(Type.UPDATE, key, value);
}

/**
* Delete.
*
* @param key key
* @return NodePathTransactionOperation node path transaction operation
*/
public static NodePathTransactionOperation delete(final String key) {
return new NodePathTransactionOperation(Type.DELETE, key, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,24 @@
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.framework.api.transaction.TransactionOp;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.instance.InstanceContextAware;
import org.apache.shardingsphere.mode.identifier.NodePathTransactionAware;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import org.apache.shardingsphere.mode.repository.cluster.exception.ClusterPersistRepositoryException;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
import org.apache.shardingsphere.mode.identifier.NodePathTransactionOperation;
import org.apache.shardingsphere.mode.repository.cluster.zookeeper.handler.ZookeeperExceptionHandler;
import org.apache.shardingsphere.mode.repository.cluster.zookeeper.listener.SessionConnectionListener;
import org.apache.shardingsphere.mode.repository.cluster.zookeeper.props.ZookeeperProperties;
Expand All @@ -52,13 +56,14 @@
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
* Registry repository of ZooKeeper.
*/
public final class ZookeeperRepository implements ClusterPersistRepository, InstanceContextAware {
public final class ZookeeperRepository implements ClusterPersistRepository, InstanceContextAware, NodePathTransactionAware {

private final Map<String, CuratorCache> caches = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -288,6 +293,39 @@ public void setInstanceContext(final InstanceContext instanceContext) {
client.getConnectionStateListenable().addListener(new SessionConnectionListener(instanceContext, this));
}

@Override
public void executeInTransaction(final List<NodePathTransactionOperation> nodePathTransactionOperations) {
try {
client.transaction().forOperations(buildCuratorOps(nodePathTransactionOperations));
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
ZookeeperExceptionHandler.handleException(ex);
}
}

private List<CuratorOp> buildCuratorOps(final List<NodePathTransactionOperation> nodePathTransactionOperations) throws Exception {
List<CuratorOp> result = new ArrayList<>(nodePathTransactionOperations.size());
TransactionOp transactionOp = client.transactionOp();
for (NodePathTransactionOperation each : nodePathTransactionOperations) {
result.add(buildCuratorOp(each, transactionOp));
}
return result;
}

private CuratorOp buildCuratorOp(final NodePathTransactionOperation each, final TransactionOp transactionOp) throws Exception {
switch (each.getType()) {
case ADD:
return transactionOp.create().forPath(each.getKey(), each.getValue().getBytes(StandardCharsets.UTF_8));
case UPDATE:
return transactionOp.setData().forPath(each.getKey(), each.getValue().getBytes(StandardCharsets.UTF_8));
case DELETE:
return transactionOp.delete().forPath(each.getKey());
default:
throw new UnsupportedOperationException(each.toString());
}
}

@Override
public String getType() {
return "ZooKeeper";
Expand Down

0 comments on commit 33ec6a8

Please sign in to comment.