Skip to content

Commit

Permalink
INTERNAL: Refactoring updateReplConnections.
Browse files Browse the repository at this point in the history
  • Loading branch information
brido4125 committed Feb 24, 2025
1 parent 8c9a6e3 commit d06980d
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 73 deletions.
3 changes: 1 addition & 2 deletions src/main/java/net/spy/memcached/ArcusReplNodeAddress.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ static List<InetSocketAddress> getAddresses(String s) {
return list;
}

static Map<String, List<ArcusReplNodeAddress>> makeGroupAddrsList(
List<InetSocketAddress> addrs) {
static Map<String, List<ArcusReplNodeAddress>> makeGroupAddrs(List<InetSocketAddress> addrs) {

Map<String, List<ArcusReplNodeAddress>> newAllGroups =
new HashMap<>();
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/net/spy/memcached/CacheManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ private String getInfo() {
/* ENABLE_REPLICATION if */
private List<InetSocketAddress> validateReplicaGroup(List<InetSocketAddress> socketList) {
Map<String, List<ArcusReplNodeAddress>> newAllGroups =
ArcusReplNodeAddress.makeGroupAddrsList(socketList);
ArcusReplNodeAddress.makeGroupAddrs(socketList);

// recreate socket list
socketList.clear();
Expand Down
102 changes: 32 additions & 70 deletions src/main/java/net/spy/memcached/MemcachedConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ public void handleIO() throws IOException {
/* ENABLE_REPLICATION if */
if (arcusReplEnabled) {
// Deal with the memcached server group that need delayed switchover.
handleDelayedSwitchover();
handleDelayedSwitchover(false);
}
/* ENABLE_REPLICATION end */

Expand Down Expand Up @@ -369,79 +369,42 @@ private void updateConnections(List<InetSocketAddress> addrs) throws IOException
}

/* ENABLE_REPLICATION if */
private Set<String> findChangedGroups(List<InetSocketAddress> addrs,
Collection<MemcachedNode> nodes) {
Map<String, InetSocketAddress> addrMap = new HashMap<>();
for (InetSocketAddress each : addrs) {
addrMap.put(each.toString(), each);
}

Set<String> changedGroupSet = new HashSet<>();
for (MemcachedNode node : nodes) {
String nodeAddr = ((InetSocketAddress) node.getSocketAddress()).toString();
if (addrMap.remove(nodeAddr) == null) { // removed node
changedGroupSet.add(node.getReplicaGroup().getGroupName());
}
}
for (String addr : addrMap.keySet()) { // newly added node
ArcusReplNodeAddress a = (ArcusReplNodeAddress) addrMap.get(addr);
changedGroupSet.add(a.getGroupName());
}
return changedGroupSet;
}

private List<InetSocketAddress> findAddrsOfChangedGroups(List<InetSocketAddress> addrs,
Set<String> changedGroups) {
List<InetSocketAddress> changedGroupAddrs = new ArrayList<>();
for (InetSocketAddress addr : addrs) {
if (changedGroups.contains(((ArcusReplNodeAddress) addr).getGroupName())) {
changedGroupAddrs.add(addr);
}
}
return changedGroupAddrs;
}

private void updateReplConnections(List<InetSocketAddress> addrs) throws IOException {
List<MemcachedNode> attachNodes = new ArrayList<>();
List<MemcachedNode> removeNodes = new ArrayList<>();
List<MemcachedReplicaGroup> changeRoleGroups = new ArrayList<>();
List<Task> taskList = new ArrayList<>(); // tasks executed after locator update

/* In replication, after SWITCHOVER or REPL_SLAVE is received from a group
* and switchover is performed, but before the group's znode is changed,
* another group's znode can be changed.
*
* In this case, there is a problem that the switchover is restored
* because the state of the switchover group and the znode state are different.
*
* In order to remove the abnormal phenomenon,
* we find out the changed groups with the comparison of previous and current znode list,
* and update the state of groups based on them.
*/
Set<String> changedGroups = findChangedGroups(addrs, locator.getAll());
// Create new group list from the provided addresses
Map<String, List<ArcusReplNodeAddress>> newGroups = ArcusReplNodeAddress.makeGroupAddrs(addrs);
// Get the existing groups from the locator
Map<String, MemcachedReplicaGroup> oldGroups =
((ArcusReplKetamaNodeLocator) locator).getAllGroups();
Set<String> invalidGroups = new HashSet<>();

Map<String, List<ArcusReplNodeAddress>> newAllGroups =
ArcusReplNodeAddress.makeGroupAddrsList(findAddrsOfChangedGroups(addrs, changedGroups));
// Cancel the previous delayed switchover case.
handleDelayedSwitchover(true);

// remove invalidated groups in changedGroups
for (Map.Entry<String, List<ArcusReplNodeAddress>> entry : newAllGroups.entrySet()) {
for (Map.Entry<String, List<ArcusReplNodeAddress>> entry : newGroups.entrySet()) {
if (!ArcusReplNodeAddress.validateGroup(entry)) {
changedGroups.remove(entry.getKey());
invalidGroups.add(entry.getKey());
continue;
}
// Handle newly added groups
if (!oldGroups.containsKey(entry.getKey())) {
for (ArcusReplNodeAddress newAddr : entry.getValue()) {
attachNodes.add(attachMemcachedNode(newAddr));
}
}
}

Map<String, MemcachedReplicaGroup> oldAllGroups =
((ArcusReplKetamaNodeLocator) locator).getAllGroups();

for (String changedGroupName : changedGroups) {
MemcachedReplicaGroup oldGroup = oldAllGroups.get(changedGroupName);
List<ArcusReplNodeAddress> newGroupAddrs = newAllGroups.get(changedGroupName);
for (Map.Entry<String, MemcachedReplicaGroup> oldGroupEntry : oldGroups.entrySet()) {
String groupName = oldGroupEntry.getKey();
MemcachedReplicaGroup oldGroup = oldGroupEntry.getValue();
List<ArcusReplNodeAddress> newGroupAddrs = newGroups.get(groupName);

if (oldGroup == null) {
// Newly added group
for (ArcusReplNodeAddress newAddr : newGroupAddrs) {
attachNodes.add(attachMemcachedNode(newAddr));
}
// If group name exists in old groups, invalid case is ignored.
if (invalidGroups.contains(groupName)) {
continue;
}

Expand All @@ -453,11 +416,6 @@ private void updateReplConnections(List<InetSocketAddress> addrs) throws IOExcep
continue;
}

if (oldGroup.isDelayedSwitchover()) {
delayedSwitchoverGroups.remove(oldGroup);
switchoverMemcachedReplGroup(oldGroup, true);
}

MemcachedNode oldMasterNode = oldGroup.getMasterNode();
List<MemcachedNode> oldSlaveNodes = oldGroup.getSlaveNodes();

Expand All @@ -473,6 +431,10 @@ private void updateReplConnections(List<InetSocketAddress> addrs) throws IOExcep
Set<ArcusReplNodeAddress> newSlaveAddrs = getSlaveAddrsFromGroupAddrs(newGroupAddrs);

if (oldMasterAddr.isSameAddress(newMasterAddr)) {
if (oldSlaveAddrs.equals(newSlaveAddrs)) {
// No change in the group.
continue;
}
// add newly added slave node
for (ArcusReplNodeAddress newSlaveAddr : newSlaveAddrs) {
if (!oldSlaveAddrs.contains(newSlaveAddr)) {
Expand Down Expand Up @@ -846,9 +808,9 @@ private void updateAlterConnections(List<InetSocketAddress> addrs) throws IOExce
/* ENABLE_MIGRATION end */

// Handle the memcached server group that need delayed switchover.
private void handleDelayedSwitchover() {
private void handleDelayedSwitchover(boolean cancelNow) {
if (!delayedSwitchoverGroups.isEmpty()) {
delayedSwitchoverGroups.switchover();
delayedSwitchoverGroups.switchover(cancelNow);
}
}

Expand Down Expand Up @@ -1793,15 +1755,15 @@ public long getMinDelayMillis() {
1);
}

public void switchover() {
public void switchover(boolean cancelNow) {
long now = System.nanoTime();
Iterator<Entry<Long, MemcachedReplicaGroup>> iterator = groups.entrySet().iterator();
while (iterator.hasNext()) {
Entry<Long, MemcachedReplicaGroup> entry = iterator.next();
long switchoverTime = entry.getKey();
MemcachedReplicaGroup group = entry.getValue();

if (now < switchoverTime) {
if (!cancelNow && now < switchoverTime) {
return;
} else {
iterator.remove();
Expand Down

0 comments on commit d06980d

Please sign in to comment.