Skip to content

Commit

Permalink
Add PrimaryShardBatchAllocator to take allocation decisions for a bat…
Browse files Browse the repository at this point in the history
…ch of shards

Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 committed Sep 5, 2023
1 parent 0c839c3 commit 9914d70
Show file tree
Hide file tree
Showing 3 changed files with 366 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,33 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.AllocationDecision;
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.opensearch.cluster.routing.allocation.NodeAllocationResult;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

/**
* An abstract class that implements basic functionality for allocating
* shards to nodes based on shard copies that already exist in the cluster.
*
* <p>
* Individual implementations of this class are responsible for providing
* the logic to determine to which nodes (if any) those shards are allocated.
*
Expand All @@ -64,8 +74,9 @@ public abstract class BaseGatewayShardAllocator {
* Allocate an unassigned shard to nodes (if any) where valid copies of the shard already exist.
* It is up to the individual implementations of {@link #makeAllocationDecision(ShardRouting, RoutingAllocation, Logger)}
* to make decisions on assigning shards to nodes.
* @param shardRouting the shard to allocate
* @param allocation the allocation state container object
*
* @param shardRouting the shard to allocate
* @param allocation the allocation state container object
* @param unassignedAllocationHandler handles the allocation of the current shard
*/
public void allocateUnassigned(
Expand Down Expand Up @@ -109,9 +120,9 @@ protected long getExpectedShardSize(ShardRouting shardRouting, RoutingAllocation
* {@link #allocateUnassigned(ShardRouting, RoutingAllocation, ExistingShardsAllocator.UnassignedAllocationHandler)} to make decisions
* about whether or not the shard can be allocated by this allocator and if so, to which node it will be allocated.
*
* @param unassignedShard the unassigned shard to allocate
* @param allocation the current routing state
* @param logger the logger
* @param unassignedShard the unassigned shard to allocate
* @param allocation the current routing state
* @param logger the logger
* @return an {@link AllocateUnassignedDecision} with the final decision of whether to allocate and details of the decision
*/
public abstract AllocateUnassignedDecision makeAllocationDecision(
Expand All @@ -132,4 +143,68 @@ protected static List<NodeAllocationResult> buildDecisionsForAllNodes(ShardRouti
}
return results;
}

protected static class NodeShardState {
private final String allocationId;
private final boolean primary;
private final Exception storeException;
private final ReplicationCheckpoint replicationCheckpoint;
private final DiscoveryNode node;

public NodeShardState(DiscoveryNode node,
String allocationId,
boolean primary,
ReplicationCheckpoint replicationCheckpoint,
Exception storeException) {
this.node = node;
this.allocationId = allocationId;
this.primary = primary;
this.replicationCheckpoint = replicationCheckpoint;
this.storeException = storeException;
}

public String allocationId() {
return this.allocationId;
}

public boolean primary() {
return this.primary;
}

public ReplicationCheckpoint replicationCheckpoint() {
return this.replicationCheckpoint;
}

public Exception storeException() {
return this.storeException;
}

public DiscoveryNode getNode() {
return this.node;
}
}

protected static class NodeShardStates {
TreeMap<NodeShardState, DiscoveryNode> nodeShardStates;

public NodeShardStates(Comparator<NodeShardState> comparator) {
this.nodeShardStates = new TreeMap<>(comparator);
}

public void add(NodeShardState key, DiscoveryNode value) {
this.nodeShardStates.put(key, value);
}

public DiscoveryNode get(NodeShardState key) {
return this.nodeShardStates.get(key);
}

public int size() {
return this.nodeShardStates.size();
}

public Iterator<NodeShardState> iterator() {
return this.nodeShardStates.keySet().iterator();
}
}
}
Loading

0 comments on commit 9914d70

Please sign in to comment.