Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Make Remote Publication a dynamic setting (#15937) #16362

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import java.util.Set;

import static org.opensearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
Expand Down Expand Up @@ -81,7 +80,6 @@ public class CoordinationState {
private VotingConfiguration lastPublishedConfiguration;
private VoteCollection publishVotes;
private final boolean isRemoteStateEnabled;
private boolean isRemotePublicationEnabled;

public CoordinationState(
DiscoveryNode localNode,
Expand All @@ -105,14 +103,6 @@ public CoordinationState(
.getLastAcceptedConfiguration();
this.publishVotes = new VoteCollection();
this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings);
// ToDo: revisit this check while making the setting dynamic
this.isRemotePublicationEnabled = isRemoteStateEnabled
&& REMOTE_PUBLICATION_SETTING.get(settings)
&& localNode.isRemoteStatePublicationEnabled();
}

public boolean isRemotePublicationEnabled() {
return isRemotePublicationEnabled;
}

public long getCurrentTerm() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,10 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private Optional<CoordinatorPublication> currentPublication = Optional.empty();
private final NodeHealthService nodeHealthService;
private final PersistedStateRegistry persistedStateRegistry;
private final RemoteClusterStateService remoteClusterStateService;
private final RemoteStoreNodeService remoteStoreNodeService;
private NodeConnectionsService nodeConnectionsService;
private final RemoteClusterStateService remoteClusterStateService;
private final ClusterSettings clusterSettings;

/**
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
Expand Down Expand Up @@ -316,6 +317,7 @@ public Coordinator(
this.localNodeCommissioned = true;
this.remoteStoreNodeService = remoteStoreNodeService;
this.remoteClusterStateService = remoteClusterStateService;
this.clusterSettings = clusterSettings;
}

private ClusterFormationState getClusterFormationState() {
Expand Down Expand Up @@ -1365,7 +1367,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())

final PublicationTransportHandler.PublicationContext publicationContext = publicationHandler.newPublicationContext(
clusterChangedEvent,
coordinationState.get().isRemotePublicationEnabled(),
this.isRemotePublicationEnabled(),
persistedStateRegistry
);
logger.debug("initialized PublicationContext using class: {}", publicationContext.getClass().toString());
Expand Down Expand Up @@ -1900,8 +1902,8 @@ public static boolean isZen1Node(DiscoveryNode discoveryNode) {
}

public boolean isRemotePublicationEnabled() {
if (coordinationState.get() != null) {
return coordinationState.get().isRemotePublicationEnabled();
if (remoteClusterStateService != null) {
return remoteClusterStateService.isRemotePublicationEnabled();
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableConfigured;

/**
* A Service which provides APIs to upload and download routing table from remote store.
Expand All @@ -76,7 +76,7 @@ public InternalRemoteRoutingTableService(
ThreadPool threadpool,
String clusterName
) {
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled";
assert isRemoteRoutingTableConfigured(settings) : "Remote routing table is not enabled";
this.repositoriesService = repositoriesService;
this.settings = settings;
this.threadPool = threadpool;
Expand Down Expand Up @@ -233,7 +233,7 @@ protected void doClose() throws IOException {

@Override
protected void doStart() {
assert isRemoteRoutingTableEnabled(settings) == true : "Remote routing table is not enabled";
assert isRemoteRoutingTableConfigured(settings) == true : "Remote routing table is not enabled";
final String remoteStoreRepo = settings.get(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import java.util.function.Supplier;

import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableConfigured;

/**
* Factory to provide impl for RemoteRoutingTableService based on settings.
Expand All @@ -37,7 +37,7 @@ public static RemoteRoutingTableService getService(
ThreadPool threadPool,
String clusterName
) {
if (isRemoteRoutingTableEnabled(settings)) {
if (isRemoteRoutingTableConfigured(settings)) {
return new InternalRemoteRoutingTableService(repositoriesService, settings, clusterSettings, threadPool, clusterName);
}
return new NoopRemoteRoutingTableService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -112,6 +113,8 @@
import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteClusterStateConfigured;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableConfigured;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
* A Service which provides APIs to upload and download cluster metadata from remote store.
Expand All @@ -132,7 +135,7 @@
REMOTE_PUBLICATION_SETTING_KEY,
false,
Property.NodeScope,
Property.Final
Property.Dynamic
);

/**
Expand Down Expand Up @@ -232,7 +235,7 @@
private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged "
+ "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata "
+ "updated : [{}], custom metadata updated : [{}], indices routing updated : [{}]";
private final boolean isPublicationEnabled;
private volatile AtomicBoolean isPublicationEnabled;
private final String remotePathPrefix;

private final RemoteClusterStateCache remoteClusterStateCache;
Expand Down Expand Up @@ -273,9 +276,12 @@
this.remoteStateStats = new RemotePersistenceStats();
this.namedWriteableRegistry = namedWriteableRegistry;
this.indexMetadataUploadListeners = indexMetadataUploadListeners;
this.isPublicationEnabled = REMOTE_PUBLICATION_SETTING.get(settings)
&& RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings)
&& RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings);
this.isPublicationEnabled = new AtomicBoolean(
clusterSettings.get(REMOTE_PUBLICATION_SETTING)
&& RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings)
&& RemoteStoreNodeAttribute.isRemoteRoutingTableConfigured(settings)
);
clusterSettings.addSettingsUpdateConsumer(REMOTE_PUBLICATION_SETTING, this::setRemotePublicationSetting);
this.remotePathPrefix = CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.get(settings);
this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService(
repositoriesService,
Expand Down Expand Up @@ -303,19 +309,20 @@
return null;
}

boolean publicationEnabled = isPublicationEnabled.get();
UploadedMetadataResults uploadedMetadataResults = writeMetadataInParallel(
clusterState,
new ArrayList<>(clusterState.metadata().indices().values()),
emptyMap(),
RemoteGlobalMetadataManager.filterCustoms(clusterState.metadata().customs(), isPublicationEnabled),
RemoteGlobalMetadataManager.filterCustoms(clusterState.metadata().customs(), publicationEnabled),
true,
true,
true,
isPublicationEnabled,
isPublicationEnabled,
isPublicationEnabled,
isPublicationEnabled ? clusterState.customs() : Collections.emptyMap(),
isPublicationEnabled,
publicationEnabled,
publicationEnabled,
publicationEnabled,
publicationEnabled ? clusterState.customs() : Collections.emptyMap(),
publicationEnabled,
remoteRoutingTableService.getIndicesRouting(clusterState.getRoutingTable()),
null
);
Expand Down Expand Up @@ -394,9 +401,9 @@
boolean firstUploadForSplitGlobalMetadata = !previousManifest.hasMetadataAttributesFiles();

final DiffableUtils.MapDiff<String, Metadata.Custom, Map<String, Metadata.Custom>> customsDiff = remoteGlobalMetadataManager
.getCustomsDiff(clusterState, previousClusterState, firstUploadForSplitGlobalMetadata, isPublicationEnabled);
.getCustomsDiff(clusterState, previousClusterState, firstUploadForSplitGlobalMetadata, isPublicationEnabled.get());
final DiffableUtils.MapDiff<String, ClusterState.Custom, Map<String, ClusterState.Custom>> clusterStateCustomsDiff =
remoteClusterStateAttributesManager.getUpdatedCustoms(clusterState, previousClusterState, isPublicationEnabled, false);
remoteClusterStateAttributesManager.getUpdatedCustoms(clusterState, previousClusterState, isPublicationEnabled.get(), false);
final Map<String, UploadedMetadataAttribute> allUploadedCustomMap = new HashMap<>(previousManifest.getCustomMetadataMap());
final Map<String, UploadedMetadataAttribute> allUploadedClusterStateCustomsMap = new HashMap<>(
previousManifest.getClusterStateCustomMap()
Expand Down Expand Up @@ -461,10 +468,10 @@
boolean updateTemplatesMetadata = firstUploadForSplitGlobalMetadata
|| Metadata.isTemplatesMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false;

final boolean updateDiscoveryNodes = isPublicationEnabled
final boolean updateDiscoveryNodes = isPublicationEnabled.get()
&& clusterState.getNodes().delta(previousClusterState.getNodes()).hasChanges();
final boolean updateClusterBlocks = isPublicationEnabled && !clusterState.blocks().equals(previousClusterState.blocks());
final boolean updateHashesOfConsistentSettings = isPublicationEnabled
final boolean updateClusterBlocks = isPublicationEnabled.get() && !clusterState.blocks().equals(previousClusterState.blocks());
final boolean updateHashesOfConsistentSettings = isPublicationEnabled.get()
&& Metadata.isHashesOfConsistentSettingsEqual(previousClusterState.metadata(), clusterState.metadata()) == false;

uploadedMetadataResults = writeMetadataInParallel(
Expand Down Expand Up @@ -1115,6 +1122,14 @@
this.remoteClusterStateValidationMode = remoteClusterStateValidationMode;
}

private void setRemotePublicationSetting(boolean remotePublicationSetting) {
if (remotePublicationSetting == false) {
this.isPublicationEnabled.set(false);

Check warning on line 1127 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L1127

Added line #L1127 was not covered by tests
} else {
this.isPublicationEnabled.set(isRemoteStoreClusterStateEnabled(settings) && isRemoteRoutingTableConfigured(settings));
}
}

// Package private for unit test
RemoteRoutingTableService getRemoteRoutingTableService() {
return this.remoteRoutingTableService;
Expand Down Expand Up @@ -1830,7 +1845,7 @@
}

public boolean isRemotePublicationEnabled() {
return this.isPublicationEnabled;
return this.isPublicationEnabled.get();
}

public void setRemoteStateReadTimeout(TimeValue remoteStateReadTimeout) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ public class RemotePersistenceStats {
RemoteDownloadStats remoteDiffDownloadStats;
RemoteDownloadStats remoteFullDownloadStats;

final String FULL_DOWNLOAD_STATS = "remote_full_download";
final String DIFF_DOWNLOAD_STATS = "remote_diff_download";
public static final String FULL_DOWNLOAD_STATS = "remote_full_download";
public static final String DIFF_DOWNLOAD_STATS = "remote_diff_download";

public RemotePersistenceStats() {
remoteUploadStats = new RemoteUploadStats();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ private static boolean isRemoteRoutingTableAttributePresent(Settings settings) {
.isEmpty() == false;
}

public static boolean isRemoteRoutingTableEnabled(Settings settings) {
public static boolean isRemoteRoutingTableConfigured(Settings settings) {
return isRemoteRoutingTableAttributePresent(settings);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.opensearch.gateway.remote.ClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
Expand Down Expand Up @@ -1268,16 +1267,6 @@ public void testHandleCommitOnFollowerNodeWhenRemotePublicationEnabledWithNullRe
verifyNoInteractions(remoteClusterStateService);
}

public void testIsRemotePublicationEnabled_WithInconsistentSettings() {
// create settings with remote state disabled but publication enabled
Settings settings = Settings.builder()
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), false)
.put(REMOTE_PUBLICATION_SETTING_KEY, true)
.build();
CoordinationState coordinationState = createCoordinationState(psr1, node1, settings);
assertFalse(coordinationState.isRemotePublicationEnabled());
}

public static CoordinationState createCoordinationState(
PersistedStateRegistry persistedStateRegistry,
DiscoveryNode localNode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableConfigured;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -370,6 +370,8 @@ public void testWriteFullMetadataSuccessPublicationEnabled() throws IOException
// TODO Make the publication flag parameterized
publicationEnabled = true;
settings = Settings.builder().put(settings).put(REMOTE_PUBLICATION_SETTING_KEY, publicationEnabled).build();
clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
remoteClusterStateService = new RemoteClusterStateService(
"test-node-id",
repositoriesServiceSupplier,
Expand All @@ -388,6 +390,7 @@ public void testWriteFullMetadataSuccessPublicationEnabled() throws IOException
),
writableRegistry()
);
assertTrue(remoteClusterStateService.isRemotePublicationEnabled());
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager())
.customs(
Map.of(
Expand Down Expand Up @@ -747,6 +750,8 @@ public void testWriteIncrementalMetadataSuccess() throws IOException {
public void testWriteIncrementalMetadataSuccessWhenPublicationEnabled() throws IOException {
publicationEnabled = true;
settings = Settings.builder().put(settings).put(REMOTE_PUBLICATION_SETTING_KEY, true).build();
clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
remoteClusterStateService = new RemoteClusterStateService(
"test-node-id",
repositoriesServiceSupplier,
Expand All @@ -765,6 +770,7 @@ public void testWriteIncrementalMetadataSuccessWhenPublicationEnabled() throws I
),
writableRegistry()
);
assertTrue(remoteClusterStateService.isRemotePublicationEnabled());
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
mockBlobStoreObjects();
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build();
Expand Down Expand Up @@ -2749,7 +2755,7 @@ public void testRemoteStateUploadStats() throws IOException {
}

public void testRemoteRoutingTableNotInitializedWhenDisabled() {
if (isRemoteRoutingTableEnabled(settings)) {
if (isRemoteRoutingTableConfigured(settings)) {
assertTrue(remoteClusterStateService.getRemoteRoutingTableService() instanceof InternalRemoteRoutingTableService);
} else {
assertTrue(remoteClusterStateService.getRemoteRoutingTableService() instanceof NoopRemoteRoutingTableService);
Expand Down
Loading