Skip to content

Commit

Permalink
Remove persistedState from CoordinationState
Browse files Browse the repository at this point in the history
Signed-off-by: Sooraj Sinha <[email protected]>
  • Loading branch information
soosinha committed Aug 28, 2023
1 parent a942cd1 commit 1bd3276
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ public Optional<ApplyCommitRequest> handlePublishResponse(DiscoveryNode sourceNo
publishResponse.getVersion(),
publishResponse.getTerm()
);
handleRemoteCommit();
handlePreCommit();
return Optional.of(new ApplyCommitRequest(localNode, publishResponse.getTerm(), publishResponse.getVersion()));
}

Expand Down Expand Up @@ -565,14 +565,16 @@ public void handleCommit(ApplyCommitRequest applyCommit) {
assert getLastCommittedConfiguration().equals(getLastAcceptedConfiguration());
}

public void handleRemotePublish(ClusterState clusterState) {
public void handlePrePublish(ClusterState clusterState) {
// Publishing the current state to remote store
if (isRemoteStateEnabled == true) {
assert remotePersistedState != null : "Remote state has not been initialized";
remotePersistedState.setLastAcceptedState(clusterState);
}
}

public void handleRemoteCommit() {
public void handlePreCommit() {
// Publishing the committed state to remote store before sending apply commit to other nodes.
if (isRemoteStateEnabled) {
assert remotePersistedState != null : "Remote state has not been initialized";
remotePersistedState.markLastAcceptedStateAsCommitted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import org.opensearch.cluster.coordination.CoordinationState.VoteCollection;
import org.opensearch.cluster.coordination.FollowersChecker.FollowerCheckRequest;
import org.opensearch.cluster.coordination.JoinHelper.InitialJoinAccumulator;
import org.opensearch.cluster.coordination.PersistentStateRegistry.PersistedStateType;
import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
Expand Down Expand Up @@ -827,7 +827,7 @@ protected void doStart() {
getLocalNode(),
persistedState,
electionStrategy,
PersistentStateRegistry.getPersistedState(PersistedStateType.REMOTE),
PersistedStateRegistry.getPersistedState(PersistedStateType.REMOTE),
settings
)
);
Expand Down Expand Up @@ -1317,7 +1317,11 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
leaderChecker.setCurrentNodes(publishNodes);
followersChecker.setCurrentNodes(publishNodes);
lagDetector.setTrackedNodes(publishNodes);
coordinationState.get().handleRemotePublish(clusterState);
// Publishing the current state to remote store before sending the cluster state to other nodes.
// This is to ensure the remote store is the single source of truth for current state. Even if the current node
// goes down after sending the cluster state to other nodes, we should be able to read the remote state and
// recover the cluster.
coordinationState.get().handlePrePublish(clusterState);
publication.start(followersChecker.getFaultyNodes());
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,19 @@

import org.opensearch.cluster.coordination.CoordinationState.PersistedState;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* A class which encapsulates the PersistedStates
*
* @opensearch.internal
*/
public class PersistentStateRegistry {
public class PersistedStateRegistry {

private static final PersistentStateRegistry INSTANCE = new PersistentStateRegistry();
private static final PersistedStateRegistry INSTANCE = new PersistedStateRegistry();

private PersistentStateRegistry() {}
private PersistedStateRegistry() {}

/**
* Distinct Types PersistedState which can be present on a node
Expand All @@ -32,15 +32,15 @@ public enum PersistedStateType {
REMOTE
}

private final Map<PersistedStateType, PersistedState> persistentStates = new HashMap<>();
private final Map<PersistedStateType, PersistedState> persistedStates = new ConcurrentHashMap<>();

public static void addPersistedState(PersistedStateType persistedStateType, PersistedState persistedState) {
PersistedState existingState = INSTANCE.persistentStates.putIfAbsent(persistedStateType, persistedState);
PersistedState existingState = INSTANCE.persistedStates.putIfAbsent(persistedStateType, persistedState);
assert existingState == null : "should only be set once, but already have " + existingState;
}

public static PersistedState getPersistedState(PersistedStateType persistedStateType) {
return INSTANCE.persistentStates.get(persistedStateType);
return INSTANCE.persistedStates.get(persistedStateType);
}

}
34 changes: 18 additions & 16 deletions server/src/main/java/org/opensearch/gateway/GatewayMetaState.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,15 @@
import org.opensearch.cluster.coordination.CoordinationMetadata;
import org.opensearch.cluster.coordination.CoordinationState.PersistedState;
import org.opensearch.cluster.coordination.InMemoryPersistedState;
import org.opensearch.cluster.coordination.PersistentStateRegistry;
import org.opensearch.cluster.coordination.PersistentStateRegistry.PersistedStateType;
import org.opensearch.cluster.coordination.PersistedStateRegistry;
import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexTemplateMetadata;
import org.opensearch.cluster.metadata.Manifest;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.MetadataIndexUpgradeService;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.SetOnce;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.AbstractRunnable;
Expand Down Expand Up @@ -84,6 +83,7 @@
import java.util.function.UnaryOperator;

import static org.opensearch.common.util.concurrent.OpenSearchExecutors.daemonThreadFactory;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;

/**
* Loads (and maybe upgrades) cluster metadata at startup, and persistently stores cluster metadata for future restarts.
Expand All @@ -104,17 +104,14 @@ public class GatewayMetaState implements Closeable {
*/
public static final String STALE_STATE_CONFIG_NODE_ID = "STALE_STATE_CONFIG";

// Set by calling start()
private final SetOnce<PersistedState> persistedState = new SetOnce<>();

public PersistedState getPersistedState() {
final PersistedState persistedState = PersistentStateRegistry.getPersistedState(PersistedStateType.LOCAL);
final PersistedState persistedState = PersistedStateRegistry.getPersistedState(PersistedStateType.LOCAL);
assert persistedState != null : "not started";
return persistedState;
}

public Metadata getMetadata() {
return PersistentStateRegistry.getPersistedState(PersistedStateType.LOCAL).getLastAcceptedState().metadata();
return PersistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).getLastAcceptedState().metadata();
}

public void start(
Expand All @@ -127,8 +124,8 @@ public void start(
PersistedClusterStateService persistedClusterStateService,
RemoteClusterStateService remoteClusterStateService
) {
assert PersistentStateRegistry.getPersistedState(PersistedStateType.LOCAL) == null : "should only start once, but already have "
+ persistedState.get();
assert PersistedStateRegistry.getPersistedState(PersistedStateType.LOCAL) == null : "should only start once, but already have "
+ PersistedStateRegistry.getPersistedState(PersistedStateType.LOCAL);

if (DiscoveryNode.isClusterManagerNode(settings) || DiscoveryNode.isDataNode(settings)) {
try {
Expand Down Expand Up @@ -164,7 +161,9 @@ public void start(

if (DiscoveryNode.isClusterManagerNode(settings)) {
persistedState = new LucenePersistedState(persistedClusterStateService, currentTerm, clusterState);
remotePersistedState = new RemotePersistedState(remoteClusterStateService);
if (REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true) {
remotePersistedState = new RemotePersistedState(remoteClusterStateService);
}
} else {
persistedState = new AsyncLucenePersistedState(
settings,
Expand All @@ -186,11 +185,14 @@ public void start(
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(persistedState);
IOUtils.closeWhileHandlingException(remotePersistedState);
}
}

PersistentStateRegistry.addPersistedState(PersistedStateType.LOCAL, persistedState);
PersistentStateRegistry.addPersistedState(PersistedStateType.REMOTE, remotePersistedState);
PersistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, persistedState);
if (remotePersistedState != null) {
PersistedStateRegistry.addPersistedState(PersistedStateType.REMOTE, remotePersistedState);
}
} catch (IOException e) {
throw new OpenSearchException("failed to load metadata", e);
}
Expand All @@ -217,7 +219,7 @@ public void start(
throw new UncheckedIOException(e);
}
}
persistedState.set(new InMemoryPersistedState(currentTerm, clusterState));
PersistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, new InMemoryPersistedState(currentTerm, clusterState));
}
}

Expand Down Expand Up @@ -336,12 +338,12 @@ public void applyClusterState(ClusterChangedEvent event) {

@Override
public void close() throws IOException {
IOUtils.close(PersistentStateRegistry.getPersistedState(PersistedStateType.LOCAL));
IOUtils.close(PersistedStateRegistry.getPersistedState(PersistedStateType.LOCAL));
}

// visible for testing
public boolean allPendingAsyncStatesWritten() {
final PersistedState ps = PersistentStateRegistry.getPersistedState(PersistedStateType.LOCAL);
final PersistedState ps = PersistedStateRegistry.getPersistedState(PersistedStateType.LOCAL);
if (ps instanceof AsyncLucenePersistedState) {
return ((AsyncLucenePersistedState) ps).allPendingAsyncStatesWritten();
} else {
Expand Down
7 changes: 4 additions & 3 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@
import org.opensearch.cluster.InternalClusterInfoService;
import org.opensearch.cluster.NodeConnectionsService;
import org.opensearch.cluster.action.index.MappingUpdatedAction;
import org.opensearch.cluster.coordination.PersistentStateRegistry;
import org.opensearch.cluster.coordination.PersistentStateRegistry.PersistedStateType;
import org.opensearch.cluster.coordination.PersistedStateRegistry;
import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType;
import org.opensearch.cluster.metadata.AliasValidator;
import org.opensearch.cluster.metadata.IndexTemplateMetadata;
import org.opensearch.cluster.metadata.Metadata;
Expand Down Expand Up @@ -673,6 +673,7 @@ protected Node(
threadPool::relativeTimeInMillis
);
final RemoteClusterStateService remoteClusterStateService = new RemoteClusterStateService(
nodeEnvironment.nodeId(),
repositoriesServiceReference::get,
settings,
clusterService.getClusterSettings(),
Expand Down Expand Up @@ -1332,7 +1333,7 @@ public Node start() throws NodeValidationException {
}
// we load the global state here (the persistent part of the cluster state stored on disk) to
// pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.
final Metadata onDiskMetadata = PersistentStateRegistry.getPersistedState(PersistedStateType.LOCAL)
final Metadata onDiskMetadata = PersistedStateRegistry.getPersistedState(PersistedStateType.LOCAL)
.getLastAcceptedState()
.metadata();
assert onDiskMetadata != null : "metadata is null but shouldn't"; // this is never null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ public void testDataOnlyNodePersistence() throws Exception {
() -> 0L
);
final RemoteClusterStateService remoteClusterStateService = new RemoteClusterStateService(
nodeEnvironment.nodeId(),
() -> new RepositoriesService(
settings,
clusterService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@
import static org.mockito.Mockito.when;

/**
* {@link GatewayMetaState} constructor accepts a lot of arguments. It's not always easy / convenient to construct these dependencies. This class constructor
* takes far fewer dependencies and constructs usable {@link GatewayMetaState} with 2 restrictions: no metadata upgrade will be performed and no cluster state
* updaters will be run. This is sufficient for most of the tests.
* {@link GatewayMetaState} constructor accepts a lot of arguments.
* It's not always easy / convenient to construct these dependencies.
* This class constructor takes far fewer dependencies and constructs usable {@link GatewayMetaState} with 2 restrictions:
* no metadata upgrade will be performed and no cluster state updaters will be run. This is sufficient for most of the tests.
*/
public class MockGatewayMetaState extends GatewayMetaState {

private final DiscoveryNode localNode;
private final BigArrays bigArrays;

Expand Down Expand Up @@ -115,6 +115,7 @@ public void start(Settings settings, NodeEnvironment nodeEnvironment, NamedXCont
() -> 0L
),
new RemoteClusterStateService(
nodeEnvironment.nodeId(),
() -> new RepositoriesService(settings, clusterService, transportService, Collections.emptyMap(), Collections.emptyMap(),
transportService.getThreadPool()), settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
() -> 0L)
Expand Down

0 comments on commit 1bd3276

Please sign in to comment.