Skip to content

Commit

Permalink
Merge pull request #760 from ctripcorp/bugfix/cluster-type-change
Browse files Browse the repository at this point in the history
no recreating cluster after clusterType changed
  • Loading branch information
LanternLee authored Dec 25, 2023
2 parents 352386f + 6a176e6 commit eb71fa5
Show file tree
Hide file tree
Showing 11 changed files with 148 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public void testBackupKeeperTerminateRefullsyncAndPartialLater() throws Exceptio
fakeRedisServer.setCommandsLength(allCommandsSize * 2);
fakeRedisServer.reGenerateRdb();
waitCmdNotContinueWithRdb(keeperServer1);
waitConditionUntilTimeOut(() -> keeperServer1.getKeeperRepl().getBeginOffset() > keeperServer2.getKeeperRepl().getEndOffset() + 1);

KeeperStats keeperStats = keeperServer1.getKeeperMonitor().getKeeperStats();
long originFsyncCnt = keeperStats.getFullSyncCount();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
package com.ctrip.xpipe.redis.meta.server.crdt.master;

import com.ctrip.xpipe.redis.meta.server.crdt.master.command.RedundantMasterClearCommand;

import java.util.Set;

public interface MasterChooseCommandFactory {

RedundantMasterClearCommand buildRedundantMasterClearCommand(Long clusterDbId, Long shardDbId, Set<String> dcs);

MasterChooseCommand buildPeerMasterChooserCommand(String dcId, Long clusterDbId, Long shardDbId);

MasterChooseCommand buildCurrentMasterChooserCommand(Long clusterDbId, Long shardDbId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.ctrip.xpipe.redis.meta.server.crdt.master.command;

import com.ctrip.xpipe.command.AbstractCommand;
import com.ctrip.xpipe.redis.meta.server.meta.CurrentMetaManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.Set;

/**
* @author lishanglin
* date 2023/12/22
*/
public class RedundantMasterClearCommand extends AbstractCommand<Set<String>> {

private Long clusterId;

private Long shardId;

private Set<String> dcs;

private CurrentMetaManager currentMetaManager;

private static Logger logger = LoggerFactory.getLogger(RedundantMasterClearCommand.class);

public RedundantMasterClearCommand(Long clusterId, Long shardId, Set<String> dcs, CurrentMetaManager currentMetaManager) {
this.clusterId = clusterId;
this.shardId = shardId;
this.dcs = dcs;
this.currentMetaManager = currentMetaManager;
}

@Override
protected void doExecute() throws Throwable {
Set<String> currentDcs = currentMetaManager.getUpstreamPeerDcs(clusterId, shardId);
Set<String> redundantDcs = new HashSet<>(currentDcs);
redundantDcs.removeAll(dcs);

for (String dc: redundantDcs) {
logger.info("[cluster_{},shard_{}] remove dc {}", clusterId, shardId, dc);
currentMetaManager.removePeerMaster(dc, clusterId, shardId);
}

future().setSuccess(redundantDcs);
}

@Override
protected void doReset() {
// do nothing
}

@Override
public String getName() {
return String.format("%s[cluster_%d,shard_%d]", getClass().getSimpleName(), clusterId, shardId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.ctrip.xpipe.redis.meta.server.crdt.master.MasterChooseCommandFactory;
import com.ctrip.xpipe.redis.meta.server.crdt.master.command.CurrentMasterChooseCommand;
import com.ctrip.xpipe.redis.meta.server.crdt.master.command.PeerMasterChooseCommand;
import com.ctrip.xpipe.redis.meta.server.crdt.master.command.RedundantMasterClearCommand;
import com.ctrip.xpipe.redis.meta.server.meta.CurrentMetaManager;
import com.ctrip.xpipe.redis.meta.server.meta.DcMetaCache;
import com.ctrip.xpipe.redis.meta.server.multidc.MultiDcService;
Expand All @@ -17,6 +18,7 @@
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

Expand Down Expand Up @@ -52,6 +54,11 @@ public DefaultMasterChooseCommandFactory(DcMetaCache dcMetaCache, CurrentMetaMan
scheduled = Executors.newScheduledThreadPool(1, XpipeThreadFactory.create("PeerMasterChooseCommandSchedule"));
}

@Override
public RedundantMasterClearCommand buildRedundantMasterClearCommand(Long clusterDbId, Long shardDbId, Set<String> dcs) {
return new RedundantMasterClearCommand(clusterDbId, shardDbId, dcs, currentMetaManager);
}

@Override
public MasterChooseCommand buildPeerMasterChooserCommand(String dcId, Long clusterDbId, Long shardDbId) {
MasterChooseCommand masterChooseCommand = new PeerMasterChooseCommand(dcId, clusterDbId, shardDbId, multiDcService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@
import com.ctrip.xpipe.concurrent.KeyedOneThreadTaskExecutor;
import com.ctrip.xpipe.redis.core.entity.ClusterMeta;
import com.ctrip.xpipe.redis.meta.server.crdt.master.MasterChooseCommandFactory;
import com.ctrip.xpipe.redis.meta.server.crdt.master.command.RedundantMasterClearCommand;
import com.ctrip.xpipe.redis.meta.server.keeper.keepermaster.MasterChooser;
import com.ctrip.xpipe.redis.meta.server.meta.CurrentMetaManager;
import com.ctrip.xpipe.redis.meta.server.meta.DcMetaCache;
import com.ctrip.xpipe.tuple.Pair;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.*;

public class PeerMasterChooser extends CurrentMasterChooser implements MasterChooser {
Expand Down Expand Up @@ -37,11 +42,14 @@ protected void work() {
ParallelCommandChain parallelCommandChain = new ParallelCommandChain(executors);
String currentDc = dcMetaCache.getCurrentDc();

for (String dcId : clusterMeta.getDcs().split("\\s*,\\s*")) {
String[] rawDcs = clusterMeta.getDcs().split("\\s*,\\s*");
Set<String> dcs = new HashSet<>(Arrays.asList(rawDcs));
masterChooseCommandFactory.buildRedundantMasterClearCommand(clusterDbId, shardDbId, dcs).execute();

for (String dcId : dcs) {
if (currentDc.equalsIgnoreCase(dcId)) continue;
parallelCommandChain.add(masterChooseCommandFactory.buildPeerMasterChooserCommand(dcId, clusterDbId, shardDbId));
}

peerMasterChooseExecutor.execute(Pair.of(clusterDbId, shardDbId), parallelCommandChain);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.ctrip.xpipe.api.command.CommandFutureListener;
import com.ctrip.xpipe.api.endpoint.Endpoint;
import com.ctrip.xpipe.api.lifecycle.TopElement;
import com.ctrip.xpipe.api.monitor.EventMonitor;
import com.ctrip.xpipe.api.pool.SimpleKeyedObjectPool;
import com.ctrip.xpipe.cluster.ClusterType;
import com.ctrip.xpipe.cluster.Hints;
Expand Down Expand Up @@ -81,6 +82,8 @@ public class DefaultKeeperManager extends AbstractCurrentMetaObserver implements

private ExecutorService executors;

private static final String KEEPER_MISMATCH_EVENT = "keeperMismatch";

@Override
protected void doInitialize() throws Exception {
super.doInitialize();
Expand Down Expand Up @@ -193,11 +196,13 @@ protected void doCheckShard(ClusterMeta clusterMeta, ShardMeta shardMeta) {

if (deadKeepers.size() > 0) {
logger.info("[doCheck][dead keepers]{}", deadKeepers);
EventMonitor.DEFAULT.logEvent(KEEPER_MISMATCH_EVENT, String.format("[dead]cluster_%d,shard_%d", clusterDbId, shardDbId));
addDeadKeepers(deadKeepers, clusterDbId, shardDbId);
}

if (deadKeepers.size() == 0 && removedKeepers.size() > 0) {
logger.info("[doCheck][removed keepers]{}", removedKeepers);
EventMonitor.DEFAULT.logEvent(KEEPER_MISMATCH_EVENT, String.format("[redundant]cluster_%d,shard_%d", clusterDbId, shardDbId));
removeRemovedKeepers(removedKeepers, clusterDbId, shardDbId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,6 @@ private void clusterRoutesChange(Long clusterDbId) {
refreshApplierMaster(clusterMeta);
}
}

private boolean isReCreateCluster(ClusterMeta current, ClusterMeta future) {
//cluster type changed should recreate CurrentClusterMeta
return !ObjectUtils.equals(current.getType(), future.getType());
}

private boolean needUpdateClusterRoutesWhenClusterChange(ClusterMeta current, ClusterMeta future) {
ClusterType clusterType = ClusterType.lookup(future.getType());
Expand Down Expand Up @@ -230,16 +225,11 @@ private void handleClusterChanged(ClusterMetaComparator clusterMetaComparator) {
if(currentMeta.hasCluster(clusterDbId)){
ClusterMeta current = clusterMetaComparator.getCurrent();
ClusterMeta future = clusterMetaComparator.getFuture();
if(isReCreateCluster(current, future)) {
destroyCluster(current);
addCluster(clusterDbId);
} else {
currentMeta.changeCluster(clusterMetaComparator);
if(needUpdateClusterRoutesWhenClusterChange(current, future)) {
clusterRoutesChange(clusterDbId);
}
notifyObservers(clusterMetaComparator);
currentMeta.changeCluster(clusterMetaComparator);
if(needUpdateClusterRoutesWhenClusterChange(current, future)) {
clusterRoutesChange(clusterDbId);
}
notifyObservers(clusterMetaComparator);

}else{
logger.warn("[handleClusterChanged][but we do not has it]{}", clusterMetaComparator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.ctrip.xpipe.redis.meta.server.crdt.PeerMasterMetaServerStateChangeHandlerTest;
import com.ctrip.xpipe.redis.meta.server.crdt.master.command.CurrentMasterChooseCommandTest;
import com.ctrip.xpipe.redis.meta.server.crdt.master.command.PeerMasterChooseCommandTest;
import com.ctrip.xpipe.redis.meta.server.crdt.master.command.RedundantMasterClearCommandTest;
import com.ctrip.xpipe.redis.meta.server.crdt.master.impl.*;
import com.ctrip.xpipe.redis.meta.server.crdt.replication.impl.DefaultPeerMasterStateAdjusterTest;
import com.ctrip.xpipe.redis.meta.server.crdt.replication.impl.DefaultPeerMasterStateManagerTest;
Expand Down Expand Up @@ -105,6 +106,7 @@
MasterChooserTest.class,
CurrentMasterChooseCommandTest.class,
PeerMasterChooseCommandTest.class,
RedundantMasterClearCommandTest.class,
DefaultPeerMasterChooseActionTest.class,
MasterChooseCommandFactoryTest.class,
PeerMasterAdjustJobTest.class,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.ctrip.xpipe.redis.meta.server.crdt.master.command;

import com.ctrip.xpipe.redis.meta.server.AbstractMetaServerTest;
import com.ctrip.xpipe.redis.meta.server.meta.CurrentMetaManager;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.stream.Collectors;

import static org.mockito.Mockito.*;

/**
* @author lishanglin
* date 2023/12/22
*/
@RunWith(MockitoJUnitRunner.class)
public class RedundantMasterClearCommandTest extends AbstractMetaServerTest {

@Mock
private CurrentMetaManager currentMetaManager;

@Before
public void setupRedundantMasterClearCommandTest() {
when(currentMetaManager.getUpstreamPeerDcs(getClusterDbId(), getShardDbId()))
.thenReturn(new HashSet<>(Arrays.asList("jq", "oy")));
}

@Test
public void testClearRedundant() throws Exception {
RedundantMasterClearCommand command = new RedundantMasterClearCommand(getClusterDbId(), getShardDbId(),
new HashSet<>(Arrays.asList("jq", "rb")), currentMetaManager);
Assert.assertEquals(Collections.singleton("oy"), command.execute().get());
verify(currentMetaManager, times(1)).removePeerMaster("oy", getClusterDbId(), getShardDbId());
}

@Test
public void testNoRedundant() throws Exception {
RedundantMasterClearCommand command = new RedundantMasterClearCommand(getClusterDbId(), getShardDbId(),
new HashSet<>(Arrays.asList("jq", "oy")), currentMetaManager);
Assert.assertEquals(Collections.emptySet(), command.execute().get());
verify(currentMetaManager, never()).removePeerMaster(anyString(), any(), any());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.ctrip.xpipe.redis.meta.server.AbstractMetaServerTest;
import com.ctrip.xpipe.redis.meta.server.crdt.master.MasterChooseCommand;
import com.ctrip.xpipe.redis.meta.server.crdt.master.MasterChooseCommandFactory;
import com.ctrip.xpipe.redis.meta.server.crdt.master.command.RedundantMasterClearCommand;
import com.ctrip.xpipe.redis.meta.server.meta.CurrentMetaManager;
import com.ctrip.xpipe.redis.meta.server.meta.DcMetaCache;
import com.ctrip.xpipe.tuple.Pair;
Expand Down Expand Up @@ -43,6 +44,9 @@ public class MasterChooserTest extends AbstractMetaServerTest {
@Mock
private MasterChooseCommand command;

@Mock
private RedundantMasterClearCommand redundantMasterClearCommand;

@Mock
private KeyedOneThreadTaskExecutor<Pair<Long, Long> > keyedOneThreadTaskExecutor;

Expand All @@ -64,6 +68,7 @@ public void setupDefaultPeerMasterChooserTest() throws Exception {
@Test
public void testPeerMasterChooseWork() {
Mockito.when(factory.buildPeerMasterChooserCommand(Mockito.anyString(), Mockito.anyLong(), Mockito.anyLong())).thenReturn(command);
Mockito.when(factory.buildRedundantMasterClearCommand(Mockito.anyLong(), Mockito.anyLong(), Mockito.anySet())).thenReturn(redundantMasterClearCommand);
Mockito.doAnswer(invocation -> {
Pair<String, String> key = invocation.getArgument(0, Pair.class);
ParallelCommandChain commandChain = invocation.getArgument(1, ParallelCommandChain.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,72 +405,6 @@ public void testUpdateBiDirectionClusterMeta() {
verify(currentMeta, times(1)).addCluster(any());
}

@Test
public void testChangeClusterType() {
currentMetaServerMetaManager = spy(new DefaultCurrentMetaManager());
currentMetaServerMetaManager.setSlotManager(slotManager);
currentMetaServerMetaManager.setDcMetaCache(dcMetaCache);
currentMetaServerMetaManager.setCurrentMeta(currentMeta);
currentMetaServerMetaManager.setCurrentClusterServer(currentClusterServer);
currentMetaServerMetaManager.addObserver(observer);
String clusterName = "cluster1";
Long clusterDbId = 1L;
Long shardDbId = 1L;
Mockito.when(currentClusterServer.hasKey(clusterDbId)).thenReturn(true);

DcMeta currentDcMeta = new DcMeta().setId("jq");
ClusterMeta currentClusterMeta = new ClusterMeta().setType(ClusterType.ONE_WAY.name()).setId(clusterName).setActiveDc("oy").setDbId(clusterDbId);
ShardMeta currentShardMeta = new ShardMeta().setId("cluster1_1").setDbId(shardDbId);
RedisMeta currentMaster = new RedisMeta().setIp("127.0.0.1").setPort(6379);
RedisMeta currentSlave = new RedisMeta().setIp("127.0.0.1").setPort(6380).setMaster("127.0.0.1:6379");
KeeperMeta currentActive = new KeeperMeta().setIp("127.0.0.1").setPort(16379).setActive(true);
KeeperMeta currentKeeper= new KeeperMeta().setIp("127.0.0.1").setPort(16380);
currentShardMeta.addRedis(currentMaster).addRedis(currentSlave).addKeeper(currentActive).addKeeper(currentKeeper);
currentClusterMeta.addShard(currentShardMeta);
currentDcMeta.addCluster(currentClusterMeta);


Mockito.when(dcMetaCache.getClusterMeta(clusterDbId)).thenReturn(currentClusterMeta);

//init
currentMetaServerMetaManager.update(DcMetaComparator.buildClusterChanged(null, currentClusterMeta), null);
doAnswer(invocation -> {
Object node = invocation.getArgument(0, Object.class);
Assert.assertTrue(node instanceof NodeAdded);
return null;
}).when(observer).update(any(), any());
verify(currentMeta, times(1)).addCluster(currentClusterMeta);
verify(observer, times(1)).update(any(), any());

DcMeta futureDcMeta = new DcMeta().setId("jq").addRoute(new RouteMeta().setId(1L));
ClusterMeta futureClusterMeta = new ClusterMeta().setType(ClusterType.BI_DIRECTION.name()).setId(clusterName).setDbId(clusterDbId).setDcs("jq,oy,fq");
ShardMeta futureShardMeta = new ShardMeta().setId("cluster1_1").setDbId(shardDbId);
RedisMeta futureMaster = new RedisMeta().setIp("127.0.0.1").setPort(6379).setGid(1L);
RedisMeta futureSlave = new RedisMeta().setIp("127.0.0.1").setPort(6380).setGid(1L).setMaster("127.0.0.1:6379");
futureShardMeta.addRedis(futureMaster).addRedis(futureSlave);
currentClusterMeta.addShard(futureShardMeta);
futureDcMeta.addCluster(futureClusterMeta);

DcMetaComparator dcMetaComparator = new DcMetaComparator(currentDcMeta, futureDcMeta);
dcMetaComparator.compare();
AtomicBoolean exist = new AtomicBoolean(true);
doAnswer(inv -> {
exist.set(false);
return null;
}).when(currentMeta).removeCluster(clusterDbId);
doAnswer(inv -> exist.get()).when(currentMeta).hasCluster(clusterDbId);
doAnswer(inv -> {
exist.set(true);
return null;
}).when(currentMeta).addCluster(any());

currentMetaServerMetaManager.update(dcMetaComparator, null);
//remove
verify(currentMeta, times(1)).removeCluster(clusterDbId);
//add
verify(currentMeta, times(2)).addCluster(currentClusterMeta);
}

@Test
public void testOneWayClusterDesignatedRouteIdsChanged() {
currentMetaServerMetaManager = spy(new DefaultCurrentMetaManager());
Expand Down

0 comments on commit eb71fa5

Please sign in to comment.