Skip to content

Commit

Permalink
Use registry for PersistedState
Browse files Browse the repository at this point in the history
  • Loading branch information
soosinha committed Aug 25, 2023
1 parent a317f2f commit 91c616f
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -498,10 +498,7 @@ public Optional<ApplyCommitRequest> handlePublishResponse(DiscoveryNode sourceNo
publishResponse.getVersion(),
publishResponse.getTerm()
);
if (isRemoteStateEnabled == true) {
assert remotePersistedState != null : "Remote state has not been initialized";
remotePersistedState.markLastAcceptedStateAsCommitted();
}
handleRemoteCommit();
return Optional.of(new ApplyCommitRequest(localNode, publishResponse.getTerm(), publishResponse.getVersion()));
}

Expand Down Expand Up @@ -564,15 +561,17 @@ public void handleCommit(ApplyCommitRequest applyCommit) {
}

public void handleRemotePublish(ClusterState clusterState) {
if (clusterState.nodes().isLocalNodeElectedClusterManager()
&& isRemoteStateEnabled == true) {
if (isRemoteStateEnabled == true) {
assert remotePersistedState != null : "Remote state has not been initialized";
remotePersistedState.setLastAcceptedState(clusterState);
}
}

public void handleRemoteCommit() {

if (isRemoteStateEnabled) {
assert remotePersistedState != null : "Remote state has not been initialized";
remotePersistedState.markLastAcceptedStateAsCommitted();
}
}

public void invariant() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.cluster.coordination.CoordinationState.VoteCollection;
import org.opensearch.cluster.coordination.FollowersChecker.FollowerCheckRequest;
import org.opensearch.cluster.coordination.JoinHelper.InitialJoinAccumulator;
import org.opensearch.cluster.coordination.PersistentStateRegistry.PersistedStateType;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
Expand Down Expand Up @@ -149,7 +150,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final JoinHelper joinHelper;
private final NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor;
private final Supplier<CoordinationState.PersistedState> persistedStateSupplier;
private final Supplier<CoordinationState.PersistedState> remotePersistedStateSupplier;
private final NoClusterManagerBlockService noClusterManagerBlockService;
final Object mutex = new Object(); // package-private to allow tests to call methods that assert that the mutex is held
private final SetOnce<CoordinationState> coordinationState = new SetOnce<>(); // initialized on start-up (see doStart)
Expand Down Expand Up @@ -203,8 +203,7 @@ public Coordinator(
Random random,
RerouteService rerouteService,
ElectionStrategy electionStrategy,
NodeHealthService nodeHealthService,
Supplier<CoordinationState.PersistedState> remotePersistedStateSupplier
NodeHealthService nodeHealthService
) {
this.settings = settings;
this.transportService = transportService;
Expand All @@ -229,7 +228,6 @@ public Coordinator(
namedWriteableRegistry
);
this.persistedStateSupplier = persistedStateSupplier;
this.remotePersistedStateSupplier = remotePersistedStateSupplier;
this.noClusterManagerBlockService = new NoClusterManagerBlockService(settings, clusterSettings);
this.lastKnownLeader = Optional.empty();
this.lastJoin = Optional.empty();
Expand Down Expand Up @@ -825,7 +823,8 @@ boolean publicationInProgress() {
protected void doStart() {
synchronized (mutex) {
CoordinationState.PersistedState persistedState = persistedStateSupplier.get();
coordinationState.set(new CoordinationState(getLocalNode(), persistedState, electionStrategy, remotePersistedStateSupplier.get(), settings));
coordinationState.set(new CoordinationState(getLocalNode(), persistedState, electionStrategy, PersistentStateRegistry.getPersistedState(
PersistedStateType.REMOTE), settings));
peerFinder.setCurrentTerm(getCurrentTerm());
configuredHostsResolver.start();
final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
Expand Down Expand Up @@ -1765,8 +1764,7 @@ protected boolean isPublishQuorum(VoteCollection votes) {
protected Optional<ApplyCommitRequest> handlePublishResponse(DiscoveryNode sourceNode, PublishResponse publishResponse) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
assert getCurrentTerm() >= publishResponse.getTerm();
return coordinationState.get()
.handlePublishResponse(sourceNode, publishResponse);
return coordinationState.get().handlePublishResponse(sourceNode, publishResponse);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.coordination;

import java.util.HashMap;
import java.util.Map;
import org.opensearch.cluster.coordination.CoordinationState.PersistedState;

public class PersistentStateRegistry {

private static final PersistentStateRegistry INSTANCE = new PersistentStateRegistry();

private PersistentStateRegistry() {
}

public enum PersistedStateType {
LOCAL, REMOTE
}

private final Map<PersistedStateType, PersistedState> persistentStates = new HashMap<>();

public static void addPersistedState(PersistedStateType persistedStateType, PersistedState persistedState) {
PersistedState existingState = INSTANCE.persistentStates.putIfAbsent(persistedStateType, persistedState);
assert existingState == null : "should only be set once, but already have " + existingState;
}

public static PersistedState getPersistedState(PersistedStateType persistedStateType) {
return INSTANCE.persistentStates.get(persistedStateType);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,7 @@ public DiscoveryModule(
new Random(Randomness.get().nextLong()),
rerouteService,
electionStrategy,
nodeHealthService,
gatewayMetaState::getRemotePersistedState
nodeHealthService
);
} else {
throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
Expand Down
28 changes: 14 additions & 14 deletions server/src/main/java/org/opensearch/gateway/GatewayMetaState.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.opensearch.cluster.coordination.CoordinationMetadata;
import org.opensearch.cluster.coordination.CoordinationState.PersistedState;
import org.opensearch.cluster.coordination.InMemoryPersistedState;
import org.opensearch.cluster.coordination.PersistentStateRegistry;
import org.opensearch.cluster.coordination.PersistentStateRegistry.PersistedStateType;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexTemplateMetadata;
import org.opensearch.cluster.metadata.Manifest;
Expand Down Expand Up @@ -104,22 +106,15 @@ public class GatewayMetaState implements Closeable {

// Set by calling start()
private final SetOnce<PersistedState> persistedState = new SetOnce<>();
private final SetOnce<PersistedState> remotePersistedState = new SetOnce<>();

public PersistedState getPersistedState() {
final PersistedState persistedState = this.persistedState.get();
assert persistedState != null : "not started";
return persistedState;
}

public PersistedState getRemotePersistedState() {
final PersistedState persistedState = this.remotePersistedState.get();
final PersistedState persistedState = PersistentStateRegistry.getPersistedState(PersistedStateType.LOCAL);
assert persistedState != null : "not started";
return persistedState;
}

public Metadata getMetadata() {
return getPersistedState().getLastAcceptedState().metadata();
return PersistentStateRegistry.getPersistedState(PersistedStateType.LOCAL).getLastAcceptedState().metadata();
}

public void start(
Expand All @@ -132,7 +127,7 @@ public void start(
PersistedClusterStateService persistedClusterStateService,
RemoteClusterStateService remoteClusterStateService
) {
assert persistedState.get() == null : "should only start once, but already have " + persistedState.get();
assert PersistentStateRegistry.getPersistedState(PersistedStateType.LOCAL) == null : "should only start once, but already have " + persistedState.get();

if (DiscoveryNode.isClusterManagerNode(settings) || DiscoveryNode.isDataNode(settings)) {
try {
Expand Down Expand Up @@ -193,8 +188,8 @@ public void start(
}
}

this.persistedState.set(persistedState);
this.remotePersistedState.set(remotePersistedState);
PersistentStateRegistry.addPersistedState(PersistedStateType.LOCAL, persistedState);
PersistentStateRegistry.addPersistedState(PersistedStateType.REMOTE, remotePersistedState);
} catch (IOException e) {
throw new OpenSearchException("failed to load metadata", e);
}
Expand Down Expand Up @@ -340,12 +335,12 @@ public void applyClusterState(ClusterChangedEvent event) {

@Override
public void close() throws IOException {
IOUtils.close(persistedState.get());
IOUtils.close(PersistentStateRegistry.getPersistedState(PersistedStateType.LOCAL));
}

// visible for testing
public boolean allPendingAsyncStatesWritten() {
final PersistedState ps = persistedState.get();
final PersistedState ps = PersistentStateRegistry.getPersistedState(PersistedStateType.LOCAL);
if (ps instanceof AsyncLucenePersistedState) {
return ((AsyncLucenePersistedState) ps).allPendingAsyncStatesWritten();
} else {
Expand Down Expand Up @@ -677,6 +672,11 @@ private boolean shouldWriteFullClusterState(ClusterState clusterState) {
@Override
public void markLastAcceptedStateAsCommitted() {
try {
if (lastAcceptedState == null || lastAcceptedMarker == null || lastAcceptedState.blocks()
.hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// On the initial bootstrap, repository will not be available. So we do not persist the cluster state and bail out.
return;
}
final ClusterMetadataMarker committedMarker = remoteClusterStateService.markLastStateAsCommitted(
lastAcceptedState,
lastAcceptedMarker
Expand Down
4 changes: 3 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
import org.opensearch.cluster.InternalClusterInfoService;
import org.opensearch.cluster.NodeConnectionsService;
import org.opensearch.cluster.action.index.MappingUpdatedAction;
import org.opensearch.cluster.coordination.PersistentStateRegistry;
import org.opensearch.cluster.coordination.PersistentStateRegistry.PersistedStateType;
import org.opensearch.cluster.metadata.AliasValidator;
import org.opensearch.cluster.metadata.IndexTemplateMetadata;
import org.opensearch.cluster.metadata.Metadata;
Expand Down Expand Up @@ -1328,7 +1330,7 @@ public Node start() throws NodeValidationException {
}
// we load the global state here (the persistent part of the cluster state stored on disk) to
// pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.
final Metadata onDiskMetadata = gatewayMetaState.getPersistedState().getLastAcceptedState().metadata();
final Metadata onDiskMetadata = PersistentStateRegistry.getPersistedState(PersistedStateType.LOCAL).getLastAcceptedState().metadata();
assert onDiskMetadata != null : "metadata is null but shouldn't"; // this is never null
validateNodeBeforeAcceptingRequests(
new BootstrapContext(environment, onDiskMetadata),
Expand Down

0 comments on commit 91c616f

Please sign in to comment.