From 9914d70c95ba55ae7338e25150f3ac7447b47dc1 Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Mon, 4 Sep 2023 05:23:54 +0530 Subject: [PATCH] Add PrimaryShardBatchAllocator to take allocation decisions for a batch of shards Signed-off-by: Shivansh Arora --- .../gateway/BaseGatewayShardAllocator.java | 87 +++++++- .../gateway/PrimaryShardAllocator.java | 160 +++++++++------ .../gateway/PrimaryShardBatchAllocator.java | 186 ++++++++++++++++++ 3 files changed, 366 insertions(+), 67 deletions(-) create mode 100644 server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java diff --git a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java index 59ef894958cbe..1b943db825b4f 100644 --- a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java @@ -34,8 +34,10 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.AllocationDecision; @@ -43,14 +45,22 @@ import org.opensearch.cluster.routing.allocation.NodeAllocationResult; import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.cluster.routing.allocation.decider.Decision; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; /** * An abstract class that implements basic functionality for allocating * shards to nodes based on shard copies that already exist in the cluster. - * + *

* Individual implementations of this class are responsible for providing * the logic to determine to which nodes (if any) those shards are allocated. * @@ -64,8 +74,9 @@ public abstract class BaseGatewayShardAllocator { * Allocate an unassigned shard to nodes (if any) where valid copies of the shard already exist. * It is up to the individual implementations of {@link #makeAllocationDecision(ShardRouting, RoutingAllocation, Logger)} * to make decisions on assigning shards to nodes. - * @param shardRouting the shard to allocate - * @param allocation the allocation state container object + * + * @param shardRouting the shard to allocate + * @param allocation the allocation state container object * @param unassignedAllocationHandler handles the allocation of the current shard */ public void allocateUnassigned( @@ -109,9 +120,9 @@ protected long getExpectedShardSize(ShardRouting shardRouting, RoutingAllocation * {@link #allocateUnassigned(ShardRouting, RoutingAllocation, ExistingShardsAllocator.UnassignedAllocationHandler)} to make decisions * about whether or not the shard can be allocated by this allocator and if so, to which node it will be allocated. * - * @param unassignedShard the unassigned shard to allocate - * @param allocation the current routing state - * @param logger the logger + * @param unassignedShard the unassigned shard to allocate + * @param allocation the current routing state + * @param logger the logger * @return an {@link AllocateUnassignedDecision} with the final decision of whether to allocate and details of the decision */ public abstract AllocateUnassignedDecision makeAllocationDecision( @@ -132,4 +143,68 @@ protected static List buildDecisionsForAllNodes(ShardRouti } return results; } + + protected static class NodeShardState { + private final String allocationId; + private final boolean primary; + private final Exception storeException; + private final ReplicationCheckpoint replicationCheckpoint; + private final DiscoveryNode node; + + public NodeShardState(DiscoveryNode node, + String allocationId, + boolean primary, + ReplicationCheckpoint replicationCheckpoint, + Exception storeException) { + this.node = node; + this.allocationId = allocationId; + this.primary = primary; + this.replicationCheckpoint = replicationCheckpoint; + this.storeException = storeException; + } + + public String allocationId() { + return this.allocationId; + } + + public boolean primary() { + return this.primary; + } + + public ReplicationCheckpoint replicationCheckpoint() { + return this.replicationCheckpoint; + } + + public Exception storeException() { + return this.storeException; + } + + public DiscoveryNode getNode() { + return this.node; + } + } + + protected static class NodeShardStates { + TreeMap nodeShardStates; + + public NodeShardStates(Comparator comparator) { + this.nodeShardStates = new TreeMap<>(comparator); + } + + public void add(NodeShardState key, DiscoveryNode value) { + this.nodeShardStates.put(key, value); + } + + public DiscoveryNode get(NodeShardState key) { + return this.nodeShardStates.get(key); + } + + public int size() { + return this.nodeShardStates.size(); + } + + public Iterator iterator() { + return this.nodeShardStates.keySet().iterator(); + } + } } diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java index 4dc9396751fc9..c8783e39049d0 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java @@ -51,14 +51,18 @@ import org.opensearch.env.ShardLockObtainFailedException; import org.opensearch.gateway.AsyncShardFetch.FetchResult; import org.opensearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashSet; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.TreeMap; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -81,7 +85,7 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator { /** * Is the allocator responsible for allocating the given {@link ShardRouting}? */ - private static boolean isResponsibleFor(final ShardRouting shard) { + protected static boolean isResponsibleFor(final ShardRouting shard) { return shard.primary() // must be primary && shard.unassigned() // must be unassigned // only handle either an existing store or a snapshot recovery @@ -89,19 +93,20 @@ private static boolean isResponsibleFor(final ShardRouting shard) { || shard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT); } - @Override - public AllocateUnassignedDecision makeAllocationDecision( - final ShardRouting unassignedShard, - final RoutingAllocation allocation, - final Logger logger - ) { + /** + * Skip doing fetchData call for a shard if recovery mode is snapshot. Also do not take decision if allocator is + * not responsible for this particular shard. + * + * @param unassignedShard unassigned shard routing + * @param allocation routing allocation object + * @return allocation decision taken for this shard + */ + protected AllocateUnassignedDecision skipSnapshotRestore(ShardRouting unassignedShard, RoutingAllocation allocation) { if (isResponsibleFor(unassignedShard) == false) { // this allocator is not responsible for allocating this shard return AllocateUnassignedDecision.NOT_TAKEN; } - final boolean explain = allocation.debugDecision(); - if (unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT && allocation.snapshotShardSizeInfo().getShardSize(unassignedShard) == null) { List nodeDecisions = null; @@ -110,17 +115,43 @@ public AllocateUnassignedDecision makeAllocationDecision( } return AllocateUnassignedDecision.no(UnassignedInfo.AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions); } + return null; + } + @Override + public AllocateUnassignedDecision makeAllocationDecision( + final ShardRouting unassignedShard, + final RoutingAllocation allocation, + final Logger logger + ) { + AllocateUnassignedDecision decision = skipSnapshotRestore(unassignedShard, allocation); + if (decision != null) { + return decision; + } final FetchResult shardState = fetchData(unassignedShard, allocation); if (shardState.hasData() == false) { allocation.setHasPendingAsyncFetch(); List nodeDecisions = null; - if (explain) { + if (allocation.debugDecision()) { nodeDecisions = buildDecisionsForAllNodes(unassignedShard, allocation); } return AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions); } + NodeShardStates nodeShardStates = getNodeShardStates(shardState); + return getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger); + } + private static NodeShardStates getNodeShardStates(FetchResult shardsState) { + NodeShardStates nodeShardStates = new NodeShardStates((o1, o2) -> 1); + shardsState.getData().forEach((node, nodeGatewayStartedShard) -> { + nodeShardStates.add(new NodeShardState(node, nodeGatewayStartedShard.allocationId(), nodeGatewayStartedShard.primary(), nodeGatewayStartedShard.replicationCheckpoint(), nodeGatewayStartedShard.storeException()), node); + }); + return nodeShardStates; + } + + protected AllocateUnassignedDecision getAllocationDecision(ShardRouting unassignedShard, RoutingAllocation allocation, + NodeShardStates shardState, Logger logger) { + final boolean explain = allocation.debugDecision(); // don't create a new IndexSetting object for every shard as this could cause a lot of garbage // on cluster restart if we allocate a boat load of shards final IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(unassignedShard.index()); @@ -185,22 +216,23 @@ public AllocateUnassignedDecision makeAllocationDecision( boolean throttled = false; if (nodesToAllocate.yesNodeShards.isEmpty() == false) { DecidedNode decidedNode = nodesToAllocate.yesNodeShards.get(0); + NodeShardState nodeShardState = decidedNode.nodeShardState; logger.debug( "[{}][{}]: allocating [{}] to [{}] on primary allocation", unassignedShard.index(), unassignedShard.id(), unassignedShard, - decidedNode.nodeShardState.getNode() + nodeShardState.getNode() ); - node = decidedNode.nodeShardState.getNode(); - allocationId = decidedNode.nodeShardState.allocationId(); + node = nodeShardState.getNode(); + allocationId = nodeShardState.allocationId(); } else if (nodesToAllocate.throttleNodeShards.isEmpty() && !nodesToAllocate.noNodeShards.isEmpty()) { // The deciders returned a NO decision for all nodes with shard copies, so we check if primary shard // can be force-allocated to one of the nodes. nodesToAllocate = buildNodesToAllocate(allocation, nodeShardsResult.orderedAllocationCandidates, unassignedShard, true); if (nodesToAllocate.yesNodeShards.isEmpty() == false) { final DecidedNode decidedNode = nodesToAllocate.yesNodeShards.get(0); - final NodeGatewayStartedShards nodeShardState = decidedNode.nodeShardState; + final NodeShardState nodeShardState = decidedNode.nodeShardState; logger.debug( "[{}][{}]: allocating [{}] to [{}] on forced primary allocation", unassignedShard.index(), @@ -260,11 +292,11 @@ public AllocateUnassignedDecision makeAllocationDecision( */ private static List buildNodeDecisions( NodesToAllocate nodesToAllocate, - FetchResult fetchedShardData, + NodeShardStates fetchedShardData, Set inSyncAllocationIds ) { List nodeResults = new ArrayList<>(); - Collection ineligibleShards; + Collection ineligibleShards = new ArrayList<>(); if (nodesToAllocate != null) { final Set discoNodes = new HashSet<>(); nodeResults.addAll( @@ -280,15 +312,15 @@ private static List buildNodeDecisions( }) .collect(Collectors.toList()) ); - ineligibleShards = fetchedShardData.getData() - .values() - .stream() - .filter(shardData -> discoNodes.contains(shardData.getNode()) == false) - .collect(Collectors.toList()); + fetchedShardData.iterator().forEachRemaining(shardData -> { + if (discoNodes.contains(shardData.getNode()) == false) { + ineligibleShards.add(shardData); + } + }); } else { // there were no shard copies that were eligible for being assigned the allocation, // so all fetched shard data are ineligible shards - ineligibleShards = fetchedShardData.getData().values(); + fetchedShardData.iterator().forEachRemaining(ineligibleShards::add); } nodeResults.addAll( @@ -300,21 +332,21 @@ private static List buildNodeDecisions( return nodeResults; } - private static ShardStoreInfo shardStoreInfo(NodeGatewayStartedShards nodeShardState, Set inSyncAllocationIds) { + protected static ShardStoreInfo shardStoreInfo(NodeShardState nodeShardState, Set inSyncAllocationIds) { final Exception storeErr = nodeShardState.storeException(); final boolean inSync = nodeShardState.allocationId() != null && inSyncAllocationIds.contains(nodeShardState.allocationId()); return new ShardStoreInfo(nodeShardState.allocationId(), inSync, storeErr); } - private static final Comparator NO_STORE_EXCEPTION_FIRST_COMPARATOR = Comparator.comparing( - (NodeGatewayStartedShards state) -> state.storeException() == null + private static final Comparator NO_STORE_EXCEPTION_FIRST_COMPARATOR = Comparator.comparing( + (NodeShardState state) -> state.storeException() == null ).reversed(); - private static final Comparator PRIMARY_FIRST_COMPARATOR = Comparator.comparing( - NodeGatewayStartedShards::primary + private static final Comparator PRIMARY_FIRST_COMPARATOR = Comparator.comparing( + NodeShardState::primary ).reversed(); - private static final Comparator HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR = Comparator.comparing( - NodeGatewayStartedShards::replicationCheckpoint, + private static final Comparator HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR = Comparator.comparing( + NodeShardState::replicationCheckpoint, Comparator.nullsLast(Comparator.naturalOrder()) ); @@ -323,17 +355,19 @@ private static ShardStoreInfo shardStoreInfo(NodeGatewayStartedShards nodeShardS * inSyncAllocationIds are added to the list. Otherwise, any node that has a shard is added to the list, but * entries with matching allocation id are always at the front of the list. */ - protected static NodeShardsResult buildNodeShardsResult( + protected NodeShardsResult buildNodeShardsResult( ShardRouting shard, boolean matchAnyShard, Set ignoreNodes, Set inSyncAllocationIds, - FetchResult shardState, + NodeShardStates shardState, Logger logger ) { - List nodeShardStates = new ArrayList<>(); + NodeShardStates nodeShardStates = new NodeShardStates(getComparator(matchAnyShard, inSyncAllocationIds)); int numberOfAllocationsFound = 0; - for (NodeGatewayStartedShards nodeShardState : shardState.getData().values()) { + Iterator iterator = shardState.iterator(); + while (iterator.hasNext()) { + NodeShardState nodeShardState = iterator.next(); DiscoveryNode node = nodeShardState.getNode(); String allocationId = nodeShardState.allocationId(); @@ -381,22 +415,33 @@ protected static NodeShardsResult buildNodeShardsResult( + nodeShardState.storeException(); numberOfAllocationsFound++; if (matchAnyShard || inSyncAllocationIds.contains(nodeShardState.allocationId())) { - nodeShardStates.add(nodeShardState); + nodeShardStates.add(nodeShardState, nodeShardState.getNode()); } } } + if (logger.isTraceEnabled()) { + logger.trace( + "{} candidates for allocation: {}", + shard, + nodeShardStates.nodeShardStates.values().stream().map(DiscoveryNode::getName).collect(Collectors.joining(", ")) + ); + } + return new NodeShardsResult(nodeShardStates, numberOfAllocationsFound); + } + + protected static Comparator getComparator(boolean matchAnyShard, Set inSyncAllocationIds) { /** * Orders the active shards copies based on below comparators * 1. No store exception i.e. shard copy is readable * 2. Prefer previous primary shard * 3. Prefer shard copy with the highest replication checkpoint. It is NO-OP for doc rep enabled indices. */ - final Comparator comparator; // allocation preference + final Comparator comparator; // allocation preference if (matchAnyShard) { // prefer shards with matching allocation ids - Comparator matchingAllocationsFirst = Comparator.comparing( - (NodeGatewayStartedShards state) -> inSyncAllocationIds.contains(state.allocationId()) + Comparator matchingAllocationsFirst = Comparator.comparing( + (NodeShardState state) -> inSyncAllocationIds.contains(state.allocationId()) ).reversed(); comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR) .thenComparing(PRIMARY_FIRST_COMPARATOR) @@ -406,31 +451,24 @@ protected static NodeShardsResult buildNodeShardsResult( .thenComparing(HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR); } - nodeShardStates.sort(comparator); - - if (logger.isTraceEnabled()) { - logger.trace( - "{} candidates for allocation: {}", - shard, - nodeShardStates.stream().map(s -> s.getNode().getName()).collect(Collectors.joining(", ")) - ); - } - return new NodeShardsResult(nodeShardStates, numberOfAllocationsFound); + return comparator; } /** * Split the list of node shard states into groups yes/no/throttle based on allocation deciders */ - private static NodesToAllocate buildNodesToAllocate( + protected static NodesToAllocate buildNodesToAllocate( RoutingAllocation allocation, - List nodeShardStates, + NodeShardStates nodeShardStates, ShardRouting shardRouting, boolean forceAllocate ) { List yesNodeShards = new ArrayList<>(); List throttledNodeShards = new ArrayList<>(); List noNodeShards = new ArrayList<>(); - for (NodeGatewayStartedShards nodeShardState : nodeShardStates) { + Iterator iterator = nodeShardStates.iterator(); + while (iterator.hasNext()) { + NodeShardState nodeShardState = iterator.next(); RoutingNode node = allocation.routingNodes().node(nodeShardState.getNode().getId()); if (node == null) { continue; @@ -457,22 +495,22 @@ private static NodesToAllocate buildNodesToAllocate( protected abstract FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation); - private static class NodeShardsResult { - final List orderedAllocationCandidates; + protected static class NodeShardsResult { + final NodeShardStates orderedAllocationCandidates; final int allocationsFound; - NodeShardsResult(List orderedAllocationCandidates, int allocationsFound) { + NodeShardsResult(NodeShardStates orderedAllocationCandidates, int allocationsFound) { this.orderedAllocationCandidates = orderedAllocationCandidates; this.allocationsFound = allocationsFound; } } - static class NodesToAllocate { - final List yesNodeShards; - final List throttleNodeShards; - final List noNodeShards; + protected static class NodesToAllocate { + final List yesNodeShards; + final List throttleNodeShards; + final List noNodeShards; - NodesToAllocate(List yesNodeShards, List throttleNodeShards, List noNodeShards) { + NodesToAllocate(List yesNodeShards, List throttleNodeShards, List noNodeShards) { this.yesNodeShards = yesNodeShards; this.throttleNodeShards = throttleNodeShards; this.noNodeShards = noNodeShards; @@ -483,11 +521,11 @@ static class NodesToAllocate { * This class encapsulates the shard state retrieved from a node and the decision that was made * by the allocator for allocating to the node that holds the shard copy. */ - private static class DecidedNode { - final NodeGatewayStartedShards nodeShardState; + protected static class DecidedNode { + final NodeShardState nodeShardState; final Decision decision; - private DecidedNode(NodeGatewayStartedShards nodeShardState, Decision decision) { + protected DecidedNode(NodeShardState nodeShardState, Decision decision) { this.nodeShardState = nodeShardState; this.decision = decision; } diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java new file mode 100644 index 0000000000000..803e8dae2ef1a --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java @@ -0,0 +1,186 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.gateway; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.RecoverySource; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.UnassignedInfo; +import org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus; +import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; +import org.opensearch.cluster.routing.allocation.NodeAllocationResult; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.env.ShardLockObtainFailedException; +import org.opensearch.gateway.AsyncShardFetch.FetchResult; +import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch; +import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShards; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * PrimaryShardBatchAllocator is similar to {@link org.opensearch.gateway.PrimaryShardAllocator} only difference is + * that it can allocate multiple unassigned primary shards wherein PrimaryShardAllocator can only allocate single + * unassigned shard. + * The primary shard batch allocator allocates multiple unassigned primary shards to nodes that hold + * valid copies of the unassigned primaries. It does this by iterating over all unassigned + * primary shards in the routing table and fetching shard metadata from each node in the cluster + * that holds a copy of the shard. The shard metadata from each node is compared against the + * set of valid allocation IDs and for all valid shard copies (if any), the primary shard batch allocator + * executes the allocation deciders to chose a copy to assign the primary shard to. + *

+ * Note that the PrimaryShardBatchAllocator does *not* allocate primaries on index creation + * (see {@link org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator}), + * nor does it allocate primaries when a primary shard failed and there is a valid replica + * copy that can immediately be promoted to primary, as this takes place in {@link RoutingNodes#failShard}. + * + * @opensearch.internal + */ +public abstract class PrimaryShardBatchAllocator extends PrimaryShardAllocator { + + abstract protected FetchResult fetchData(Set shardsEligibleForFetch, + Set inEligibleShards, + RoutingAllocation allocation); + + protected FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation){ + return null; + } + + @Override + public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassignedShard, + RoutingAllocation allocation, + Logger logger) { + + return makeAllocationDecision(new HashSet<>(Collections.singletonList(unassignedShard)), + allocation, logger).get(unassignedShard); + } + + /** + * Build allocation decisions for all the shards present in the batch identified by batchId. + * + * @param shards set of shards given for allocation + * @param allocation current allocation of all the shards + * @param logger logger used for logging + * @return shard to allocation decision map + */ + @Override + public HashMap makeAllocationDecision(Set shards, + RoutingAllocation allocation, + Logger logger) { + HashMap shardAllocationDecisions = new HashMap<>(); + final boolean explain = allocation.debugDecision(); + Set shardsEligibleForFetch = new HashSet<>(); + Set shardsNotEligibleForFetch = new HashSet<>(); + // identify ineligible shards + for (ShardRouting shard : shards) { + ShardRouting matchingShard = null; + for (RoutingNode node: allocation.routingNodes()) { + matchingShard = node.getByShardId(shard.shardId()); + if (matchingShard != null && matchingShard.primary() == shard.primary()) { + // we have a matching shard on this node, so this is a valid copy + break; + } + } + if (matchingShard == null) { + matchingShard = shard; + } + AllocateUnassignedDecision decision = skipSnapshotRestore(matchingShard, allocation); + if (decision != null) { + shardsNotEligibleForFetch.add(shard); + shardAllocationDecisions.put(shard, decision); + } else { + shardsEligibleForFetch.add(shard); + } + } + // Do not call fetchData if there are no eligible shards + if (shardsEligibleForFetch.size() == 0) { + return shardAllocationDecisions; + } + // only fetch data for eligible shards + final FetchResult shardsState = fetchData(shardsEligibleForFetch, shardsNotEligibleForFetch, allocation); + // Note : shardsState contain the Data, there key is DiscoveryNode but value is Map so to get one shard level data (from all the nodes), we'll traverse the map + // and construct the nodeShardState along the way before making any allocation decision. As metadata for a + // particular shard is needed from all the discovery nodes. + + // process the received data + for (ShardRouting unassignedShard : shardsEligibleForFetch) { + if (shardsState.hasData() == false) { + // if fetching is not done, add that no decision in the resultant map + allocation.setHasPendingAsyncFetch(); + List nodeDecisions = null; + if (explain) { + nodeDecisions = buildDecisionsForAllNodes(unassignedShard, allocation); + } + shardAllocationDecisions.put(unassignedShard, + AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, + nodeDecisions)); + } else { + + NodeShardStates nodeShardStates = getNodeShardStates(unassignedShard, shardsState); + // get allocation decision for this shard + shardAllocationDecisions.put(unassignedShard, getAllocationDecision(unassignedShard, allocation, + nodeShardStates, logger)); + } + } + return shardAllocationDecisions; + } + + private static NodeShardStates getNodeShardStates(ShardRouting unassignedShard, FetchResult shardsState) { + NodeShardStates nodeShardStates = new NodeShardStates((o1, o2) -> 1); + Map nodeResponses = shardsState.getData(); + + // build data for a shard from all the nodes + nodeResponses.forEach((node, nodeGatewayStartedShardsBatch) -> { + NodeGatewayStartedShards shardData = nodeGatewayStartedShardsBatch.getNodeGatewayStartedShardsBatch().get(unassignedShard.shardId()); + nodeShardStates.add(new NodeShardState(node, shardData.allocationId(), shardData.primary(), shardData.replicationCheckpoint(), shardData.storeException()), node); + }); + return nodeShardStates; + } +}