diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java index 0ceba16ddeecc..7f4c7bec5abeb 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java @@ -10,16 +10,13 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; import org.opensearch.action.admin.cluster.node.stats.NodeStats; import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.action.admin.cluster.state.ClusterStateResponse; import org.opensearch.client.Client; -import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.coordination.CoordinationState; import org.opensearch.cluster.coordination.PersistedStateRegistry; -import org.opensearch.cluster.coordination.PublishClusterStateStats; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.ThreadContext; @@ -60,20 +57,16 @@ import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.stream.Collectors; -import static org.opensearch.action.admin.cluster.node.info.NodesInfoRequest.Metric.SETTINGS; import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.DISCOVERY; import static org.opensearch.cluster.coordination.PublicationTransportHandler.PUBLISH_REMOTE_STATE_ACTION_NAME; import static org.opensearch.cluster.coordination.PublicationTransportHandler.PUBLISH_STATE_ACTION_NAME; -import static org.opensearch.cluster.metadata.Metadata.isGlobalStateEquals; import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.DISCOVERY_NODES; import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; -import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING; import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.gateway.remote.RemoteDownloadStats.CHECKSUM_VALIDATION_FAILED_COUNT; @@ -269,113 +262,6 @@ public void testRemotePublicationDownloadStats() { assertDataNodeDownloadStats(nodesStatsResponseDataNode.getNodes().get(0)); } - public void testRemotePublicationDisabledByRollingRestart() throws Exception { - prepareCluster(3, 2, INDEX_NAME, 1, 2); - ensureStableCluster(5); - ensureGreen(INDEX_NAME); - - Set clusterManagers = internalCluster().getClusterManagerNames(); - Set restartedMasters = new HashSet<>(); - - for (String clusterManager : clusterManagers) { - internalCluster().restartNode(clusterManager, new InternalTestCluster.RestartCallback() { - @Override - public Settings onNodeStopped(String nodeName) { - restartedMasters.add(nodeName); - return Settings.builder().put(REMOTE_PUBLICATION_SETTING_KEY, false).build(); - } - - @Override - public void doAfterNodes(int n, Client client) { - String activeCM = internalCluster().getClusterManagerName(); - Set followingCMs = clusterManagers.stream() - .filter(node -> !Objects.equals(node, activeCM)) - .collect(Collectors.toSet()); - boolean activeCMRestarted = restartedMasters.contains(activeCM); - NodesStatsResponse response = client().admin() - .cluster() - .prepareNodesStats(followingCMs.toArray(new String[0])) - .clear() - .addMetric(DISCOVERY.metricName()) - .get(); - // after master is flipped to restarted master, publication should happen on Transport - response.getNodes().forEach(nodeStats -> { - if (activeCMRestarted) { - PublishClusterStateStats stats = nodeStats.getDiscoveryStats().getPublishStats(); - assertTrue( - stats.getFullClusterStateReceivedCount() > 0 || stats.getCompatibleClusterStateDiffReceivedCount() > 0 - ); - } else { - DiscoveryStats stats = nodeStats.getDiscoveryStats(); - assertEquals(0, stats.getPublishStats().getFullClusterStateReceivedCount()); - assertEquals(0, stats.getPublishStats().getCompatibleClusterStateDiffReceivedCount()); - assertEquals(0, stats.getPublishStats().getIncompatibleClusterStateDiffReceivedCount()); - } - }); - - NodesInfoResponse nodesInfoResponse = client().admin() - .cluster() - .prepareNodesInfo(activeCM) - .clear() - .addMetric(SETTINGS.metricName()) - .get(); - // if masterRestarted is true Publication Setting should be false, and vice versa - assertTrue(REMOTE_PUBLICATION_SETTING.get(nodesInfoResponse.getNodes().get(0).getSettings()) != activeCMRestarted); - - followingCMs.forEach(node -> { - PersistedStateRegistry registry = internalCluster().getInstance(PersistedStateRegistry.class, node); - CoordinationState.PersistedState remoteState = registry.getPersistedState( - PersistedStateRegistry.PersistedStateType.REMOTE - ); - if (activeCMRestarted) { - assertNull(remoteState.getLastAcceptedState()); - assertNull(remoteState.getLastAcceptedManifest()); - } else { - ClusterState localState = registry.getPersistedState(PersistedStateRegistry.PersistedStateType.LOCAL) - .getLastAcceptedState(); - ClusterState remotePersistedState = remoteState.getLastAcceptedState(); - assertTrue(isGlobalStateEquals(localState.metadata(), remotePersistedState.metadata())); - assertEquals(localState.nodes(), remotePersistedState.nodes()); - assertEquals(localState.routingTable(), remotePersistedState.routingTable()); - assertEquals(localState.customs(), remotePersistedState.customs()); - } - }); - } - }); - - } - ensureGreen(INDEX_NAME); - ensureStableCluster(5); - - String activeCM = internalCluster().getClusterManagerName(); - Set followingCMs = clusterManagers.stream().filter(node -> !Objects.equals(node, activeCM)).collect(Collectors.toSet()); - NodesStatsResponse response = client().admin() - .cluster() - .prepareNodesStats(followingCMs.toArray(new String[0])) - .clear() - .addMetric(DISCOVERY.metricName()) - .get(); - response.getNodes().forEach(nodeStats -> { - PublishClusterStateStats stats = nodeStats.getDiscoveryStats().getPublishStats(); - assertTrue(stats.getFullClusterStateReceivedCount() > 0 || stats.getCompatibleClusterStateDiffReceivedCount() > 0); - }); - NodesInfoResponse nodesInfoResponse = client().admin() - .cluster() - .prepareNodesInfo(activeCM) - .clear() - .addMetric(SETTINGS.metricName()) - .get(); - // if masterRestarted is true Publication Setting should be false, and vice versa - assertFalse(REMOTE_PUBLICATION_SETTING.get(nodesInfoResponse.getNodes().get(0).getSettings())); - - followingCMs.forEach(node -> { - PersistedStateRegistry registry = internalCluster().getInstance(PersistedStateRegistry.class, node); - CoordinationState.PersistedState remoteState = registry.getPersistedState(PersistedStateRegistry.PersistedStateType.REMOTE); - assertNull(remoteState.getLastAcceptedState()); - assertNull(remoteState.getLastAcceptedManifest()); - }); - } - public void testMasterReElectionUsesIncrementalUpload() throws IOException { prepareCluster(3, 2, INDEX_NAME, 1, 1); PersistedStateRegistry persistedStateRegistry = internalCluster().getClusterManagerNodeInstance(PersistedStateRegistry.class);