Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Cluster State] Move remote cluster state related settings to node attrs #9811

Merged
merged 11 commits into from
Sep 7, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.stream.Collectors;

import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
Expand Down Expand Up @@ -91,6 +92,16 @@ public Settings buildRemoteStoreNodeAttributes(Path repoLocation, double ioFailu
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
TRANSLOG_REPOSITORY_NAME
);
String stateRepoTypeAttributeKey = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
linuxpi marked this conversation as resolved.
Show resolved Hide resolved
REPOSITORY_NAME
);
String stateRepoSettingsAttributeKeyPrefix = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
REPOSITORY_NAME
);

return Settings.builder()
.put("node.attr." + REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, REPOSITORY_NAME)
Expand All @@ -104,6 +115,9 @@ public Settings buildRemoteStoreNodeAttributes(Path repoLocation, double ioFailu
.put("node.attr." + REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, TRANSLOG_REPOSITORY_NAME)
.put(translogRepoTypeAttributeKey, "mock")
.put(translogRepoSettingsAttributeKeyPrefix + "location", repoLocation)
.put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, REPOSITORY_NAME)
.put(stateRepoTypeAttributeKey, "mock")
.put(stateRepoSettingsAttributeKeyPrefix + "location", repoLocation)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.stream.Collectors;

import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
Expand Down Expand Up @@ -222,14 +223,27 @@ public static Settings buildRemoteStoreNodeAttributes(
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
translogRepoName
);
String stateRepoTypeAttributeKey = String.format(
linuxpi marked this conversation as resolved.
Show resolved Hide resolved
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
segmentRepoName
);
String stateRepoSettingsAttributeKeyPrefix = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
segmentRepoName
);

Settings.Builder settings = Settings.builder()
.put("node.attr." + REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, segmentRepoName)
.put(segmentRepoTypeAttributeKey, FsRepository.TYPE)
.put(segmentRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath)
.put("node.attr." + REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, translogRepoName)
.put(translogRepoTypeAttributeKey, FsRepository.TYPE)
.put(translogRepoSettingsAttributeKeyPrefix + "location", translogRepoPath);
.put(translogRepoSettingsAttributeKeyPrefix + "location", translogRepoPath)
.put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, segmentRepoName)
.put(stateRepoTypeAttributeKey, FsRepository.TYPE)
.put(stateRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath);

linuxpi marked this conversation as resolved.
Show resolved Hide resolved
if (withRateLimiterAttributes) {
settings.put(segmentRepoSettingsAttributeKeyPrefix + "compress", randomBoolean())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.concurrent.ExecutionException;

import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_REPOSITORY_SETTING;
import static org.opensearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE;
import static org.opensearch.indices.ShardLimitValidator.SETTING_MAX_SHARDS_PER_CLUSTER_KEY;

Expand All @@ -32,11 +31,7 @@ public class RemoteStoreClusterStateRestoreIT extends BaseRemoteStoreRestoreIT {

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
.put(REMOTE_CLUSTER_STATE_REPOSITORY_SETTING.getKey(), REPOSITORY_NAME)
.build();
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build();
}

private void addNewNodes(int dataNodeCount, int clusterManagerNodeCount) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
import java.util.Set;

import static org.opensearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
* The core class of the cluster state coordination algorithm, directly implementing the
Expand Down Expand Up @@ -101,7 +101,7 @@ public CoordinationState(
.getLastAcceptedState()
.getLastAcceptedConfiguration();
this.publishVotes = new VoteCollection();
this.isRemoteStateEnabled = REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings);
linuxpi marked this conversation as resolved.
Show resolved Hide resolved
this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings);
}

