From e4eb9f02ae86ba35f5fc923d16a9c21b61e32264 Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Mon, 4 Sep 2023 05:23:54 +0530 Subject: [PATCH 01/18] PrimaryShardAllocator refactor to abstract out shard state and method calls Signed-off-by: Shivansh Arora --- CHANGELOG.md | 1 + .../gateway/BaseGatewayShardAllocator.java | 87 +++++++++- .../gateway/PrimaryShardAllocator.java | 157 +++++++++++------- 3 files changed, 178 insertions(+), 67 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 80721b0f1b3e2..ef9de19f9b04b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -152,6 +152,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add support for wrapping CollectorManager with profiling during concurrent execution ([#9129](https://github.com/opensearch-project/OpenSearch/pull/9129)) - Rethrow OpenSearch exception for non-concurrent path while using concurrent search ([#9177](https://github.com/opensearch-project/OpenSearch/pull/9177)) - Improve performance of encoding composite keys in multi-term aggregations ([#9412](https://github.com/opensearch-project/OpenSearch/pull/9412)) +- PrimaryShardAllocator refactor to abstract out shard state and method calls ([#9760](https://github.com/opensearch-project/OpenSearch/pull/9760)) ### Deprecated diff --git a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java index 59ef894958cbe..1b943db825b4f 100644 --- a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java @@ -34,8 +34,10 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingNodes; import 
org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.AllocationDecision; @@ -43,14 +45,22 @@ import org.opensearch.cluster.routing.allocation.NodeAllocationResult; import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.cluster.routing.allocation.decider.Decision; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; /** * An abstract class that implements basic functionality for allocating * shards to nodes based on shard copies that already exist in the cluster. - * + *

* Individual implementations of this class are responsible for providing * the logic to determine to which nodes (if any) those shards are allocated. * @@ -64,8 +74,9 @@ public abstract class BaseGatewayShardAllocator { * Allocate an unassigned shard to nodes (if any) where valid copies of the shard already exist. * It is up to the individual implementations of {@link #makeAllocationDecision(ShardRouting, RoutingAllocation, Logger)} * to make decisions on assigning shards to nodes. - * @param shardRouting the shard to allocate - * @param allocation the allocation state container object + * + * @param shardRouting the shard to allocate + * @param allocation the allocation state container object * @param unassignedAllocationHandler handles the allocation of the current shard */ public void allocateUnassigned( @@ -109,9 +120,9 @@ protected long getExpectedShardSize(ShardRouting shardRouting, RoutingAllocation * {@link #allocateUnassigned(ShardRouting, RoutingAllocation, ExistingShardsAllocator.UnassignedAllocationHandler)} to make decisions * about whether or not the shard can be allocated by this allocator and if so, to which node it will be allocated. 
* - * @param unassignedShard the unassigned shard to allocate - * @param allocation the current routing state - * @param logger the logger + * @param unassignedShard the unassigned shard to allocate + * @param allocation the current routing state + * @param logger the logger * @return an {@link AllocateUnassignedDecision} with the final decision of whether to allocate and details of the decision */ public abstract AllocateUnassignedDecision makeAllocationDecision( @@ -132,4 +143,68 @@ protected static List buildDecisionsForAllNodes(ShardRouti } return results; } + + protected static class NodeShardState { + private final String allocationId; + private final boolean primary; + private final Exception storeException; + private final ReplicationCheckpoint replicationCheckpoint; + private final DiscoveryNode node; + + public NodeShardState(DiscoveryNode node, + String allocationId, + boolean primary, + ReplicationCheckpoint replicationCheckpoint, + Exception storeException) { + this.node = node; + this.allocationId = allocationId; + this.primary = primary; + this.replicationCheckpoint = replicationCheckpoint; + this.storeException = storeException; + } + + public String allocationId() { + return this.allocationId; + } + + public boolean primary() { + return this.primary; + } + + public ReplicationCheckpoint replicationCheckpoint() { + return this.replicationCheckpoint; + } + + public Exception storeException() { + return this.storeException; + } + + public DiscoveryNode getNode() { + return this.node; + } + } + + protected static class NodeShardStates { + TreeMap nodeShardStates; + + public NodeShardStates(Comparator comparator) { + this.nodeShardStates = new TreeMap<>(comparator); + } + + public void add(NodeShardState key, DiscoveryNode value) { + this.nodeShardStates.put(key, value); + } + + public DiscoveryNode get(NodeShardState key) { + return this.nodeShardStates.get(key); + } + + public int size() { + return this.nodeShardStates.size(); + } + + public Iterator 
iterator() { + return this.nodeShardStates.keySet().iterator(); + } + } } diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java index 4dc9396751fc9..7a8472a4fd28b 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java @@ -57,6 +57,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -81,7 +82,7 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator { /** * Is the allocator responsible for allocating the given {@link ShardRouting}? */ - private static boolean isResponsibleFor(final ShardRouting shard) { + protected static boolean isResponsibleFor(final ShardRouting shard) { return shard.primary() // must be primary && shard.unassigned() // must be unassigned // only handle either an existing store or a snapshot recovery @@ -89,19 +90,20 @@ private static boolean isResponsibleFor(final ShardRouting shard) { || shard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT); } - @Override - public AllocateUnassignedDecision makeAllocationDecision( - final ShardRouting unassignedShard, - final RoutingAllocation allocation, - final Logger logger - ) { + /** + * Skip doing fetchData call for a shard if recovery mode is snapshot. Also do not take decision if allocator is + * not responsible for this particular shard. 
+ * + * @param unassignedShard unassigned shard routing + * @param allocation routing allocation object + * @return allocation decision taken for this shard + */ + protected AllocateUnassignedDecision getInEligibleShardDecision(ShardRouting unassignedShard, RoutingAllocation allocation) { if (isResponsibleFor(unassignedShard) == false) { // this allocator is not responsible for allocating this shard return AllocateUnassignedDecision.NOT_TAKEN; } - final boolean explain = allocation.debugDecision(); - if (unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT && allocation.snapshotShardSizeInfo().getShardSize(unassignedShard) == null) { List nodeDecisions = null; @@ -110,17 +112,43 @@ public AllocateUnassignedDecision makeAllocationDecision( } return AllocateUnassignedDecision.no(UnassignedInfo.AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions); } + return null; + } + @Override + public AllocateUnassignedDecision makeAllocationDecision( + final ShardRouting unassignedShard, + final RoutingAllocation allocation, + final Logger logger + ) { + AllocateUnassignedDecision decision = getInEligibleShardDecision(unassignedShard, allocation); + if (decision != null) { + return decision; + } final FetchResult shardState = fetchData(unassignedShard, allocation); if (shardState.hasData() == false) { allocation.setHasPendingAsyncFetch(); List nodeDecisions = null; - if (explain) { + if (allocation.debugDecision()) { nodeDecisions = buildDecisionsForAllNodes(unassignedShard, allocation); } return AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions); } + NodeShardStates nodeShardStates = getNodeShardStates(shardState); + return getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger); + } + private static NodeShardStates getNodeShardStates(FetchResult shardsState) { + NodeShardStates nodeShardStates = new NodeShardStates((o1, o2) -> 1); + shardsState.getData().forEach((node, nodeGatewayStartedShard) -> { + 
nodeShardStates.add(new NodeShardState(node, nodeGatewayStartedShard.allocationId(), nodeGatewayStartedShard.primary(), nodeGatewayStartedShard.replicationCheckpoint(), nodeGatewayStartedShard.storeException()), node); + }); + return nodeShardStates; + } + + protected AllocateUnassignedDecision getAllocationDecision(ShardRouting unassignedShard, RoutingAllocation allocation, + NodeShardStates shardState, Logger logger) { + final boolean explain = allocation.debugDecision(); // don't create a new IndexSetting object for every shard as this could cause a lot of garbage // on cluster restart if we allocate a boat load of shards final IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(unassignedShard.index()); @@ -185,22 +213,23 @@ public AllocateUnassignedDecision makeAllocationDecision( boolean throttled = false; if (nodesToAllocate.yesNodeShards.isEmpty() == false) { DecidedNode decidedNode = nodesToAllocate.yesNodeShards.get(0); + NodeShardState nodeShardState = decidedNode.nodeShardState; logger.debug( "[{}][{}]: allocating [{}] to [{}] on primary allocation", unassignedShard.index(), unassignedShard.id(), unassignedShard, - decidedNode.nodeShardState.getNode() + nodeShardState.getNode() ); - node = decidedNode.nodeShardState.getNode(); - allocationId = decidedNode.nodeShardState.allocationId(); + node = nodeShardState.getNode(); + allocationId = nodeShardState.allocationId(); } else if (nodesToAllocate.throttleNodeShards.isEmpty() && !nodesToAllocate.noNodeShards.isEmpty()) { // The deciders returned a NO decision for all nodes with shard copies, so we check if primary shard // can be force-allocated to one of the nodes. 
nodesToAllocate = buildNodesToAllocate(allocation, nodeShardsResult.orderedAllocationCandidates, unassignedShard, true); if (nodesToAllocate.yesNodeShards.isEmpty() == false) { final DecidedNode decidedNode = nodesToAllocate.yesNodeShards.get(0); - final NodeGatewayStartedShards nodeShardState = decidedNode.nodeShardState; + final NodeShardState nodeShardState = decidedNode.nodeShardState; logger.debug( "[{}][{}]: allocating [{}] to [{}] on forced primary allocation", unassignedShard.index(), @@ -260,11 +289,11 @@ public AllocateUnassignedDecision makeAllocationDecision( */ private static List buildNodeDecisions( NodesToAllocate nodesToAllocate, - FetchResult fetchedShardData, + NodeShardStates fetchedShardData, Set inSyncAllocationIds ) { List nodeResults = new ArrayList<>(); - Collection ineligibleShards; + Collection ineligibleShards = new ArrayList<>(); if (nodesToAllocate != null) { final Set discoNodes = new HashSet<>(); nodeResults.addAll( @@ -280,15 +309,15 @@ private static List buildNodeDecisions( }) .collect(Collectors.toList()) ); - ineligibleShards = fetchedShardData.getData() - .values() - .stream() - .filter(shardData -> discoNodes.contains(shardData.getNode()) == false) - .collect(Collectors.toList()); + fetchedShardData.iterator().forEachRemaining(shardData -> { + if (discoNodes.contains(shardData.getNode()) == false) { + ineligibleShards.add(shardData); + } + }); } else { // there were no shard copies that were eligible for being assigned the allocation, // so all fetched shard data are ineligible shards - ineligibleShards = fetchedShardData.getData().values(); + fetchedShardData.iterator().forEachRemaining(ineligibleShards::add); } nodeResults.addAll( @@ -300,21 +329,21 @@ private static List buildNodeDecisions( return nodeResults; } - private static ShardStoreInfo shardStoreInfo(NodeGatewayStartedShards nodeShardState, Set inSyncAllocationIds) { + protected static ShardStoreInfo shardStoreInfo(NodeShardState nodeShardState, Set 
inSyncAllocationIds) { final Exception storeErr = nodeShardState.storeException(); final boolean inSync = nodeShardState.allocationId() != null && inSyncAllocationIds.contains(nodeShardState.allocationId()); return new ShardStoreInfo(nodeShardState.allocationId(), inSync, storeErr); } - private static final Comparator NO_STORE_EXCEPTION_FIRST_COMPARATOR = Comparator.comparing( - (NodeGatewayStartedShards state) -> state.storeException() == null + private static final Comparator NO_STORE_EXCEPTION_FIRST_COMPARATOR = Comparator.comparing( + (NodeShardState state) -> state.storeException() == null ).reversed(); - private static final Comparator PRIMARY_FIRST_COMPARATOR = Comparator.comparing( - NodeGatewayStartedShards::primary + private static final Comparator PRIMARY_FIRST_COMPARATOR = Comparator.comparing( + NodeShardState::primary ).reversed(); - private static final Comparator HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR = Comparator.comparing( - NodeGatewayStartedShards::replicationCheckpoint, + private static final Comparator HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR = Comparator.comparing( + NodeShardState::replicationCheckpoint, Comparator.nullsLast(Comparator.naturalOrder()) ); @@ -323,17 +352,19 @@ private static ShardStoreInfo shardStoreInfo(NodeGatewayStartedShards nodeShardS * inSyncAllocationIds are added to the list. Otherwise, any node that has a shard is added to the list, but * entries with matching allocation id are always at the front of the list. 
*/ - protected static NodeShardsResult buildNodeShardsResult( + protected NodeShardsResult buildNodeShardsResult( ShardRouting shard, boolean matchAnyShard, Set ignoreNodes, Set inSyncAllocationIds, - FetchResult shardState, + NodeShardStates shardState, Logger logger ) { - List nodeShardStates = new ArrayList<>(); + NodeShardStates nodeShardStates = new NodeShardStates(getComparator(matchAnyShard, inSyncAllocationIds)); int numberOfAllocationsFound = 0; - for (NodeGatewayStartedShards nodeShardState : shardState.getData().values()) { + Iterator iterator = shardState.iterator(); + while (iterator.hasNext()) { + NodeShardState nodeShardState = iterator.next(); DiscoveryNode node = nodeShardState.getNode(); String allocationId = nodeShardState.allocationId(); @@ -381,22 +412,33 @@ protected static NodeShardsResult buildNodeShardsResult( + nodeShardState.storeException(); numberOfAllocationsFound++; if (matchAnyShard || inSyncAllocationIds.contains(nodeShardState.allocationId())) { - nodeShardStates.add(nodeShardState); + nodeShardStates.add(nodeShardState, nodeShardState.getNode()); } } } + if (logger.isTraceEnabled()) { + logger.trace( + "{} candidates for allocation: {}", + shard, + nodeShardStates.nodeShardStates.values().stream().map(DiscoveryNode::getName).collect(Collectors.joining(", ")) + ); + } + return new NodeShardsResult(nodeShardStates, numberOfAllocationsFound); + } + + protected static Comparator getComparator(boolean matchAnyShard, Set inSyncAllocationIds) { /** * Orders the active shards copies based on below comparators * 1. No store exception i.e. shard copy is readable * 2. Prefer previous primary shard * 3. Prefer shard copy with the highest replication checkpoint. It is NO-OP for doc rep enabled indices. 
*/ - final Comparator comparator; // allocation preference + final Comparator comparator; // allocation preference if (matchAnyShard) { // prefer shards with matching allocation ids - Comparator matchingAllocationsFirst = Comparator.comparing( - (NodeGatewayStartedShards state) -> inSyncAllocationIds.contains(state.allocationId()) + Comparator matchingAllocationsFirst = Comparator.comparing( + (NodeShardState state) -> inSyncAllocationIds.contains(state.allocationId()) ).reversed(); comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR) .thenComparing(PRIMARY_FIRST_COMPARATOR) @@ -406,31 +448,24 @@ protected static NodeShardsResult buildNodeShardsResult( .thenComparing(HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR); } - nodeShardStates.sort(comparator); - - if (logger.isTraceEnabled()) { - logger.trace( - "{} candidates for allocation: {}", - shard, - nodeShardStates.stream().map(s -> s.getNode().getName()).collect(Collectors.joining(", ")) - ); - } - return new NodeShardsResult(nodeShardStates, numberOfAllocationsFound); + return comparator; } /** * Split the list of node shard states into groups yes/no/throttle based on allocation deciders */ - private static NodesToAllocate buildNodesToAllocate( + protected static NodesToAllocate buildNodesToAllocate( RoutingAllocation allocation, - List nodeShardStates, + NodeShardStates nodeShardStates, ShardRouting shardRouting, boolean forceAllocate ) { List yesNodeShards = new ArrayList<>(); List throttledNodeShards = new ArrayList<>(); List noNodeShards = new ArrayList<>(); - for (NodeGatewayStartedShards nodeShardState : nodeShardStates) { + Iterator iterator = nodeShardStates.iterator(); + while (iterator.hasNext()) { + NodeShardState nodeShardState = iterator.next(); RoutingNode node = allocation.routingNodes().node(nodeShardState.getNode().getId()); if (node == null) { continue; @@ -457,22 +492,22 @@ private static NodesToAllocate buildNodesToAllocate( protected abstract FetchResult 
fetchData(ShardRouting shard, RoutingAllocation allocation); - private static class NodeShardsResult { - final List orderedAllocationCandidates; + protected static class NodeShardsResult { + final NodeShardStates orderedAllocationCandidates; final int allocationsFound; - NodeShardsResult(List orderedAllocationCandidates, int allocationsFound) { + NodeShardsResult(NodeShardStates orderedAllocationCandidates, int allocationsFound) { this.orderedAllocationCandidates = orderedAllocationCandidates; this.allocationsFound = allocationsFound; } } - static class NodesToAllocate { - final List yesNodeShards; - final List throttleNodeShards; - final List noNodeShards; + protected static class NodesToAllocate { + final List yesNodeShards; + final List throttleNodeShards; + final List noNodeShards; - NodesToAllocate(List yesNodeShards, List throttleNodeShards, List noNodeShards) { + NodesToAllocate(List yesNodeShards, List throttleNodeShards, List noNodeShards) { this.yesNodeShards = yesNodeShards; this.throttleNodeShards = throttleNodeShards; this.noNodeShards = noNodeShards; @@ -483,11 +518,11 @@ static class NodesToAllocate { * This class encapsulates the shard state retrieved from a node and the decision that was made * by the allocator for allocating to the node that holds the shard copy. 
*/ - private static class DecidedNode { - final NodeGatewayStartedShards nodeShardState; + protected static class DecidedNode { + final NodeShardState nodeShardState; final Decision decision; - private DecidedNode(NodeGatewayStartedShards nodeShardState, Decision decision) { + protected DecidedNode(NodeShardState nodeShardState, Decision decision) { this.nodeShardState = nodeShardState; this.decision = decision; } From f82990773c93f1aa98dc2c118a83487ce625a696 Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Thu, 7 Sep 2023 04:15:14 +0530 Subject: [PATCH 02/18] Spotless changes Signed-off-by: Shivansh Arora --- .../gateway/BaseGatewayShardAllocator.java | 17 +++++------ .../gateway/PrimaryShardAllocator.java | 29 ++++++++++++++----- 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java index 1b943db825b4f..161cebf4eda91 100644 --- a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java @@ -37,7 +37,6 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.RoutingNode; -import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.AllocationDecision; @@ -49,13 +48,9 @@ import java.util.ArrayList; import java.util.Comparator; -import java.util.HashMap; import java.util.Iterator; import java.util.List; -import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.ConcurrentMap; -import java.util.stream.Collectors; /** * An abstract class that implements basic functionality for allocating @@ -151,11 +146,13 @@ protected static class 
NodeShardState { private final ReplicationCheckpoint replicationCheckpoint; private final DiscoveryNode node; - public NodeShardState(DiscoveryNode node, - String allocationId, - boolean primary, - ReplicationCheckpoint replicationCheckpoint, - Exception storeException) { + public NodeShardState( + DiscoveryNode node, + String allocationId, + boolean primary, + ReplicationCheckpoint replicationCheckpoint, + Exception storeException + ) { this.node = node; this.allocationId = allocationId; this.primary = primary; diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java index 7a8472a4fd28b..3b4c0749b9049 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java @@ -141,13 +141,26 @@ public AllocateUnassignedDecision makeAllocationDecision( private static NodeShardStates getNodeShardStates(FetchResult shardsState) { NodeShardStates nodeShardStates = new NodeShardStates((o1, o2) -> 1); shardsState.getData().forEach((node, nodeGatewayStartedShard) -> { - nodeShardStates.add(new NodeShardState(node, nodeGatewayStartedShard.allocationId(), nodeGatewayStartedShard.primary(), nodeGatewayStartedShard.replicationCheckpoint(), nodeGatewayStartedShard.storeException()), node); + nodeShardStates.add( + new NodeShardState( + node, + nodeGatewayStartedShard.allocationId(), + nodeGatewayStartedShard.primary(), + nodeGatewayStartedShard.replicationCheckpoint(), + nodeGatewayStartedShard.storeException() + ), + node + ); }); return nodeShardStates; } - protected AllocateUnassignedDecision getAllocationDecision(ShardRouting unassignedShard, RoutingAllocation allocation, - NodeShardStates shardState, Logger logger) { + protected AllocateUnassignedDecision getAllocationDecision( + ShardRouting unassignedShard, + RoutingAllocation allocation, + NodeShardStates shardState, + Logger logger 
+ ) { final boolean explain = allocation.debugDecision(); // don't create a new IndexSetting object for every shard as this could cause a lot of garbage // on cluster restart if we allocate a boat load of shards @@ -338,9 +351,7 @@ protected static ShardStoreInfo shardStoreInfo(NodeShardState nodeShardState, Se private static final Comparator NO_STORE_EXCEPTION_FIRST_COMPARATOR = Comparator.comparing( (NodeShardState state) -> state.storeException() == null ).reversed(); - private static final Comparator PRIMARY_FIRST_COMPARATOR = Comparator.comparing( - NodeShardState::primary - ).reversed(); + private static final Comparator PRIMARY_FIRST_COMPARATOR = Comparator.comparing(NodeShardState::primary).reversed(); private static final Comparator HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR = Comparator.comparing( NodeShardState::replicationCheckpoint, @@ -507,7 +518,11 @@ protected static class NodesToAllocate { final List throttleNodeShards; final List noNodeShards; - NodesToAllocate(List yesNodeShards, List throttleNodeShards, List noNodeShards) { + NodesToAllocate( + List yesNodeShards, + List throttleNodeShards, + List noNodeShards + ) { this.yesNodeShards = yesNodeShards; this.throttleNodeShards = throttleNodeShards; this.noNodeShards = noNodeShards; From a54547ff9df1a042930321e5fb78481437b30900 Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Mon, 11 Sep 2023 00:29:27 +0530 Subject: [PATCH 03/18] moved NodeShardState and NodeShardStates to PSA Signed-off-by: Shivansh Arora --- .../gateway/BaseGatewayShardAllocator.java | 71 -------- .../gateway/PrimaryShardAllocator.java | 161 +++++++++++++++++- 2 files changed, 156 insertions(+), 76 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java index 161cebf4eda91..c4d5b89513471 100644 --- a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java +++ 
b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java @@ -34,7 +34,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.ShardRouting; @@ -44,13 +43,9 @@ import org.opensearch.cluster.routing.allocation.NodeAllocationResult; import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.cluster.routing.allocation.decider.Decision; -import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import java.util.ArrayList; -import java.util.Comparator; -import java.util.Iterator; import java.util.List; -import java.util.TreeMap; /** * An abstract class that implements basic functionality for allocating @@ -138,70 +133,4 @@ protected static List buildDecisionsForAllNodes(ShardRouti } return results; } - - protected static class NodeShardState { - private final String allocationId; - private final boolean primary; - private final Exception storeException; - private final ReplicationCheckpoint replicationCheckpoint; - private final DiscoveryNode node; - - public NodeShardState( - DiscoveryNode node, - String allocationId, - boolean primary, - ReplicationCheckpoint replicationCheckpoint, - Exception storeException - ) { - this.node = node; - this.allocationId = allocationId; - this.primary = primary; - this.replicationCheckpoint = replicationCheckpoint; - this.storeException = storeException; - } - - public String allocationId() { - return this.allocationId; - } - - public boolean primary() { - return this.primary; - } - - public ReplicationCheckpoint replicationCheckpoint() { - return this.replicationCheckpoint; - } - - public Exception storeException() { - return this.storeException; - } - - public DiscoveryNode getNode() { - return this.node; - } - } - - protected static class 
NodeShardStates { - TreeMap nodeShardStates; - - public NodeShardStates(Comparator comparator) { - this.nodeShardStates = new TreeMap<>(comparator); - } - - public void add(NodeShardState key, DiscoveryNode value) { - this.nodeShardStates.put(key, value); - } - - public DiscoveryNode get(NodeShardState key) { - return this.nodeShardStates.get(key); - } - - public int size() { - return this.nodeShardStates.size(); - } - - public Iterator iterator() { - return this.nodeShardStates.keySet().iterator(); - } - } } diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java index 3b4c0749b9049..37fdee2e4e8c5 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java @@ -51,6 +51,7 @@ import org.opensearch.env.ShardLockObtainFailedException; import org.opensearch.gateway.AsyncShardFetch.FetchResult; import org.opensearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import java.util.ArrayList; import java.util.Collection; @@ -60,6 +61,7 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.TreeMap; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -134,11 +136,11 @@ public AllocateUnassignedDecision makeAllocationDecision( } return AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions); } - NodeShardStates nodeShardStates = getNodeShardStates(shardState); + NodeShardStates nodeShardStates = adaptToNodeShardStates(shardState); return getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger); } - private static NodeShardStates getNodeShardStates(FetchResult shardsState) { + private static NodeShardStates adaptToNodeShardStates(FetchResult shardsState) { NodeShardStates 
nodeShardStates = new NodeShardStates((o1, o2) -> 1); shardsState.getData().forEach((node, nodeGatewayStartedShard) -> { nodeShardStates.add( @@ -371,7 +373,7 @@ protected NodeShardsResult buildNodeShardsResult( NodeShardStates shardState, Logger logger ) { - NodeShardStates nodeShardStates = new NodeShardStates(getComparator(matchAnyShard, inSyncAllocationIds)); + NodeShardStates nodeShardStates = new NodeShardStates(createActiveShardComparator(matchAnyShard, inSyncAllocationIds)); int numberOfAllocationsFound = 0; Iterator iterator = shardState.iterator(); while (iterator.hasNext()) { @@ -432,13 +434,13 @@ protected NodeShardsResult buildNodeShardsResult( logger.trace( "{} candidates for allocation: {}", shard, - nodeShardStates.nodeShardStates.values().stream().map(DiscoveryNode::getName).collect(Collectors.joining(", ")) + nodeShardStates.stream().map(nodeShardStates::get).map(DiscoveryNode::getName).collect(Collectors.joining(", ")) ); } return new NodeShardsResult(nodeShardStates, numberOfAllocationsFound); } - protected static Comparator getComparator(boolean matchAnyShard, Set inSyncAllocationIds) { + protected static Comparator createActiveShardComparator(boolean matchAnyShard, Set inSyncAllocationIds) { /** * Orders the active shards copies based on below comparators * 1. No store exception i.e. shard copy is readable @@ -542,4 +544,153 @@ protected DecidedNode(NodeShardState nodeShardState, Decision decision) { this.decision = decision; } } + + /** + * The NodeShardState class represents the state of a node shard in a distributed system. + * It includes several key data points about the shard state, such as its allocation ID, + * whether it's a primary shard, any store exception, the replication checkpoint, and the + * DiscoveryNode it belongs to. + *

+ * This class is designed to be used in conjunction with the {@link NodeShardStates} class, which + * manages multiple NodeShardState instances. + */ + protected static class NodeShardState { + // Allocation ID of the shard + private final String allocationId; + // Whether the shard is primary + private final boolean primary; + // Any store exception associated with the shard + private final Exception storeException; + // The replication checkpoint of the shard + private final ReplicationCheckpoint replicationCheckpoint; + // The DiscoveryNode the shard belongs to + private final DiscoveryNode node; + + /** + * Constructs a new NodeShardState with the given parameters. + * @param node The DiscoveryNode the shard belongs to. + * @param allocationId The allocation ID of the shard. + * @param primary Whether the shard is a primary shard. + * @param replicationCheckpoint The replication checkpoint of the shard. + * @param storeException Any store exception associated with the shard. + */ + public NodeShardState( + DiscoveryNode node, + String allocationId, + boolean primary, + ReplicationCheckpoint replicationCheckpoint, + Exception storeException + ) { + this.node = node; + this.allocationId = allocationId; + this.primary = primary; + this.replicationCheckpoint = replicationCheckpoint; + this.storeException = storeException; + } + + /** + * Returns the allocation ID of the shard. + * @return The allocation ID of the shard. + */ + public String allocationId() { + return this.allocationId; + } + + /** + * Returns whether the shard is a primary shard. + * @return True if the shard is a primary shard, false otherwise. + */ + public boolean primary() { + return this.primary; + } + + /** + * Returns the replication checkpoint of the shard. + * @return The replication checkpoint of the shard. + */ + public ReplicationCheckpoint replicationCheckpoint() { + return this.replicationCheckpoint; + } + + /** + * Returns any store exception associated with the shard. 
+ * @return The store exception associated with the shard, or null if there isn't one. + */ + public Exception storeException() { + return this.storeException; + } + + /** + * Returns the DiscoveryNode the shard belongs to. + * @return The DiscoveryNode the shard belongs to. + */ + public DiscoveryNode getNode() { + return this.node; + } + } + + /** + * The NodeShardStates class manages pairs of {@link NodeShardState} and {@link DiscoveryNode}. + * It uses a TreeMap to ensure that the entries are sorted based on the natural + * ordering of the {@link NodeShardState} keys, or according to a provided Comparator. + *

+ * The TreeMap is implemented using a Red-Black tree, which provides efficient + * performance for common operations such as adding, removing, and retrieving + * elements. + * @see TreeMap + */ + protected static class NodeShardStates { + // TreeMap to store NodeShardState and DiscoveryNode pairs + private final TreeMap nodeShardStates; + + /** + * Constructs a new NodeShardStates with a given Comparator. + * @param comparator Comparator to determine the order of the TreeMap. + */ + public NodeShardStates(Comparator comparator) { + this.nodeShardStates = new TreeMap<>(comparator); + } + + /** + * Adds a new {@link NodeShardState} and {@link DiscoveryNode} pair to the TreeMap. + * @param key {@link NodeShardState} key. + * @param value {@link DiscoveryNode} value. + */ + public void add(NodeShardState key, DiscoveryNode value) { + this.nodeShardStates.put(key, value); + } + + /** + * Retrieves the {@link DiscoveryNode} value associated with a given {@link NodeShardState} key. + * @param key {@link NodeShardState} key. + * @return {@link DiscoveryNode} value associated with the key. + */ + public DiscoveryNode get(NodeShardState key) { + return this.nodeShardStates.get(key); + } + + /** + * Returns the number of key-value pairs in the TreeMap. + * @return Number of key-value pairs. + */ + public int size() { + return this.nodeShardStates.size(); + } + + /** + * Returns an iterator over the {@link NodeShardState} keys in the TreeMap. + * @return Iterator over the keys. + */ + public Iterator iterator() { + return this.nodeShardStates.keySet().iterator(); + } + + /** + ** Returns a stream of the {@link NodeShardState} keys in the TreeMap. + * @return Stream of the keys. 
+ */ + public Stream stream() { + return this.nodeShardStates.keySet().stream(); + } + } } From d3387afbb987cce3ac266630d11ef913718c8a71 Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Mon, 11 Sep 2023 00:43:05 +0530 Subject: [PATCH 04/18] Made createActiveShardComparator private Signed-off-by: Shivansh Arora --- .../main/java/org/opensearch/gateway/PrimaryShardAllocator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java index 37fdee2e4e8c5..ef9999df03a4f 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java @@ -440,7 +440,7 @@ protected NodeShardsResult buildNodeShardsResult( return new NodeShardsResult(nodeShardStates, numberOfAllocationsFound); } - protected static Comparator createActiveShardComparator(boolean matchAnyShard, Set inSyncAllocationIds) { + private static Comparator createActiveShardComparator(boolean matchAnyShard, Set inSyncAllocationIds) { /** * Orders the active shards copies based on below comparators * 1. No store exception i.e. 
shard copy is readable From 5aaf97e0344905580d6e24edd6d8bff0c4a803af Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Fri, 15 Sep 2023 11:23:05 +0530 Subject: [PATCH 05/18] Using List of NodeShardState in NodeShardStates Signed-off-by: Shivansh Arora --- CHANGELOG.md | 1 - .../gateway/PrimaryShardAllocator.java | 47 +++++++++---------- 2 files changed, 23 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ffd9d441edf59..79df58a19c0ae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -184,7 +184,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add support for query profiler with concurrent aggregation ([#9248](https://github.com/opensearch-project/OpenSearch/pull/9248)) - Cleanup Unreferenced file on segment merge failure ([#9503](https://github.com/opensearch-project/OpenSearch/pull/9503)) - [Remote Store] Add support for Remote Translog Store upload stats in `_nodes/stats/` API ([#8908](https://github.com/opensearch-project/OpenSearch/pull/8908)) -- PrimaryShardAllocator refactor to abstract out shard state and method calls ([#9760](https://github.com/opensearch-project/OpenSearch/pull/9760))) ### Deprecated diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java index ef9999df03a4f..60785bccc2ec5 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java @@ -141,7 +141,7 @@ public AllocateUnassignedDecision makeAllocationDecision( } private static NodeShardStates adaptToNodeShardStates(FetchResult shardsState) { - NodeShardStates nodeShardStates = new NodeShardStates((o1, o2) -> 1); + NodeShardStates nodeShardStates = new NodeShardStates(); shardsState.getData().forEach((node, nodeGatewayStartedShard) -> { nodeShardStates.add( new NodeShardState( @@ -373,7 +373,7 @@ protected NodeShardsResult 
buildNodeShardsResult( NodeShardStates shardState, Logger logger ) { - NodeShardStates nodeShardStates = new NodeShardStates(createActiveShardComparator(matchAnyShard, inSyncAllocationIds)); + NodeShardStates nodeShardStates = new NodeShardStates(); int numberOfAllocationsFound = 0; Iterator iterator = shardState.iterator(); while (iterator.hasNext()) { @@ -430,11 +430,13 @@ protected NodeShardsResult buildNodeShardsResult( } } + nodeShardStates.sort(createActiveShardComparator(matchAnyShard, inSyncAllocationIds)); + if (logger.isTraceEnabled()) { logger.trace( "{} candidates for allocation: {}", shard, - nodeShardStates.stream().map(nodeShardStates::get).map(DiscoveryNode::getName).collect(Collectors.joining(", ")) + nodeShardStates.stream().map(s -> s.getNode().getName()).collect(Collectors.joining(", ")) ); } return new NodeShardsResult(nodeShardStates, numberOfAllocationsFound); @@ -641,32 +643,21 @@ public DiscoveryNode getNode() { */ protected static class NodeShardStates { // TreeMap to store NodeShardState and DiscoveryNode pairs - private final TreeMap nodeShardStates; - - /** - * Constructs a new NodeShardStates with a given Comparator. - * @param comparator Comparator to determine the order of the TreeMap. - */ - public NodeShardStates(Comparator comparator) { - this.nodeShardStates = new TreeMap<>(comparator); - } + private final List nodeShardStates; /** - * Adds a new {@link NodeShardState} and {@link DiscoveryNode} pair to the TreeMap. - * @param key {@link NodeShardState} key. - * @param value {@link DiscoveryNode} value. + * Constructs a new NodeShardStates */ - public void add(NodeShardState key, DiscoveryNode value) { - this.nodeShardStates.put(key, value); + public NodeShardStates() { + this.nodeShardStates = new ArrayList<>(); } /** - * Retrieves the {@link DiscoveryNode} value associated with a given {@link NodeShardState} key. - * @param key {@link NodeShardState} key. - * @return {@link DiscoveryNode} value associated with the key. 
+ * Adds a new {@link NodeShardState} to the list. + * @param state {@link NodeShardState} node shard state. */ - public DiscoveryNode get(NodeShardState key) { - return this.nodeShardStates.get(key); + public void add(NodeShardState state) { + this.nodeShardStates.add(state); } /** @@ -682,7 +673,7 @@ public int size() { * @return Iterator over the keys. */ public Iterator iterator() { - return this.nodeShardStates.keySet().iterator(); + return this.nodeShardStates.iterator(); } /** @@ -690,7 +681,15 @@ public Iterator iterator() { * @return Stream of the keys. */ public Stream stream() { - return this.nodeShardStates.keySet().stream(); + return this.nodeShardStates.stream(); + } + + /** + * Sorts the NodeShardStates based on the provided Comparator. + * @param comparator The Comparator to use. + */ + public void sort(Comparator comparator) { + this.nodeShardStates.sort(comparator); } } } From e7c778c1eda57190147893bf67621a126b221328 Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Fri, 15 Sep 2023 15:12:02 +0530 Subject: [PATCH 06/18] Build failure fix Signed-off-by: Shivansh Arora --- .../gateway/BaseGatewayShardAllocator.java | 13 ++++++------- .../opensearch/gateway/PrimaryShardAllocator.java | 5 ++--- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java index c4d5b89513471..59ef894958cbe 100644 --- a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java @@ -50,7 +50,7 @@ /** * An abstract class that implements basic functionality for allocating * shards to nodes based on shard copies that already exist in the cluster. - *

+ * * Individual implementations of this class are responsible for providing * the logic to determine to which nodes (if any) those shards are allocated. * @@ -64,9 +64,8 @@ public abstract class BaseGatewayShardAllocator { * Allocate an unassigned shard to nodes (if any) where valid copies of the shard already exist. * It is up to the individual implementations of {@link #makeAllocationDecision(ShardRouting, RoutingAllocation, Logger)} * to make decisions on assigning shards to nodes. - * - * @param shardRouting the shard to allocate - * @param allocation the allocation state container object + * @param shardRouting the shard to allocate + * @param allocation the allocation state container object * @param unassignedAllocationHandler handles the allocation of the current shard */ public void allocateUnassigned( @@ -110,9 +109,9 @@ protected long getExpectedShardSize(ShardRouting shardRouting, RoutingAllocation * {@link #allocateUnassigned(ShardRouting, RoutingAllocation, ExistingShardsAllocator.UnassignedAllocationHandler)} to make decisions * about whether or not the shard can be allocated by this allocator and if so, to which node it will be allocated. 
* - * @param unassignedShard the unassigned shard to allocate - * @param allocation the current routing state - * @param logger the logger + * @param unassignedShard the unassigned shard to allocate + * @param allocation the current routing state + * @param logger the logger * @return an {@link AllocateUnassignedDecision} with the final decision of whether to allocate and details of the decision */ public abstract AllocateUnassignedDecision makeAllocationDecision( diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java index 60785bccc2ec5..5cd9529a861e5 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java @@ -150,8 +150,7 @@ private static NodeShardStates adaptToNodeShardStates(FetchResult Date: Fri, 15 Sep 2023 15:53:53 +0530 Subject: [PATCH 07/18] Added javadoc for NodeShardResult and NodesToAllocate Signed-off-by: Shivansh Arora --- .../java/org/opensearch/gateway/PrimaryShardAllocator.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java index 5cd9529a861e5..b60fd8d63bf3b 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java @@ -506,6 +506,9 @@ protected static NodesToAllocate buildNodesToAllocate( protected abstract FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation); + /** + * This class encapsulates the result of a call to {@link #buildNodeShardsResult} + */ protected static class NodeShardsResult { final NodeShardStates orderedAllocationCandidates; final int allocationsFound; @@ -516,6 +519,9 @@ protected static class NodeShardsResult { } } + /** + * This class encapsulates the 
result of a call to {@link #buildNodesToAllocate(RoutingAllocation, NodeShardStates, ShardRouting, boolean)} + * */ protected static class NodesToAllocate { final List yesNodeShards; final List throttleNodeShards; From 90a03a029bd995d56889e0917d133134ede7269a Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Wed, 20 Sep 2023 11:46:46 +0530 Subject: [PATCH 08/18] Modified NodeShardStates to only have getter method Signed-off-by: Shivansh Arora --- .../gateway/PrimaryShardAllocator.java | 109 ++++++------------ 1 file changed, 36 insertions(+), 73 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java index b60fd8d63bf3b..06c208f48717a 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java @@ -143,7 +143,7 @@ public AllocateUnassignedDecision makeAllocationDecision( private static NodeShardStates adaptToNodeShardStates(FetchResult shardsState) { NodeShardStates nodeShardStates = new NodeShardStates(); shardsState.getData().forEach((node, nodeGatewayStartedShard) -> { - nodeShardStates.add( + nodeShardStates.getNodeShardStates().add( new NodeShardState( node, nodeGatewayStartedShard.allocationId(), @@ -179,12 +179,12 @@ protected AllocateUnassignedDecision getAllocationDecision( shardState, logger ); - final boolean enoughAllocationsFound = nodeShardsResult.orderedAllocationCandidates.size() > 0; + final boolean enoughAllocationsFound = !nodeShardsResult.orderedAllocationCandidates.getNodeShardStates().isEmpty(); logger.debug( "[{}][{}]: found {} allocation candidates of {} based on allocation ids: [{}]", unassignedShard.index(), unassignedShard.id(), - nodeShardsResult.orderedAllocationCandidates.size(), + nodeShardsResult.orderedAllocationCandidates.getNodeShardStates().size(), unassignedShard, inSyncAllocationIds ); @@ -323,15 +323,15 @@ private 
static List buildNodeDecisions( }) .collect(Collectors.toList()) ); - fetchedShardData.iterator().forEachRemaining(shardData -> { - if (discoNodes.contains(shardData.getNode()) == false) { - ineligibleShards.add(shardData); - } - }); + ineligibleShards = fetchedShardData + .getNodeShardStates() + .stream() + .filter(shardData -> discoNodes.contains(shardData.getNode()) == false) + .collect(Collectors.toList()); } else { // there were no shard copies that were eligible for being assigned the allocation, // so all fetched shard data are ineligible shards - fetchedShardData.iterator().forEachRemaining(ineligibleShards::add); + ineligibleShards = fetchedShardData.getNodeShardStates(); } nodeResults.addAll( @@ -374,9 +374,7 @@ protected NodeShardsResult buildNodeShardsResult( ) { NodeShardStates nodeShardStates = new NodeShardStates(); int numberOfAllocationsFound = 0; - Iterator iterator = shardState.iterator(); - while (iterator.hasNext()) { - NodeShardState nodeShardState = iterator.next(); + for (NodeShardState nodeShardState : shardState.getNodeShardStates()) { DiscoveryNode node = nodeShardState.getNode(); String allocationId = nodeShardState.allocationId(); @@ -394,24 +392,24 @@ protected NodeShardsResult buildNodeShardsResult( final String finalAllocationId = allocationId; if (nodeShardState.storeException() instanceof ShardLockObtainFailedException) { logger.trace( - () -> new ParameterizedMessage( - "[{}] on node [{}] has allocation id [{}] but the store can not be " - + "opened as it's locked, treating as valid shard", - shard, - nodeShardState.getNode(), - finalAllocationId - ), - nodeShardState.storeException() + () -> new ParameterizedMessage( + "[{}] on node [{}] has allocation id [{}] but the store can not be " + + "opened as it's locked, treating as valid shard", + shard, + nodeShardState.getNode(), + finalAllocationId + ), + nodeShardState.storeException() ); } else { logger.trace( - () -> new ParameterizedMessage( - "[{}] on node [{}] has allocation 
id [{}] but the store can not be " + "opened, treating as no allocation id", - shard, - nodeShardState.getNode(), - finalAllocationId - ), - nodeShardState.storeException() + () -> new ParameterizedMessage( + "[{}] on node [{}] has allocation id [{}] but the store can not be " + "opened, treating as no allocation id", + shard, + nodeShardState.getNode(), + finalAllocationId + ), + nodeShardState.storeException() ); allocationId = null; } @@ -419,23 +417,23 @@ protected NodeShardsResult buildNodeShardsResult( if (allocationId != null) { assert nodeShardState.storeException() == null || nodeShardState.storeException() instanceof ShardLockObtainFailedException - : "only allow store that can be opened or that throws a ShardLockObtainFailedException while being opened but got a " + : "only allow store that can be opened or that throws a ShardLockObtainFailedException while being opened but got a " + "store throwing " + nodeShardState.storeException(); numberOfAllocationsFound++; if (matchAnyShard || inSyncAllocationIds.contains(nodeShardState.allocationId())) { - nodeShardStates.add(nodeShardState); + nodeShardStates.getNodeShardStates().add(nodeShardState); } } } - nodeShardStates.sort(createActiveShardComparator(matchAnyShard, inSyncAllocationIds)); + nodeShardStates.getNodeShardStates().sort(createActiveShardComparator(matchAnyShard, inSyncAllocationIds)); if (logger.isTraceEnabled()) { logger.trace( "{} candidates for allocation: {}", shard, - nodeShardStates.stream().map(s -> s.getNode().getName()).collect(Collectors.joining(", ")) + nodeShardStates.getNodeShardStates().stream().map(s -> s.getNode().getName()).collect(Collectors.joining(", ")) ); } return new NodeShardsResult(nodeShardStates, numberOfAllocationsFound); @@ -477,17 +475,15 @@ protected static NodesToAllocate buildNodesToAllocate( List yesNodeShards = new ArrayList<>(); List throttledNodeShards = new ArrayList<>(); List noNodeShards = new ArrayList<>(); - Iterator iterator = 
nodeShardStates.iterator(); - while (iterator.hasNext()) { - NodeShardState nodeShardState = iterator.next(); + for (NodeShardState nodeShardState : nodeShardStates.getNodeShardStates()) { RoutingNode node = allocation.routingNodes().node(nodeShardState.getNode().getId()); if (node == null) { continue; } Decision decision = forceAllocate - ? allocation.deciders().canForceAllocatePrimary(shardRouting, node, allocation) - : allocation.deciders().canAllocate(shardRouting, node, allocation); + ? allocation.deciders().canForceAllocatePrimary(shardRouting, node, allocation) + : allocation.deciders().canAllocate(shardRouting, node, allocation); DecidedNode decidedNode = new DecidedNode(nodeShardState, decision); if (decision.type() == Type.THROTTLE) { throttledNodeShards.add(decidedNode); @@ -647,7 +643,7 @@ public DiscoveryNode getNode() { * @see TreeMap */ protected static class NodeShardStates { - // TreeMap to store NodeShardState and DiscoveryNode pairs + // List of entries to store NodeShardState private final List nodeShardStates; /** @@ -658,43 +654,10 @@ public NodeShardStates() { } /** - * Adds a new {@link NodeShardState} to the list. - * @param state {@link NodeShardState} node shard state. - */ - public void add(NodeShardState state) { - this.nodeShardStates.add(state); - } - - /** - * Returns the number of key-value pairs in the TreeMap. - * @return Number of key-value pairs. - */ - public int size() { - return this.nodeShardStates.size(); - } - - /** - * Returns an iterator over the {@link NodeShardState} keys in the TreeMap. - * @return Iterator over the keys. - */ - public Iterator iterator() { - return this.nodeShardStates.iterator(); - } - - /** - ** Returns a stream of the {@link NodeShardState} keys in the TreeMap. - * @return Stream of the keys. - */ - public Stream stream() { - return this.nodeShardStates.stream(); - } - - /** - * Sorts the NodeShardStates based on the provided Comparator. - * @param comparator The Comparator to use. 
+ * Returns the list of {@link NodeShardState}. */ - public void sort(Comparator comparator) { - this.nodeShardStates.sort(comparator); + public List getNodeShardStates() { + return this.nodeShardStates; } } } From 7418d66d1363eadafb8a4f3fcf1aebaa404bd040 Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Wed, 20 Sep 2023 13:31:43 +0530 Subject: [PATCH 09/18] Spotless changes Signed-off-by: Shivansh Arora --- .../gateway/PrimaryShardAllocator.java | 59 +++++++++---------- 1 file changed, 29 insertions(+), 30 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java index 06c208f48717a..0fcfd0a96b8ac 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java @@ -58,7 +58,6 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.TreeMap; @@ -143,15 +142,16 @@ public AllocateUnassignedDecision makeAllocationDecision( private static NodeShardStates adaptToNodeShardStates(FetchResult shardsState) { NodeShardStates nodeShardStates = new NodeShardStates(); shardsState.getData().forEach((node, nodeGatewayStartedShard) -> { - nodeShardStates.getNodeShardStates().add( - new NodeShardState( - node, - nodeGatewayStartedShard.allocationId(), - nodeGatewayStartedShard.primary(), - nodeGatewayStartedShard.replicationCheckpoint(), - nodeGatewayStartedShard.storeException() - ) - ); + nodeShardStates.getNodeShardStates() + .add( + new NodeShardState( + node, + nodeGatewayStartedShard.allocationId(), + nodeGatewayStartedShard.primary(), + nodeGatewayStartedShard.replicationCheckpoint(), + nodeGatewayStartedShard.storeException() + ) + ); }); return nodeShardStates; } @@ -323,8 +323,7 @@ private static List buildNodeDecisions( }) 
.collect(Collectors.toList()) ); - ineligibleShards = fetchedShardData - .getNodeShardStates() + ineligibleShards = fetchedShardData.getNodeShardStates() .stream() .filter(shardData -> discoNodes.contains(shardData.getNode()) == false) .collect(Collectors.toList()); @@ -392,24 +391,24 @@ protected NodeShardsResult buildNodeShardsResult( final String finalAllocationId = allocationId; if (nodeShardState.storeException() instanceof ShardLockObtainFailedException) { logger.trace( - () -> new ParameterizedMessage( - "[{}] on node [{}] has allocation id [{}] but the store can not be " - + "opened as it's locked, treating as valid shard", - shard, - nodeShardState.getNode(), - finalAllocationId - ), - nodeShardState.storeException() + () -> new ParameterizedMessage( + "[{}] on node [{}] has allocation id [{}] but the store can not be " + + "opened as it's locked, treating as valid shard", + shard, + nodeShardState.getNode(), + finalAllocationId + ), + nodeShardState.storeException() ); } else { logger.trace( - () -> new ParameterizedMessage( - "[{}] on node [{}] has allocation id [{}] but the store can not be " + "opened, treating as no allocation id", - shard, - nodeShardState.getNode(), - finalAllocationId - ), - nodeShardState.storeException() + () -> new ParameterizedMessage( + "[{}] on node [{}] has allocation id [{}] but the store can not be " + "opened, treating as no allocation id", + shard, + nodeShardState.getNode(), + finalAllocationId + ), + nodeShardState.storeException() ); allocationId = null; } @@ -417,7 +416,7 @@ protected NodeShardsResult buildNodeShardsResult( if (allocationId != null) { assert nodeShardState.storeException() == null || nodeShardState.storeException() instanceof ShardLockObtainFailedException - : "only allow store that can be opened or that throws a ShardLockObtainFailedException while being opened but got a " + : "only allow store that can be opened or that throws a ShardLockObtainFailedException while being opened but got a " + "store 
throwing " + nodeShardState.storeException(); numberOfAllocationsFound++; @@ -482,8 +481,8 @@ protected static NodesToAllocate buildNodesToAllocate( } Decision decision = forceAllocate - ? allocation.deciders().canForceAllocatePrimary(shardRouting, node, allocation) - : allocation.deciders().canAllocate(shardRouting, node, allocation); + ? allocation.deciders().canForceAllocatePrimary(shardRouting, node, allocation) + : allocation.deciders().canAllocate(shardRouting, node, allocation); DecidedNode decidedNode = new DecidedNode(nodeShardState, decision); if (decision.type() == Type.THROTTLE) { throttledNodeShards.add(decidedNode); From 7aa813527b61771d74796e9b85b203274bba61a5 Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Fri, 29 Sep 2023 10:54:41 +0530 Subject: [PATCH 10/18] Removed changes which were not necessary Signed-off-by: Shivansh Arora --- .../gateway/PrimaryShardAllocator.java | 108 ++++++------------ 1 file changed, 35 insertions(+), 73 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java index 0fcfd0a96b8ac..35d4a62cd929d 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java @@ -135,15 +135,14 @@ public AllocateUnassignedDecision makeAllocationDecision( } return AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions); } - NodeShardStates nodeShardStates = adaptToNodeShardStates(shardState); + List nodeShardStates = adaptToNodeShardStates(shardState); return getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger); } - private static NodeShardStates adaptToNodeShardStates(FetchResult shardsState) { - NodeShardStates nodeShardStates = new NodeShardStates(); + private static List adaptToNodeShardStates(FetchResult shardsState) { + List nodeShardStates = new ArrayList(); 
shardsState.getData().forEach((node, nodeGatewayStartedShard) -> { - nodeShardStates.getNodeShardStates() - .add( + nodeShardStates.add( new NodeShardState( node, nodeGatewayStartedShard.allocationId(), @@ -159,7 +158,7 @@ private static NodeShardStates adaptToNodeShardStates(FetchResult shardState, Logger logger ) { final boolean explain = allocation.debugDecision(); @@ -179,12 +178,12 @@ protected AllocateUnassignedDecision getAllocationDecision( shardState, logger ); - final boolean enoughAllocationsFound = !nodeShardsResult.orderedAllocationCandidates.getNodeShardStates().isEmpty(); + final boolean enoughAllocationsFound = nodeShardsResult.orderedAllocationCandidates.size() > 0; logger.debug( "[{}][{}]: found {} allocation candidates of {} based on allocation ids: [{}]", unassignedShard.index(), unassignedShard.id(), - nodeShardsResult.orderedAllocationCandidates.getNodeShardStates().size(), + nodeShardsResult.orderedAllocationCandidates.size(), unassignedShard, inSyncAllocationIds ); @@ -227,16 +226,15 @@ protected AllocateUnassignedDecision getAllocationDecision( boolean throttled = false; if (nodesToAllocate.yesNodeShards.isEmpty() == false) { DecidedNode decidedNode = nodesToAllocate.yesNodeShards.get(0); - NodeShardState nodeShardState = decidedNode.nodeShardState; logger.debug( "[{}][{}]: allocating [{}] to [{}] on primary allocation", unassignedShard.index(), unassignedShard.id(), unassignedShard, - nodeShardState.getNode() + decidedNode.nodeShardState.getNode() ); - node = nodeShardState.getNode(); - allocationId = nodeShardState.allocationId(); + node = decidedNode.nodeShardState.getNode(); + allocationId = decidedNode.nodeShardState.allocationId(); } else if (nodesToAllocate.throttleNodeShards.isEmpty() && !nodesToAllocate.noNodeShards.isEmpty()) { // The deciders returned a NO decision for all nodes with shard copies, so we check if primary shard // can be force-allocated to one of the nodes. 
@@ -303,7 +301,7 @@ protected AllocateUnassignedDecision getAllocationDecision( */ private static List buildNodeDecisions( NodesToAllocate nodesToAllocate, - NodeShardStates fetchedShardData, + List fetchedShardData, Set inSyncAllocationIds ) { List nodeResults = new ArrayList<>(); @@ -323,14 +321,13 @@ private static List buildNodeDecisions( }) .collect(Collectors.toList()) ); - ineligibleShards = fetchedShardData.getNodeShardStates() - .stream() + ineligibleShards = fetchedShardData.stream() .filter(shardData -> discoNodes.contains(shardData.getNode()) == false) .collect(Collectors.toList()); } else { // there were no shard copies that were eligible for being assigned the allocation, // so all fetched shard data are ineligible shards - ineligibleShards = fetchedShardData.getNodeShardStates(); + ineligibleShards = fetchedShardData; } nodeResults.addAll( @@ -342,7 +339,7 @@ private static List buildNodeDecisions( return nodeResults; } - protected static ShardStoreInfo shardStoreInfo(NodeShardState nodeShardState, Set inSyncAllocationIds) { + private static ShardStoreInfo shardStoreInfo(NodeShardState nodeShardState, Set inSyncAllocationIds) { final Exception storeErr = nodeShardState.storeException(); final boolean inSync = nodeShardState.allocationId() != null && inSyncAllocationIds.contains(nodeShardState.allocationId()); return new ShardStoreInfo(nodeShardState.allocationId(), inSync, storeErr); @@ -363,17 +360,17 @@ protected static ShardStoreInfo shardStoreInfo(NodeShardState nodeShardState, Se * inSyncAllocationIds are added to the list. Otherwise, any node that has a shard is added to the list, but * entries with matching allocation id are always at the front of the list. 
*/ - protected NodeShardsResult buildNodeShardsResult( + protected static NodeShardsResult buildNodeShardsResult( ShardRouting shard, boolean matchAnyShard, Set ignoreNodes, Set inSyncAllocationIds, - NodeShardStates shardState, + List shardState, Logger logger ) { - NodeShardStates nodeShardStates = new NodeShardStates(); + List nodeShardStates = new ArrayList<>(); int numberOfAllocationsFound = 0; - for (NodeShardState nodeShardState : shardState.getNodeShardStates()) { + for (NodeShardState nodeShardState : shardState) { DiscoveryNode node = nodeShardState.getNode(); String allocationId = nodeShardState.allocationId(); @@ -421,18 +418,18 @@ protected NodeShardsResult buildNodeShardsResult( + nodeShardState.storeException(); numberOfAllocationsFound++; if (matchAnyShard || inSyncAllocationIds.contains(nodeShardState.allocationId())) { - nodeShardStates.getNodeShardStates().add(nodeShardState); + nodeShardStates.add(nodeShardState); } } } - nodeShardStates.getNodeShardStates().sort(createActiveShardComparator(matchAnyShard, inSyncAllocationIds)); + nodeShardStates.sort(createActiveShardComparator(matchAnyShard, inSyncAllocationIds)); if (logger.isTraceEnabled()) { logger.trace( "{} candidates for allocation: {}", shard, - nodeShardStates.getNodeShardStates().stream().map(s -> s.getNode().getName()).collect(Collectors.joining(", ")) + nodeShardStates.stream().map(s -> s.getNode().getName()).collect(Collectors.joining(", ")) ); } return new NodeShardsResult(nodeShardStates, numberOfAllocationsFound); @@ -465,16 +462,16 @@ private static Comparator createActiveShardComparator(boolean ma /** * Split the list of node shard states into groups yes/no/throttle based on allocation deciders */ - protected static NodesToAllocate buildNodesToAllocate( + private static NodesToAllocate buildNodesToAllocate( RoutingAllocation allocation, - NodeShardStates nodeShardStates, + List nodeShardStates, ShardRouting shardRouting, boolean forceAllocate ) { List yesNodeShards = new 
ArrayList<>(); List throttledNodeShards = new ArrayList<>(); List noNodeShards = new ArrayList<>(); - for (NodeShardState nodeShardState : nodeShardStates.getNodeShardStates()) { + for (NodeShardState nodeShardState : nodeShardStates) { RoutingNode node = allocation.routingNodes().node(nodeShardState.getNode().getId()); if (node == null) { continue; @@ -504,28 +501,25 @@ protected static NodesToAllocate buildNodesToAllocate( /** * This class encapsulates the result of a call to {@link #buildNodeShardsResult} */ - protected static class NodeShardsResult { - final NodeShardStates orderedAllocationCandidates; + static class NodeShardsResult { + final List orderedAllocationCandidates; final int allocationsFound; - NodeShardsResult(NodeShardStates orderedAllocationCandidates, int allocationsFound) { + NodeShardsResult(List orderedAllocationCandidates, int allocationsFound) { this.orderedAllocationCandidates = orderedAllocationCandidates; this.allocationsFound = allocationsFound; } } - /** - * This class encapsulates the result of a call to {@link #buildNodesToAllocate(RoutingAllocation, NodeShardStates, ShardRouting, boolean)} - * */ protected static class NodesToAllocate { - final List yesNodeShards; - final List throttleNodeShards; - final List noNodeShards; + final List yesNodeShards; + final List throttleNodeShards; + final List noNodeShards; NodesToAllocate( - List yesNodeShards, - List throttleNodeShards, - List noNodeShards + List yesNodeShards, + List throttleNodeShards, + List noNodeShards ) { this.yesNodeShards = yesNodeShards; this.throttleNodeShards = throttleNodeShards; @@ -537,11 +531,11 @@ protected static class NodesToAllocate { * This class encapsulates the shard state retrieved from a node and the decision that was made * by the allocator for allocating to the node that holds the shard copy. 
*/ - protected static class DecidedNode { + private static class DecidedNode { final NodeShardState nodeShardState; final Decision decision; - protected DecidedNode(NodeShardState nodeShardState, Decision decision) { + private DecidedNode(NodeShardState nodeShardState, Decision decision) { this.nodeShardState = nodeShardState; this.decision = decision; } @@ -552,9 +546,6 @@ protected DecidedNode(NodeShardState nodeShardState, Decision decision) { * It includes several key data points about the shard state, such as its allocation ID, * whether it's a primary shard, any store exception, the replication checkpoint, and the * DiscoveryNode it belongs to. - *

- * This class is designed to be used in conjunction with the {@link NodeShardStates} class, which - * manages multiple NodeShardState instances. */ protected static class NodeShardState { // Allocation ID of the shard @@ -630,33 +621,4 @@ public DiscoveryNode getNode() { return this.node; } } - - /** - * The NodeShardStates class manages pairs of {@link NodeShardState} and {@link DiscoveryNode}. - * It uses a TreeMap to ensure that the entries are sorted based on the natural - * ordering of the {@link NodeShardState} keys, or according to a provided Comparator. - *

- * The TreeMap is implemented using a Red-Black tree, which provides efficient - * performance for common operations such as adding, removing, and retrieving - * elements. - * @see TreeMap - */ - protected static class NodeShardStates { - // List of entries to store NodeShardState - private final List nodeShardStates; - - /** - * Constructs a new NodeShardStates - */ - public NodeShardStates() { - this.nodeShardStates = new ArrayList<>(); - } - - /** - * Returns the list of {@link NodeShardState}. - */ - public List getNodeShardStates() { - return this.nodeShardStates; - } - } } From ba1e746abe9c5092791af069a98998905e241083 Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Fri, 29 Sep 2023 10:56:55 +0530 Subject: [PATCH 11/18] Spotless changes Signed-off-by: Shivansh Arora --- .../gateway/PrimaryShardAllocator.java | 23 ++++++++----------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java index 35d4a62cd929d..a1d53b3e92ff5 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java @@ -60,7 +60,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.TreeMap; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -143,14 +142,14 @@ private static List adaptToNodeShardStates(FetchResult nodeShardStates = new ArrayList(); shardsState.getData().forEach((node, nodeGatewayStartedShard) -> { nodeShardStates.add( - new NodeShardState( - node, - nodeGatewayStartedShard.allocationId(), - nodeGatewayStartedShard.primary(), - nodeGatewayStartedShard.replicationCheckpoint(), - nodeGatewayStartedShard.storeException() - ) - ); + new NodeShardState( + node, + nodeGatewayStartedShard.allocationId(), + nodeGatewayStartedShard.primary(), + 
nodeGatewayStartedShard.replicationCheckpoint(), + nodeGatewayStartedShard.storeException() + ) + ); }); return nodeShardStates; } @@ -516,11 +515,7 @@ protected static class NodesToAllocate { final List throttleNodeShards; final List noNodeShards; - NodesToAllocate( - List yesNodeShards, - List throttleNodeShards, - List noNodeShards - ) { + NodesToAllocate(List yesNodeShards, List throttleNodeShards, List noNodeShards) { this.yesNodeShards = yesNodeShards; this.throttleNodeShards = throttleNodeShards; this.noNodeShards = noNodeShards; From 36352403b84a438a88b2cbc303722077660cbc3e Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Sun, 1 Oct 2023 02:21:24 +0530 Subject: [PATCH 12/18] Added javadocs PSA Signed-off-by: Shivansh Arora --- .../java/org/opensearch/gateway/PrimaryShardAllocator.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java index a1d53b3e92ff5..7952b8c7aee0d 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java @@ -510,6 +510,9 @@ static class NodeShardsResult { } } + /** + * This class encapsulates the result of a call to {@link #buildNodesToAllocate} + */ protected static class NodesToAllocate { final List yesNodeShards; final List throttleNodeShards; From 4bf3949dd004dc79842b47d21d3fcf5f7323401f Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Tue, 12 Dec 2023 00:35:11 +0530 Subject: [PATCH 13/18] Refactor PSA to use NodeGatewayStartedShards, removed NodeShardState Signed-off-by: Shivansh Arora --- .idea/vcs.xml | 30 +-- .../gateway/PrimaryShardAllocator.java | 184 +++++------------- .../test/gateway/TestGatewayAllocator.java | 117 ++++++++++- 3 files changed, 180 insertions(+), 151 deletions(-) diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 48557884a8893..b74d87df8f17b 100644 --- 
a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -1,20 +1,20 @@ - - - + + + - + diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java index e3a4191758a5e..719de2db6db95 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java @@ -126,30 +126,23 @@ public AllocateUnassignedDecision makeAllocationDecision( return decision; } final FetchResult shardState = fetchData(unassignedShard, allocation); - if (shardState.hasData() == false) { - allocation.setHasPendingAsyncFetch(); - List nodeDecisions = null; - if (allocation.debugDecision()) { - nodeDecisions = buildDecisionsForAllNodes(unassignedShard, allocation); - } - return AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions); - } - List nodeShardStates = adaptToNodeShardStates(shardState); + List nodeShardStates = adaptToNodeShardStates(shardState); return getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger); } - private static List adaptToNodeShardStates(FetchResult shardsState) { - List nodeShardStates = new ArrayList(); + /** + * Transforms {@link FetchResult} of {@link NodeGatewayStartedShards} to {@link List} of {@link NodeGatewayStartedShards} + * Returns null if {@link FetchResult} does not have any data. 
+ * + * @param shardsState {@link FetchResult} + * */ + private static List adaptToNodeShardStates(FetchResult shardsState) { + if (!shardsState.hasData()){ + return null; + } + List nodeShardStates = new ArrayList<>(); shardsState.getData().forEach((node, nodeGatewayStartedShard) -> { - nodeShardStates.add( - new NodeShardState( - node, - nodeGatewayStartedShard.allocationId(), - nodeGatewayStartedShard.primary(), - nodeGatewayStartedShard.replicationCheckpoint(), - nodeGatewayStartedShard.storeException() - ) - ); + nodeShardStates.add(nodeGatewayStartedShard); }); return nodeShardStates; } @@ -157,9 +150,17 @@ private static List adaptToNodeShardStates(FetchResult shardState, + List shardState, Logger logger ) { + if (shardState == null) { + allocation.setHasPendingAsyncFetch(); + List nodeDecisions = null; + if (allocation.debugDecision()) { + nodeDecisions = buildDecisionsForAllNodes(unassignedShard, allocation); + } + return AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions); + } final boolean explain = allocation.debugDecision(); // don't create a new IndexSetting object for every shard as this could cause a lot of garbage // on cluster restart if we allocate a boat load of shards @@ -240,7 +241,7 @@ protected AllocateUnassignedDecision getAllocationDecision( nodesToAllocate = buildNodesToAllocate(allocation, nodeShardsResult.orderedAllocationCandidates, unassignedShard, true); if (nodesToAllocate.yesNodeShards.isEmpty() == false) { final DecidedNode decidedNode = nodesToAllocate.yesNodeShards.get(0); - final NodeShardState nodeShardState = decidedNode.nodeShardState; + final NodeGatewayStartedShards nodeShardState = decidedNode.nodeShardState; logger.debug( "[{}][{}]: allocating [{}] to [{}] on forced primary allocation", unassignedShard.index(), @@ -300,11 +301,11 @@ protected AllocateUnassignedDecision getAllocationDecision( */ private static List buildNodeDecisions( NodesToAllocate nodesToAllocate, - List fetchedShardData, 
+ List fetchedShardData, Set inSyncAllocationIds ) { List nodeResults = new ArrayList<>(); - Collection ineligibleShards = new ArrayList<>(); + Collection ineligibleShards = new ArrayList<>(); if (nodesToAllocate != null) { final Set discoNodes = new HashSet<>(); nodeResults.addAll( @@ -338,21 +339,23 @@ private static List buildNodeDecisions( return nodeResults; } - private static ShardStoreInfo shardStoreInfo(NodeShardState nodeShardState, Set inSyncAllocationIds) { + private static ShardStoreInfo shardStoreInfo(NodeGatewayStartedShards nodeShardState, Set inSyncAllocationIds) { final Exception storeErr = nodeShardState.storeException(); final boolean inSync = nodeShardState.allocationId() != null && inSyncAllocationIds.contains(nodeShardState.allocationId()); return new ShardStoreInfo(nodeShardState.allocationId(), inSync, storeErr); } - private static final Comparator NO_STORE_EXCEPTION_FIRST_COMPARATOR = Comparator.comparing( - (NodeShardState state) -> state.storeException() == null + private static final Comparator NO_STORE_EXCEPTION_FIRST_COMPARATOR = Comparator.comparing( + (NodeGatewayStartedShards state) -> state.storeException() == null ).reversed(); - private static final Comparator PRIMARY_FIRST_COMPARATOR = Comparator.comparing(NodeShardState::primary).reversed(); + private static final Comparator PRIMARY_FIRST_COMPARATOR = Comparator.comparing( + NodeGatewayStartedShards::primary).reversed(); - private static final Comparator HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR = Comparator.comparing( - NodeShardState::replicationCheckpoint, - Comparator.nullsLast(Comparator.naturalOrder()) - ); + private static final Comparator HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR = + Comparator.comparing( + NodeGatewayStartedShards::replicationCheckpoint, + Comparator.nullsLast(Comparator.naturalOrder()) + ); /** * Builds a list of nodes. 
If matchAnyShard is set to false, only nodes that have an allocation id matching @@ -364,12 +367,12 @@ protected static NodeShardsResult buildNodeShardsResult( boolean matchAnyShard, Set ignoreNodes, Set inSyncAllocationIds, - List shardState, + List shardState, Logger logger ) { - List nodeShardStates = new ArrayList<>(); + List nodeShardStates = new ArrayList<>(); int numberOfAllocationsFound = 0; - for (NodeShardState nodeShardState : shardState) { + for (NodeGatewayStartedShards nodeShardState : shardState) { DiscoveryNode node = nodeShardState.getNode(); String allocationId = nodeShardState.allocationId(); @@ -434,18 +437,18 @@ protected static NodeShardsResult buildNodeShardsResult( return new NodeShardsResult(nodeShardStates, numberOfAllocationsFound); } - private static Comparator createActiveShardComparator(boolean matchAnyShard, Set inSyncAllocationIds) { - /* - Orders the active shards copies based on below comparators - 1. No store exception i.e. shard copy is readable - 2. Prefer previous primary shard - 3. Prefer shard copy with the highest replication checkpoint. It is NO-OP for doc rep enabled indices. + private static Comparator createActiveShardComparator(boolean matchAnyShard, Set inSyncAllocationIds) { + /** + * Orders the active shards copies based on below comparators + * 1. No store exception i.e. shard copy is readable + * 2. Prefer previous primary shard + * 3. Prefer shard copy with the highest replication checkpoint. It is NO-OP for doc rep enabled indices. 
*/ - final Comparator comparator; // allocation preference + final Comparator comparator; // allocation preference if (matchAnyShard) { // prefer shards with matching allocation ids - Comparator matchingAllocationsFirst = Comparator.comparing( - (NodeShardState state) -> inSyncAllocationIds.contains(state.allocationId()) + Comparator matchingAllocationsFirst = Comparator.comparing( + (NodeGatewayStartedShards state) -> inSyncAllocationIds.contains(state.allocationId()) ).reversed(); comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR) .thenComparing(PRIMARY_FIRST_COMPARATOR) @@ -463,14 +466,14 @@ private static Comparator createActiveShardComparator(boolean ma */ private static NodesToAllocate buildNodesToAllocate( RoutingAllocation allocation, - List nodeShardStates, + List nodeShardStates, ShardRouting shardRouting, boolean forceAllocate ) { List yesNodeShards = new ArrayList<>(); List throttledNodeShards = new ArrayList<>(); List noNodeShards = new ArrayList<>(); - for (NodeShardState nodeShardState : nodeShardStates) { + for (NodeGatewayStartedShards nodeShardState : nodeShardStates) { RoutingNode node = allocation.routingNodes().node(nodeShardState.getNode().getId()); if (node == null) { continue; @@ -501,10 +504,10 @@ private static NodesToAllocate buildNodesToAllocate( * This class encapsulates the result of a call to {@link #buildNodeShardsResult} */ static class NodeShardsResult { - final List orderedAllocationCandidates; + final List orderedAllocationCandidates; final int allocationsFound; - NodeShardsResult(List orderedAllocationCandidates, int allocationsFound) { + NodeShardsResult(List orderedAllocationCandidates, int allocationsFound) { this.orderedAllocationCandidates = orderedAllocationCandidates; this.allocationsFound = allocationsFound; } @@ -530,93 +533,12 @@ protected static class NodesToAllocate { * by the allocator for allocating to the node that holds the shard copy. 
*/ private static class DecidedNode { - final NodeShardState nodeShardState; + final NodeGatewayStartedShards nodeShardState; final Decision decision; - private DecidedNode(NodeShardState nodeShardState, Decision decision) { + private DecidedNode(NodeGatewayStartedShards nodeShardState, Decision decision) { this.nodeShardState = nodeShardState; this.decision = decision; } } - - /** - * The NodeShardState class represents the state of a node shard in a distributed system. - * It includes several key data points about the shard state, such as its allocation ID, - * whether it's a primary shard, any store exception, the replication checkpoint, and the - * DiscoveryNode it belongs to. - */ - protected static class NodeShardState { - // Allocation ID of the shard - private final String allocationId; - // Whether the shard is primary - private final boolean primary; - // Any store exception associated with the shard - private final Exception storeException; - // The replication checkpoint of the shard - private final ReplicationCheckpoint replicationCheckpoint; - // The DiscoveryNode the shard belongs to - private final DiscoveryNode node; - - /** - * Constructs a new NodeShardState with the given parameters. - * @param node The DiscoveryNode the shard belongs to. - * @param allocationId The allocation ID of the shard. - * @param primary Whether the shard is a primary shard. - * @param replicationCheckpoint The replication checkpoint of the shard. - * @param storeException Any store exception associated with the shard. - */ - public NodeShardState( - DiscoveryNode node, - String allocationId, - boolean primary, - ReplicationCheckpoint replicationCheckpoint, - Exception storeException - ) { - this.node = node; - this.allocationId = allocationId; - this.primary = primary; - this.replicationCheckpoint = replicationCheckpoint; - this.storeException = storeException; - } - - /** - * Returns the allocation ID of the shard. - * @return The allocation ID of the shard. 
- */ - public String allocationId() { - return this.allocationId; - } - - /** - * Returns whether the shard is a primary shard. - * @return True if the shard is a primary shard, false otherwise. - */ - public boolean primary() { - return this.primary; - } - - /** - * Returns the replication checkpoint of the shard. - * @return The replication checkpoint of the shard. - */ - public ReplicationCheckpoint replicationCheckpoint() { - return this.replicationCheckpoint; - } - - /** - * Returns any store exception associated with the shard. - * @return The store exception associated with the shard, or null if there isn't one. - */ - public Exception storeException() { - return this.storeException; - } - - /** - * Returns the DiscoveryNode the shard belongs to. - * @return The DiscoveryNode the shard belongs to. - */ - public DiscoveryNode getNode() { - return this.node; - } - } } diff --git a/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java b/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java index 7462062a0cd46..caab04d3fc434 100644 --- a/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java +++ b/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java @@ -35,16 +35,21 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.FailedShard; import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.core.index.shard.ShardId; import org.opensearch.gateway.AsyncShardFetch; import org.opensearch.gateway.GatewayAllocator; import org.opensearch.gateway.PrimaryShardAllocator; +import org.opensearch.gateway.PrimaryShardBatchAllocator; import org.opensearch.gateway.ReplicaShardAllocator; +import 
org.opensearch.gateway.ReplicaShardBatchAllocator; +import org.opensearch.gateway.TransportNodesListGatewayStartedBatchShards; import org.opensearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.store.TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata; +import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch; import java.util.Collections; import java.util.HashMap; @@ -57,13 +62,13 @@ * A gateway allocator implementation that keeps an in memory list of started shard allocation * that are used as replies to the, normally async, fetch data requests. The in memory list * is adapted when shards are started and failed. - *

+ * * Nodes leaving and joining the cluster do not change the list of shards the class tracks but * rather serves as a filter to what is returned by fetch data. Concretely - fetch data will * only return shards that were started on nodes that are currently part of the cluster. - *

+ * * For now only primary shard related data is fetched. Replica request always get an empty response. - *

+ * * * This class is useful to use in unit tests that require the functionality of {@link GatewayAllocator} but do * not have all the infrastructure required to use it. @@ -98,7 +103,52 @@ protected AsyncShardFetch.FetchResult fetchData(ShardR ) ); - return new AsyncShardFetch.FetchResult<>(shardId, foundShards, ignoreNodes); + return new AsyncShardFetch.FetchResult<>(foundShards, new HashMap<>() { + { + put(shardId, ignoreNodes); + } + }); + } + }; + + PrimaryShardBatchAllocator primaryBatchShardAllocator = new PrimaryShardBatchAllocator() { + @Override + protected AsyncShardFetch.FetchResult fetchData( + Set shardsEligibleForFetch, + Set inEligibleShards, + RoutingAllocation allocation + ) { + Map foundShards = new HashMap<>(); + HashMap> shardsToIgnoreNodes = new HashMap<>(); + for (Map.Entry> entry : knownAllocations.entrySet()) { + String nodeId = entry.getKey(); + Map shardsOnNode = entry.getValue(); + HashMap adaptedResponse = new HashMap<>(); + + for (ShardRouting shardRouting : shardsEligibleForFetch) { + ShardId shardId = shardRouting.shardId(); + Set ignoreNodes = allocation.getIgnoreNodes(shardId); + + if (shardsOnNode.containsKey(shardId) && ignoreNodes.contains(nodeId) == false && currentNodes.nodeExists(nodeId)) { + TransportNodesListGatewayStartedBatchShards.NodeGatewayStartedShard nodeShard = + new TransportNodesListGatewayStartedBatchShards.NodeGatewayStartedShard( + shardRouting.allocationId().getId(), + shardRouting.primary(), + getReplicationCheckpoint(shardId, nodeId) + ); + adaptedResponse.put(shardId, nodeShard); + shardsToIgnoreNodes.put(shardId, ignoreNodes); + } + foundShards.put( + currentNodes.get(nodeId), + new TransportNodesListGatewayStartedBatchShards.NodeGatewayStartedShardsBatch( + currentNodes.get(nodeId), + adaptedResponse + ) + ); + } + } + return new AsyncShardFetch.FetchResult<>(foundShards, shardsToIgnoreNodes); } }; @@ -111,7 +161,28 @@ private ReplicationCheckpoint getReplicationCheckpoint(ShardId shardId, String n 
protected AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation) { // for now, just pretend no node has data final ShardId shardId = shard.shardId(); - return new AsyncShardFetch.FetchResult<>(shardId, Collections.emptyMap(), allocation.getIgnoreNodes(shardId)); + return new AsyncShardFetch.FetchResult<>(Collections.emptyMap(), new HashMap<>() { + { + put(shardId, allocation.getIgnoreNodes(shardId)); + } + }); + } + + @Override + protected boolean hasInitiatedFetching(ShardRouting shard) { + return true; + } + }; + + ReplicaShardBatchAllocator replicaBatchShardAllocator = new ReplicaShardBatchAllocator() { + + @Override + protected AsyncShardFetch.FetchResult fetchData( + Set shardsEligibleForFetch, + Set inEligibleShards, + RoutingAllocation allocation + ) { + return new AsyncShardFetch.FetchResult<>(Collections.emptyMap(), Collections.emptyMap()); } @Override @@ -157,6 +228,12 @@ public void allocateUnassigned( innerAllocatedUnassigned(allocation, primaryShardAllocator, replicaShardAllocator, shardRouting, unassignedAllocationHandler); } + @Override + public void allocateUnassignedBatch(RoutingAllocation allocation, boolean primary) { + currentNodes = allocation.nodes(); + innerAllocateUnassignedBatch(allocation, primaryBatchShardAllocator, replicaBatchShardAllocator, primary); + } + /** * manually add a specific shard to the allocations the gateway keeps track of */ @@ -171,4 +248,34 @@ public String getReplicationCheckPointKey(ShardId shardId, String nodeName) { public void addReplicationCheckpoint(ShardId shardId, String nodeName, ReplicationCheckpoint replicationCheckpoint) { shardIdNodeToReplicationCheckPointMap.putIfAbsent(getReplicationCheckPointKey(shardId, nodeName), replicationCheckpoint); } + + public Set createAndUpdateBatches(RoutingAllocation allocation, boolean primary) { + return super.createAndUpdateBatches(allocation, primary); + } + + public void safelyRemoveShardFromBatch(ShardRouting shard) { + 
super.safelyRemoveShardFromBatch(shard); + } + + public void safelyRemoveShardFromBothBatch(ShardRouting shardRouting) { + super.safelyRemoveShardFromBothBatch(shardRouting); + } + + public String getBatchId(ShardRouting shard, boolean primary) { + return super.getBatchId(shard, primary); + } + + public Map getBatchIdToStartedShardBatch() { + return batchIdToStartedShardBatch; + } + + public Map getBatchIdToStoreShardBatch() { + return batchIdToStoreShardBatch; + } + + @Override + public AllocateUnassignedDecision explainUnassignedShardAllocation(ShardRouting unassignedShard, RoutingAllocation routingAllocation) { + setShardAllocators(primaryShardAllocator, replicaShardAllocator, primaryBatchShardAllocator, replicaBatchShardAllocator); + return super.explainUnassignedShardAllocation(unassignedShard, routingAllocation); + } } From a2a439b02a50f9affd01d78cd715a7953ebe70e4 Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Mon, 11 Dec 2023 20:06:25 +0000 Subject: [PATCH 14/18] Apply Spotless PSA Signed-off-by: Shivansh Arora --- .../gateway/PrimaryShardAllocator.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java index 719de2db6db95..1a92cfb31e2a1 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java @@ -51,7 +51,6 @@ import org.opensearch.env.ShardLockObtainFailedException; import org.opensearch.gateway.AsyncShardFetch.FetchResult; import org.opensearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards; -import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import java.util.ArrayList; import java.util.Collection; @@ -137,13 +136,11 @@ public AllocateUnassignedDecision makeAllocationDecision( * @param shardsState {@link FetchResult} * */ 
private static List adaptToNodeShardStates(FetchResult shardsState) { - if (!shardsState.hasData()){ + if (!shardsState.hasData()) { return null; } List nodeShardStates = new ArrayList<>(); - shardsState.getData().forEach((node, nodeGatewayStartedShard) -> { - nodeShardStates.add(nodeGatewayStartedShard); - }); + shardsState.getData().forEach((node, nodeGatewayStartedShard) -> { nodeShardStates.add(nodeGatewayStartedShard); }); return nodeShardStates; } @@ -349,13 +346,13 @@ private static ShardStoreInfo shardStoreInfo(NodeGatewayStartedShards nodeShardS (NodeGatewayStartedShards state) -> state.storeException() == null ).reversed(); private static final Comparator PRIMARY_FIRST_COMPARATOR = Comparator.comparing( - NodeGatewayStartedShards::primary).reversed(); + NodeGatewayStartedShards::primary + ).reversed(); - private static final Comparator HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR = - Comparator.comparing( - NodeGatewayStartedShards::replicationCheckpoint, - Comparator.nullsLast(Comparator.naturalOrder()) - ); + private static final Comparator HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR = Comparator.comparing( + NodeGatewayStartedShards::replicationCheckpoint, + Comparator.nullsLast(Comparator.naturalOrder()) + ); /** * Builds a list of nodes. If matchAnyShard is set to false, only nodes that have an allocation id matching @@ -437,7 +434,10 @@ protected static NodeShardsResult buildNodeShardsResult( return new NodeShardsResult(nodeShardStates, numberOfAllocationsFound); } - private static Comparator createActiveShardComparator(boolean matchAnyShard, Set inSyncAllocationIds) { + private static Comparator createActiveShardComparator( + boolean matchAnyShard, + Set inSyncAllocationIds + ) { /** * Orders the active shards copies based on below comparators * 1. No store exception i.e. 
shard copy is readable From d2fd6caf66dfabb416985fc60d8384b4ffc97f96 Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Tue, 12 Dec 2023 01:43:12 +0530 Subject: [PATCH 15/18] reverted vcs change Signed-off-by: Shivansh Arora --- .idea/vcs.xml | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/.idea/vcs.xml b/.idea/vcs.xml index b74d87df8f17b..48557884a8893 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -1,20 +1,20 @@ - - - + + + - + From 8b328feaab1bbb03da78eab1b3d499e76dbaa274 Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Tue, 12 Dec 2023 01:44:45 +0530 Subject: [PATCH 16/18] Reverted TestGatewayAllocator change Signed-off-by: Shivansh Arora --- .../test/gateway/TestGatewayAllocator.java | 117 +----------------- 1 file changed, 5 insertions(+), 112 deletions(-) diff --git a/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java b/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java index caab04d3fc434..7462062a0cd46 100644 --- a/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java +++ b/test/framework/src/main/java/org/opensearch/test/gateway/TestGatewayAllocator.java @@ -35,21 +35,16 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.ShardRouting; -import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.FailedShard; import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.core.index.shard.ShardId; import org.opensearch.gateway.AsyncShardFetch; import org.opensearch.gateway.GatewayAllocator; import org.opensearch.gateway.PrimaryShardAllocator; -import org.opensearch.gateway.PrimaryShardBatchAllocator; import org.opensearch.gateway.ReplicaShardAllocator; -import org.opensearch.gateway.ReplicaShardBatchAllocator; -import 
org.opensearch.gateway.TransportNodesListGatewayStartedBatchShards; import org.opensearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.store.TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata; -import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch; import java.util.Collections; import java.util.HashMap; @@ -62,13 +57,13 @@ * A gateway allocator implementation that keeps an in memory list of started shard allocation * that are used as replies to the, normally async, fetch data requests. The in memory list * is adapted when shards are started and failed. - * + *

* Nodes leaving and joining the cluster do not change the list of shards the class tracks but * rather serves as a filter to what is returned by fetch data. Concretely - fetch data will * only return shards that were started on nodes that are currently part of the cluster. - * + *

* For now only primary shard related data is fetched. Replica request always get an empty response. - * + *

* * This class is useful to use in unit tests that require the functionality of {@link GatewayAllocator} but do * not have all the infrastructure required to use it. @@ -103,52 +98,7 @@ protected AsyncShardFetch.FetchResult fetchData(ShardR ) ); - return new AsyncShardFetch.FetchResult<>(foundShards, new HashMap<>() { - { - put(shardId, ignoreNodes); - } - }); - } - }; - - PrimaryShardBatchAllocator primaryBatchShardAllocator = new PrimaryShardBatchAllocator() { - @Override - protected AsyncShardFetch.FetchResult fetchData( - Set shardsEligibleForFetch, - Set inEligibleShards, - RoutingAllocation allocation - ) { - Map foundShards = new HashMap<>(); - HashMap> shardsToIgnoreNodes = new HashMap<>(); - for (Map.Entry> entry : knownAllocations.entrySet()) { - String nodeId = entry.getKey(); - Map shardsOnNode = entry.getValue(); - HashMap adaptedResponse = new HashMap<>(); - - for (ShardRouting shardRouting : shardsEligibleForFetch) { - ShardId shardId = shardRouting.shardId(); - Set ignoreNodes = allocation.getIgnoreNodes(shardId); - - if (shardsOnNode.containsKey(shardId) && ignoreNodes.contains(nodeId) == false && currentNodes.nodeExists(nodeId)) { - TransportNodesListGatewayStartedBatchShards.NodeGatewayStartedShard nodeShard = - new TransportNodesListGatewayStartedBatchShards.NodeGatewayStartedShard( - shardRouting.allocationId().getId(), - shardRouting.primary(), - getReplicationCheckpoint(shardId, nodeId) - ); - adaptedResponse.put(shardId, nodeShard); - shardsToIgnoreNodes.put(shardId, ignoreNodes); - } - foundShards.put( - currentNodes.get(nodeId), - new TransportNodesListGatewayStartedBatchShards.NodeGatewayStartedShardsBatch( - currentNodes.get(nodeId), - adaptedResponse - ) - ); - } - } - return new AsyncShardFetch.FetchResult<>(foundShards, shardsToIgnoreNodes); + return new AsyncShardFetch.FetchResult<>(shardId, foundShards, ignoreNodes); } }; @@ -161,28 +111,7 @@ private ReplicationCheckpoint getReplicationCheckpoint(ShardId shardId, String n protected 
AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation) { // for now, just pretend no node has data final ShardId shardId = shard.shardId(); - return new AsyncShardFetch.FetchResult<>(Collections.emptyMap(), new HashMap<>() { - { - put(shardId, allocation.getIgnoreNodes(shardId)); - } - }); - } - - @Override - protected boolean hasInitiatedFetching(ShardRouting shard) { - return true; - } - }; - - ReplicaShardBatchAllocator replicaBatchShardAllocator = new ReplicaShardBatchAllocator() { - - @Override - protected AsyncShardFetch.FetchResult fetchData( - Set shardsEligibleForFetch, - Set inEligibleShards, - RoutingAllocation allocation - ) { - return new AsyncShardFetch.FetchResult<>(Collections.emptyMap(), Collections.emptyMap()); + return new AsyncShardFetch.FetchResult<>(shardId, Collections.emptyMap(), allocation.getIgnoreNodes(shardId)); } @Override @@ -228,12 +157,6 @@ public void allocateUnassigned( innerAllocatedUnassigned(allocation, primaryShardAllocator, replicaShardAllocator, shardRouting, unassignedAllocationHandler); } - @Override - public void allocateUnassignedBatch(RoutingAllocation allocation, boolean primary) { - currentNodes = allocation.nodes(); - innerAllocateUnassignedBatch(allocation, primaryBatchShardAllocator, replicaBatchShardAllocator, primary); - } - /** * manually add a specific shard to the allocations the gateway keeps track of */ @@ -248,34 +171,4 @@ public String getReplicationCheckPointKey(ShardId shardId, String nodeName) { public void addReplicationCheckpoint(ShardId shardId, String nodeName, ReplicationCheckpoint replicationCheckpoint) { shardIdNodeToReplicationCheckPointMap.putIfAbsent(getReplicationCheckPointKey(shardId, nodeName), replicationCheckpoint); } - - public Set createAndUpdateBatches(RoutingAllocation allocation, boolean primary) { - return super.createAndUpdateBatches(allocation, primary); - } - - public void safelyRemoveShardFromBatch(ShardRouting shard) { - 
super.safelyRemoveShardFromBatch(shard); - } - - public void safelyRemoveShardFromBothBatch(ShardRouting shardRouting) { - super.safelyRemoveShardFromBothBatch(shardRouting); - } - - public String getBatchId(ShardRouting shard, boolean primary) { - return super.getBatchId(shard, primary); - } - - public Map getBatchIdToStartedShardBatch() { - return batchIdToStartedShardBatch; - } - - public Map getBatchIdToStoreShardBatch() { - return batchIdToStoreShardBatch; - } - - @Override - public AllocateUnassignedDecision explainUnassignedShardAllocation(ShardRouting unassignedShard, RoutingAllocation routingAllocation) { - setShardAllocators(primaryShardAllocator, replicaShardAllocator, primaryBatchShardAllocator, replicaBatchShardAllocator); - return super.explainUnassignedShardAllocation(unassignedShard, routingAllocation); - } } From 614009bff1bb9edb0da40b5afc252d13b7e475a6 Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Mon, 5 Feb 2024 01:00:47 +0530 Subject: [PATCH 17/18] Address review comments Signed-off-by: Shivansh Arora --- .../org/opensearch/gateway/PrimaryShardAllocator.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java index 1a92cfb31e2a1..1b1e0a6c21ef3 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java @@ -125,7 +125,7 @@ public AllocateUnassignedDecision makeAllocationDecision( return decision; } final FetchResult shardState = fetchData(unassignedShard, allocation); - List nodeShardStates = adaptToNodeShardStates(shardState); + List nodeShardStates = adaptToNodeStartedShardList(shardState); return getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger); } @@ -135,7 +135,7 @@ public AllocateUnassignedDecision makeAllocationDecision( * * @param shardsState 
{@link FetchResult} * */ - private static List adaptToNodeShardStates(FetchResult shardsState) { + private static List adaptToNodeStartedShardList(FetchResult shardsState) { if (!shardsState.hasData()) { return null; } @@ -150,15 +150,15 @@ protected AllocateUnassignedDecision getAllocationDecision( List shardState, Logger logger ) { + final boolean explain = allocation.debugDecision(); if (shardState == null) { allocation.setHasPendingAsyncFetch(); List nodeDecisions = null; - if (allocation.debugDecision()) { + if (explain) { nodeDecisions = buildDecisionsForAllNodes(unassignedShard, allocation); } return AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions); } - final boolean explain = allocation.debugDecision(); // don't create a new IndexSetting object for every shard as this could cause a lot of garbage // on cluster restart if we allocate a boat load of shards final IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(unassignedShard.index()); From a6fb7fcf84a703e5c11ec70904dd7737da90fc3b Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Mon, 5 Feb 2024 14:38:48 +0530 Subject: [PATCH 18/18] Fix javadoc Signed-off-by: Shivansh Arora --- .../java/org/opensearch/gateway/PrimaryShardAllocator.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java index 1b1e0a6c21ef3..5046873830c01 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java @@ -132,9 +132,7 @@ public AllocateUnassignedDecision makeAllocationDecision( /** * Transforms {@link FetchResult} of {@link NodeGatewayStartedShards} to {@link List} of {@link NodeGatewayStartedShards} * Returns null if {@link FetchResult} does not have any data. 
- * - * @param shardsState {@link FetchResult} - * */ + */ private static List adaptToNodeStartedShardList(FetchResult shardsState) { if (!shardsState.hasData()) { return null;