diff --git a/src/main/java/net/spy/memcached/ArcusReplNodeAddress.java b/src/main/java/net/spy/memcached/ArcusReplNodeAddress.java index d3f066583..bdb7eb800 100644 --- a/src/main/java/net/spy/memcached/ArcusReplNodeAddress.java +++ b/src/main/java/net/spy/memcached/ArcusReplNodeAddress.java @@ -109,8 +109,7 @@ static List getAddresses(String s) { return list; } - static Map> makeGroupAddrsList( - List addrs) { + static Map> makeGroupAddrs(List addrs) { Map> newAllGroups = new HashMap<>(); diff --git a/src/main/java/net/spy/memcached/CacheManager.java b/src/main/java/net/spy/memcached/CacheManager.java index cd0afea3d..e0747ff2d 100644 --- a/src/main/java/net/spy/memcached/CacheManager.java +++ b/src/main/java/net/spy/memcached/CacheManager.java @@ -656,7 +656,7 @@ private String getInfo() { /* ENABLE_REPLICATION if */ private List validateReplicaGroup(List socketList) { Map> newAllGroups = - ArcusReplNodeAddress.makeGroupAddrsList(socketList); + ArcusReplNodeAddress.makeGroupAddrs(socketList); // recreate socket list socketList.clear(); diff --git a/src/main/java/net/spy/memcached/MemcachedConnection.java b/src/main/java/net/spy/memcached/MemcachedConnection.java index 6381df7dc..728c4c682 100644 --- a/src/main/java/net/spy/memcached/MemcachedConnection.java +++ b/src/main/java/net/spy/memcached/MemcachedConnection.java @@ -305,7 +305,7 @@ public void handleIO() throws IOException { /* ENABLE_REPLICATION if */ if (arcusReplEnabled) { // Deal with the memcached server group that need delayed switchover. - handleDelayedSwitchover(); + handleDelayedSwitchover(false); } /* ENABLE_REPLICATION end */ @@ -369,79 +369,42 @@ private void updateConnections(List addrs) throws IOException } /* ENABLE_REPLICATION if */ - private Set findChangedGroups(List addrs, - Collection nodes) { - Map addrMap = new HashMap<>(); - for (InetSocketAddress each : addrs) { - addrMap.put(each.toString(), each); - } - - Set changedGroupSet = new HashSet<>(); - for (MemcachedNode node : nodes) { - String nodeAddr = ((InetSocketAddress) node.getSocketAddress()).toString(); - if (addrMap.remove(nodeAddr) == null) { // removed node - changedGroupSet.add(node.getReplicaGroup().getGroupName()); - } - } - for (String addr : addrMap.keySet()) { // newly added node - ArcusReplNodeAddress a = (ArcusReplNodeAddress) addrMap.get(addr); - changedGroupSet.add(a.getGroupName()); - } - return changedGroupSet; - } - - private List findAddrsOfChangedGroups(List addrs, - Set changedGroups) { - List changedGroupAddrs = new ArrayList<>(); - for (InetSocketAddress addr : addrs) { - if (changedGroups.contains(((ArcusReplNodeAddress) addr).getGroupName())) { - changedGroupAddrs.add(addr); - } - } - return changedGroupAddrs; - } - private void updateReplConnections(List addrs) throws IOException { List attachNodes = new ArrayList<>(); List removeNodes = new ArrayList<>(); List changeRoleGroups = new ArrayList<>(); List taskList = new ArrayList<>(); // tasks executed after locator update - /* In replication, after SWITCHOVER or REPL_SLAVE is received from a group - * and switchover is performed, but before the group's znode is changed, - * another group's znode can be changed. - * - * In this case, there is a problem that the switchover is restored - * because the state of the switchover group and the znode state are different. - * - * In order to remove the abnormal phenomenon, - * we find out the changed groups with the comparison of previous and current znode list, - * and update the state of groups based on them. - */ - Set changedGroups = findChangedGroups(addrs, locator.getAll()); + // Create new group list from the provided addresses + Map> newGroups = ArcusReplNodeAddress.makeGroupAddrs(addrs); + // Get the existing groups from the locator + Map oldGroups = + ((ArcusReplKetamaNodeLocator) locator).getAllGroups(); + Set invalidGroups = new HashSet<>(); - Map> newAllGroups = - ArcusReplNodeAddress.makeGroupAddrsList(findAddrsOfChangedGroups(addrs, changedGroups)); + // Cancel the previous delayed switchover case. + handleDelayedSwitchover(true); - // remove invalidated groups in changedGroups - for (Map.Entry> entry : newAllGroups.entrySet()) { + for (Map.Entry> entry : newGroups.entrySet()) { if (!ArcusReplNodeAddress.validateGroup(entry)) { - changedGroups.remove(entry.getKey()); + invalidGroups.add(entry.getKey()); + continue; + } + // Handle newly added groups + if (!oldGroups.containsKey(entry.getKey())) { + for (ArcusReplNodeAddress newAddr : entry.getValue()) { + attachNodes.add(attachMemcachedNode(newAddr)); + } } } - Map oldAllGroups = - ((ArcusReplKetamaNodeLocator) locator).getAllGroups(); - - for (String changedGroupName : changedGroups) { - MemcachedReplicaGroup oldGroup = oldAllGroups.get(changedGroupName); - List newGroupAddrs = newAllGroups.get(changedGroupName); + for (Map.Entry oldGroupEntry : oldGroups.entrySet()) { + String groupName = oldGroupEntry.getKey(); + MemcachedReplicaGroup oldGroup = oldGroupEntry.getValue(); + List newGroupAddrs = newGroups.get(groupName); - if (oldGroup == null) { - // Newly added group - for (ArcusReplNodeAddress newAddr : newGroupAddrs) { - attachNodes.add(attachMemcachedNode(newAddr)); - } + // If group name exists in old groups, invalid case is ignored. + if (invalidGroups.contains(groupName)) { continue; } @@ -453,11 +416,6 @@ private void updateReplConnections(List addrs) throws IOExcep continue; } - if (oldGroup.isDelayedSwitchover()) { - delayedSwitchoverGroups.remove(oldGroup); - switchoverMemcachedReplGroup(oldGroup, true); - } - MemcachedNode oldMasterNode = oldGroup.getMasterNode(); List oldSlaveNodes = oldGroup.getSlaveNodes(); @@ -473,6 +431,10 @@ private void updateReplConnections(List addrs) throws IOExcep Set newSlaveAddrs = getSlaveAddrsFromGroupAddrs(newGroupAddrs); if (oldMasterAddr.isSameAddress(newMasterAddr)) { + if (oldSlaveAddrs.equals(newSlaveAddrs)) { + // No change in the group. + continue; + } // add newly added slave node for (ArcusReplNodeAddress newSlaveAddr : newSlaveAddrs) { if (!oldSlaveAddrs.contains(newSlaveAddr)) { @@ -846,9 +808,9 @@ private void updateAlterConnections(List addrs) throws IOExce /* ENABLE_MIGRATION end */ // Handle the memcached server group that need delayed switchover. - private void handleDelayedSwitchover() { + private void handleDelayedSwitchover(boolean cancelNow) { if (!delayedSwitchoverGroups.isEmpty()) { - delayedSwitchoverGroups.switchover(); + delayedSwitchoverGroups.switchover(cancelNow); } } @@ -1793,7 +1755,7 @@ public long getMinDelayMillis() { 1); } - public void switchover() { + public void switchover(boolean cancelNow) { long now = System.nanoTime(); Iterator> iterator = groups.entrySet().iterator(); while (iterator.hasNext()) { @@ -1801,7 +1763,7 @@ public void switchover() { long switchoverTime = entry.getKey(); MemcachedReplicaGroup group = entry.getValue(); - if (now < switchoverTime) { + if (!cancelNow && now < switchoverTime) { return; } else { iterator.remove();