public long getCurrentTerm() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,6 @@ public void apply(Settings value, Settings current, Settings previous) {

// Remote cluster state settings
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
RemoteClusterStateService.REMOTE_CLUSTER_STATE_REPOSITORY_SETTING,
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
import java.util.function.UnaryOperator;

import static org.opensearch.common.util.concurrent.OpenSearchExecutors.daemonThreadFactory;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
* Loads (and maybe upgrades) cluster metadata at startup, and persistently stores cluster metadata for future restarts.
Expand Down Expand Up @@ -165,7 +165,7 @@ public void start(

if (DiscoveryNode.isClusterManagerNode(settings)) {
persistedState = new LucenePersistedState(persistedClusterStateService, currentTerm, clusterState);
if (REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true) {
if (isRemoteStoreClusterStateEnabled(settings)) {
remotePersistedState = new RemotePersistedState(remoteClusterStateService);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.opensearch.core.index.Index;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
Expand All @@ -52,6 +54,7 @@
import java.util.stream.Collectors;

import static org.opensearch.gateway.PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
* A Service which provides APIs to upload and download cluster metadata from remote store.
Expand Down Expand Up @@ -80,23 +83,13 @@ public class RemoteClusterStateService implements Closeable {
/**
* Used to specify if cluster state metadata should be published to remote store
*/
// TODO The remote state enabled and repository settings should be read from node attributes.
// Dependent on https://github.com/opensearch-project/OpenSearch/pull/9105/
public static final Setting<Boolean> REMOTE_CLUSTER_STATE_ENABLED_SETTING = Setting.boolSetting(
"cluster.remote_store.state.enabled",
false,
Property.NodeScope,
Property.Final
);
/**
* Used to specify default repo to use for cluster state metadata upload
*/
public static final Setting<String> REMOTE_CLUSTER_STATE_REPOSITORY_SETTING = Setting.simpleString(
"cluster.remote_store.state.repository",
"",
Property.NodeScope,
Property.Final
);

private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class);

public static final String DELIMITER = "__";
Expand All @@ -115,7 +108,7 @@ public RemoteClusterStateService(
ClusterSettings clusterSettings,
LongSupplier relativeTimeNanosSupplier
) {
assert REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true : "Remote cluster state is not enabled";
assert isRemoteStoreClusterStateEnabled(settings) : "Remote cluster state is not enabled";
this.nodeId = nodeId;
this.repositoriesService = repositoriesService;
this.settings = settings;
Expand Down Expand Up @@ -384,8 +377,10 @@ void ensureRepositorySet() {
if (blobStoreRepository != null) {
return;
}
assert REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true : "Remote cluster state is not enabled";
final String remoteStoreRepo = REMOTE_CLUSTER_STATE_REPOSITORY_SETTING.get(settings);
assert isRemoteStoreClusterStateEnabled(settings) : "Remote cluster state is not enabled";
final String remoteStoreRepo = settings.get(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY
);
assert remoteStoreRepo != null : "Remote Cluster State repository is not configured";
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@
import static org.opensearch.env.NodeEnvironment.collectFileCacheDataPath;
import static org.opensearch.index.ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreAttributePresent;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
* A node represent a node within a cluster ({@code cluster.name}). The {@link #client()} can be used
Expand Down Expand Up @@ -681,7 +682,7 @@ protected Node(
threadPool::relativeTimeInMillis
);
final RemoteClusterStateService remoteClusterStateService;
if (RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true) {
if (isRemoteStoreClusterStateEnabled(settings)) {
remoteClusterStateService = new RemoteClusterStateService(
nodeEnvironment.nodeId(),
repositoriesServiceReference::get,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.Settings;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.node.Node;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

Expand All @@ -36,6 +37,7 @@ public class RemoteStoreNodeAttribute {
public static final String REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX = "remote_store";
public static final String REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY = "remote_store.segment.repository";
public static final String REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY = "remote_store.translog.repository";
public static final String REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY = "remote_store.state.repository";
public static final String REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT = "remote_store.repository.%s.type";
public static final String REMOTE_STORE_REPOSITORY_CRYPTO_ATTRIBUTE_KEY_FORMAT = "remote_store.repository.%s."
+ CryptoMetadata.CRYPTO_METADATA_KEY;
Expand Down Expand Up @@ -134,6 +136,7 @@ private RepositoriesMetadata buildRepositoriesMetadata(DiscoveryNode node) {

repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY));
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY));
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY));

for (String repositoryName : repositoryNames) {
repositoryMetadataList.add(buildRepositoryMetadata(node, repositoryName));
Expand All @@ -146,6 +149,10 @@ public static boolean isRemoteStoreAttributePresent(Settings settings) {
return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX).isEmpty() == false;
}

public static boolean isRemoteStoreClusterStateEnabled(Settings settings) {
return RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) && isRemoteStoreAttributePresent(settings);
sachinpkale marked this conversation as resolved.
Show resolved Hide resolved
}

public RepositoriesMetadata getRepositoriesMetadata() {
return this.repositoriesMetadata;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,14 @@
import org.opensearch.gateway.GatewayMetaState.RemotePersistedState;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.test.EqualsHashCodeTestUtils;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

import java.io.IOException;
import java.util.Collections;
import java.util.Locale;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -62,6 +64,9 @@

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
Expand Down Expand Up @@ -937,9 +942,26 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep
final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry();
persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, ps1);
persistedStateRegistry.addPersistedState(PersistedStateType.REMOTE, new RemotePersistedState(remoteClusterStateService));
final Settings settings = Settings.builder()

String randomRepoName = "randomRepoName";
String stateRepoTypeAttributeKey = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
randomRepoName
);
String stateRepoSettingsAttributeKeyPrefix = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
randomRepoName
);

Settings settings = Settings.builder()
.put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, randomRepoName)
.put(stateRepoTypeAttributeKey, FsRepository.TYPE)
.put(stateRepoSettingsAttributeKeyPrefix + "location", "randomRepoPath")
.put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
.build();

final CoordinationState coordinationState = createCoordinationState(persistedStateRegistry, node1, settings);
coordinationState.handlePrePublish(clusterState);
Mockito.verifyNoInteractions(remoteClusterStateService);
Expand Down
Loading
Loading