diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java index 08cd7d0ab02db..45cf83a1bad74 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java @@ -66,6 +66,7 @@ public class CoordinationState { // persisted state private final PersistedState persistedState; + private final PersistedState remotePersistedState; // transient state private VoteCollection joinVotes; @@ -75,7 +76,7 @@ public class CoordinationState { private VotingConfiguration lastPublishedConfiguration; private VoteCollection publishVotes; - public CoordinationState(DiscoveryNode localNode, PersistedState persistedState, ElectionStrategy electionStrategy) { + public CoordinationState(DiscoveryNode localNode, PersistedState persistedState, ElectionStrategy electionStrategy, PersistedState remotePersistedState) { this.localNode = localNode; // persisted state @@ -89,6 +90,7 @@ public CoordinationState(DiscoveryNode localNode, PersistedState persistedState, this.lastPublishedVersion = 0L; this.lastPublishedConfiguration = persistedState.getLastAcceptedState().getLastAcceptedConfiguration(); this.publishVotes = new VoteCollection(); + this.remotePersistedState = remotePersistedState; } public long getCurrentTerm() { diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index 546fe2d4643d3..b93277ffabad5 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -46,6 +46,7 @@ import org.opensearch.cluster.coordination.ClusterFormationFailureHelper.ClusterFormationState; import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfigExclusion; import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfiguration; +import org.opensearch.cluster.coordination.CoordinationState.PersistedState; import org.opensearch.cluster.coordination.CoordinationState.VoteCollection; import org.opensearch.cluster.coordination.FollowersChecker.FollowerCheckRequest; import org.opensearch.cluster.coordination.JoinHelper.InitialJoinAccumulator; @@ -57,6 +58,7 @@ import org.opensearch.cluster.service.ClusterApplier; import org.opensearch.cluster.service.ClusterApplier.ClusterApplyListener; import org.opensearch.cluster.service.ClusterManagerService; +import org.opensearch.cluster.store.RemoteClusterStateService; import org.opensearch.common.Booleans; import org.opensearch.common.Nullable; import org.opensearch.common.Priority; @@ -181,6 +183,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private JoinHelper.JoinAccumulator joinAccumulator; private Optional currentPublication = Optional.empty(); private final NodeHealthService nodeHealthService; + private final RemoteClusterStateService remoteClusterStateService; + private final Supplier remotePersistedStateSupplier; /** * @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}. 
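For context on the new constructor parameters: the Coordinator now carries a RemoteClusterStateService plus a supplier of the remote PersistedState, and in the publication hunk further below it writes the accepted state through that supplier, behind a null check, before publication starts. A minimal, self-contained sketch of this write-through hook follows; the type and method names are invented for illustration and are not the OpenSearch API.

import java.util.function.Supplier;

// Illustration only: names are invented and this is not the Coordinator API.
interface RemoteStateWriter {
    void setLastAcceptedState(String clusterState);
}

class PublishHookSketch {
    private final Supplier<RemoteStateWriter> remoteStateSupplier; // may supply null when no remote store is configured

    PublishHookSketch(Supplier<RemoteStateWriter> remoteStateSupplier) {
        this.remoteStateSupplier = remoteStateSupplier;
    }

    void beforePublicationStart(String clusterState) {
        RemoteStateWriter remote = remoteStateSupplier.get();
        if (remote == null) {
            // the draft below logs an error here; skipping (or counting a metric) may be the
            // friendlier behaviour for clusters that simply do not use the remote store
            return;
        }
        remote.setLastAcceptedState(clusterState);
    }

    public static void main(String[] args) {
        PublishHookSketch hook = new PublishHookSketch(() -> state -> System.out.println("remote write: " + state));
        hook.beforePublicationStart("{cluster-state}");
    }
}

Whether a missing remote persisted state should be an error log (as in the draft) or a silent no-op is worth settling before merge, since cluster-manager nodes without the remote store enabled will hit this path on every publication.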
@@ -201,7 +205,9 @@ public Coordinator( Random random, RerouteService rerouteService, ElectionStrategy electionStrategy, - NodeHealthService nodeHealthService + NodeHealthService nodeHealthService, + RemoteClusterStateService remoteClusterStateService, + Supplier remotePersistedStateSupplier ) { this.settings = settings; this.transportService = transportService; @@ -286,6 +292,8 @@ public Coordinator( joinHelper::logLastFailedJoinAttempt ); this.nodeHealthService = nodeHealthService; + this.remoteClusterStateService = remoteClusterStateService; + this.remotePersistedStateSupplier = remotePersistedStateSupplier; this.localNodeCommissioned = true; } @@ -821,7 +829,7 @@ boolean publicationInProgress() { protected void doStart() { synchronized (mutex) { CoordinationState.PersistedState persistedState = persistedStateSupplier.get(); - coordinationState.set(new CoordinationState(getLocalNode(), persistedState, electionStrategy)); + coordinationState.set(new CoordinationState(getLocalNode(), persistedState, electionStrategy, remotePersistedStateSupplier.get())); peerFinder.setCurrentTerm(getCurrentTerm()); configuredHostsResolver.start(); final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState(); @@ -1308,6 +1316,13 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId()) leaderChecker.setCurrentNodes(publishNodes); followersChecker.setCurrentNodes(publishNodes); lagDetector.setTrackedNodes(publishNodes); + + PersistedState remotePersistedState = remotePersistedStateSupplier.get(); + if (remotePersistedState == null) { + logger.error("remote persisted state is null"); + } else { + remotePersistedState.setLastAcceptedState(clusterState); + } publication.start(followersChecker.getFaultyNodes()); } } catch (Exception e) { diff --git a/server/src/main/java/org/opensearch/cluster/store/ClusterMetadataMarker.java b/server/src/main/java/org/opensearch/cluster/store/ClusterMetadataMarker.java new file mode 100644 index 0000000000000..d5d446d4d8be9 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/store/ClusterMetadataMarker.java @@ -0,0 +1,178 @@ +package org.opensearch.cluster.store; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.core.ParseField; +import org.opensearch.core.xcontent.ConstructingObjectParser; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; + +public class ClusterMetadataMarker implements Writeable, ToXContentFragment { + + private static final ParseField INDICES_FIELD = new ParseField("indices"); + private static final ParseField TERM_FIELD = new ParseField("term"); + private static final ParseField VERSION_FIELD = new ParseField("version"); + private static final ParseField CLUSTER_UUID_FIELD = new ParseField("cluster_uuid"); + private static final ParseField STATE_UUID_FIELD = new ParseField("state_uuid"); + + private static Map indices(Object[] fields) { + return new HashMap<>((Map) fields[0]); + } + + private static long term(Object[] fields) { + return (long) fields[1]; + } + + private static long version(Object[] fields) { + return (long) fields[2]; + } + + private static String clusterUUID(Object[] fields) { + return (String) fields[3]; + } + + private static String stateUUID(Object[] fields) { + return 
(String) fields[4]; + } + + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "cluster_metadata_marker", + fields -> new ClusterMetadataMarker(indices(fields), term(fields), version(fields), clusterUUID(fields), stateUUID(fields)) + ); + + private final Map indices; + private final long term; + private final long version; + private final String clusterUUID; + private final String stateUUID; + + public Map getIndices() { + return indices; + } + + public long getTerm() { + return term; + } + + public long getVersion() { + return version; + } + + public String getClusterUUID() { + return clusterUUID; + } + + public String getStateUUID() { + return stateUUID; + } + + public ClusterMetadataMarker(Map indices, long term, long version, String clusterUUID, String stateUUID) { + this.indices = Collections.unmodifiableMap(indices); + this.term = term; + this.version = version; + this.clusterUUID = clusterUUID; + this.stateUUID = stateUUID; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder.field(INDICES_FIELD.getPreferredName(), getIndices()) + .field(TERM_FIELD.getPreferredName(), getTerm()) + .field(VERSION_FIELD.getPreferredName(), getVersion()) + .field(CLUSTER_UUID_FIELD.getPreferredName(), getClusterUUID()) + .field(STATE_UUID_FIELD.getPreferredName(), getStateUUID()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeMapWithConsistentOrder(indices); + out.writeVLong(term); + out.writeVLong(version); + out.writeString(clusterUUID); + out.writeString(stateUUID); + } + + public static ClusterMetadataMarker fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + public static class Builder { + + private final Map indices; + private long term; + private long version; + private String clusterUUID; + private String stateUUID; + + + public void term(long term) { + this.term = term; + } + + public void version(long version) { + this.version = version; + } + + public void clusterUUID(String clusterUUID) { + this.clusterUUID = clusterUUID; + } + + public void stateUUID(String stateUUID) { + this.stateUUID = stateUUID; + } + + public Map getIndices() { + return indices; + } + + public Builder() { + indices = new HashMap<>(); + } + + public ClusterMetadataMarker build() { + return new ClusterMetadataMarker(indices, term, version, clusterUUID, stateUUID); + } + + } + + public static class UploadedIndexMetadata implements Writeable, ToXContentFragment { + + private static final ParseField UPLOADED_FILENAME_FIELD = new ParseField("uploaded_filename"); + + private static String uploadedFilename(Object[] fields) { + return (String) fields[0]; + } + + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("uploaded_index_metadata", + fields -> new UploadedIndexMetadata(uploadedFilename(fields))); + + private final String uploadedFilename; + + public UploadedIndexMetadata(String uploadedFileName) { + this.uploadedFilename = uploadedFileName; + } + + public String getUploadedFilename() { + return uploadedFilename; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder.field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilename()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(uploadedFilename); + } + + public static UploadedIndexMetadata 
fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + } +} diff --git a/server/src/main/java/org/opensearch/cluster/store/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/cluster/store/RemoteClusterStateService.java new file mode 100644 index 0000000000000..6d861b02dd36b --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/store/RemoteClusterStateService.java @@ -0,0 +1,113 @@ +package org.opensearch.cluster.store; + +import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_STORE_REPOSITORY_SETTING; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.indices.IndicesService; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.blobstore.BlobStoreRepository; + +public class RemoteClusterStateService { + + private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class); + + private static final String DELIMITER = "__"; + + private final Supplier repositoriesService; + private final ClusterSettings clusterSettings; + private BlobStoreRepository blobStoreRepository; + + public RemoteClusterStateService(Supplier repositoriesService, ClusterSettings clusterSettings) { + this.repositoriesService = repositoriesService; + this.clusterSettings = clusterSettings; + } + + public void writeFullStateAndCommit(long currentTerm, ClusterState clusterState) throws IOException { + if (!clusterState.nodes().isLocalNodeElectedClusterManager()) { + logger.error("local node is not electer cluster manager. Exiting"); + return; + } + setRepository(); + if (blobStoreRepository == null) { + logger.error("Unable to set repository"); + return; + } + + Map indexMetadataKeys = new HashMap<>(); + for (IndexMetadata indexMetadata : clusterState.metadata().indices().values()) { + //123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX/metadata_4_1690947200 + String indexMetadataKey = blobStoreRepository.writeIndexMetadata(clusterState.getClusterName().value(), clusterState.getMetadata().clusterUUID(), + indexMetadata, indexMetadataFileName(indexMetadata)); + indexMetadataKeys.put(indexMetadata.getIndex().getName(), indexMetadataKey); + } + uploadMarker(clusterState, indexMetadataKeys); + } + + private void setRepository() { + try { + if (blobStoreRepository != null) { + return; + } + if (clusterSettings.get(IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING)) { + String remoteStoreRepo = clusterSettings.get(CLUSTER_REMOTE_STORE_REPOSITORY_SETTING); + Repository repository = repositoriesService.get().repository(remoteStoreRepo); + assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; + blobStoreRepository = (BlobStoreRepository) repository; + } else { + logger.info("remote store is not enabled"); + } + } catch (Exception e) { + logger.error("set repo exception", e); + } + } + + public void writeIncrementalStateAndCommit(long currentTerm, ClusterState previousClusterState, ClusterState clusterState) { + //todo + } + + // why do we need this ? 
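+    // (likely answer: the coordination layer persists term bumps via PersistedState#setCurrentTerm even when
+    // no new cluster state is accepted, e.g. while granting votes during an election, so the remote copy
+    // needs a term-only update path; assumption based on how the local PersistedState is used)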
+ public void writeIncrementalTermUpdateAndCommit(long currentTerm, long lastAcceptedVersion) { + //todo + } + + public ClusterState getLatestClusterState(String clusterUUID) { + //todo + return null; + } + + //todo exception handling + public void uploadMarker(ClusterState clusterState, Map indexMetadataKeys) throws IOException { + synchronized (this) { + String markerFileName = getMarkerFileName(clusterState.term(), clusterState.version()); + Map uploadedIndices = indexMetadataKeys.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> new ClusterMetadataMarker.UploadedIndexMetadata(e.getValue()))); + ClusterMetadataMarker marker = new ClusterMetadataMarker(uploadedIndices, clusterState.term(), clusterState.getVersion(), + clusterState.metadata().clusterUUID(), + clusterState.stateUUID()); + blobStoreRepository.writeMetadataMarker(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), marker, markerFileName); + } + } + + private static String getMarkerFileName(long term, long version) { + //123456789012_test-cluster/cluster-state/dsgYj10Nkso7/marker/2147483642_2147483637_456536447_marker + return String.join(DELIMITER, String.valueOf(Long.MAX_VALUE - term), String.valueOf(Long.MAX_VALUE - version), + String.valueOf(Long.MAX_VALUE - System.currentTimeMillis()), "marker"); + } + + + private static String indexMetadataFileName(IndexMetadata indexMetadata) { + return String.join(DELIMITER, "metadata", String.valueOf(indexMetadata.getVersion()), String.valueOf(System.currentTimeMillis())); + } + + +} diff --git a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java index 44f44fa055b2b..48252f6d06e1e 100644 --- a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java @@ -42,6 +42,7 @@ import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.service.ClusterApplier; import org.opensearch.cluster.service.ClusterManagerService; +import org.opensearch.cluster.store.RemoteClusterStateService; import org.opensearch.common.Randomness; import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.network.NetworkService; @@ -129,7 +130,8 @@ public DiscoveryModule( Path configFile, GatewayMetaState gatewayMetaState, RerouteService rerouteService, - NodeHealthService nodeHealthService + NodeHealthService nodeHealthService, + RemoteClusterStateService remoteClusterStateService ) { final Collection> joinValidators = new ArrayList<>(); final Map> hostProviders = new HashMap<>(); @@ -205,7 +207,9 @@ public DiscoveryModule( new Random(Randomness.get().nextLong()), rerouteService, electionStrategy, - nodeHealthService + nodeHealthService, + remoteClusterStateService, + gatewayMetaState::getRemotePersistedState ); } else { throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]"); diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index ad9faef067c89..0bec6d7429b62 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -52,6 +52,7 @@ import org.opensearch.cluster.metadata.MetadataIndexUpgradeService; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; +import 
org.opensearch.cluster.store.RemoteClusterStateService; import org.opensearch.common.SetOnce; import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.Settings; @@ -60,6 +61,7 @@ import org.opensearch.common.util.concurrent.OpenSearchThreadPoolExecutor; import org.opensearch.common.util.io.IOUtils; import org.opensearch.env.NodeMetadata; +import org.opensearch.indices.IndicesService; import org.opensearch.node.Node; import org.opensearch.plugins.MetadataUpgrader; import org.opensearch.threadpool.ThreadPool; @@ -92,6 +94,7 @@ * @opensearch.internal */ public class GatewayMetaState implements Closeable { + private static final Logger logger = LogManager.getLogger(GatewayMetaState.class); /** * Fake node ID for a voting configuration written by a cluster-manager-ineligible data node to indicate that its on-disk state is potentially @@ -103,12 +106,19 @@ public class GatewayMetaState implements Closeable { // Set by calling start() private final SetOnce persistedState = new SetOnce<>(); + private final SetOnce remotePersistedState = new SetOnce<>(); + public PersistedState getPersistedState() { final PersistedState persistedState = this.persistedState.get(); assert persistedState != null : "not started"; return persistedState; } + public PersistedState getRemotePersistedState() { + final PersistedState remotePersistedState = this.remotePersistedState.get(); + return remotePersistedState; + } + public Metadata getMetadata() { return getPersistedState().getLastAcceptedState().metadata(); } @@ -120,7 +130,8 @@ public void start( MetaStateService metaStateService, MetadataIndexUpgradeService metadataIndexUpgradeService, MetadataUpgrader metadataUpgrader, - PersistedClusterStateService persistedClusterStateService + PersistedClusterStateService persistedClusterStateService, + RemoteClusterStateService remoteClusterStateService ) { assert persistedState.get() == null : "should only start once, but already have " + persistedState.get(); @@ -144,8 +155,14 @@ public void start( } PersistedState persistedState = null; + PersistedState remotePersistedState = null; boolean success = false; try { + if (IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING.get(settings)) { + logger.info("remote store is enabled while bootstrap"); + } else { + logger.info("remote store is NOT enabled while bootstrap"); + } final ClusterState clusterState = prepareInitialClusterState( transportService, clusterService, @@ -154,9 +171,15 @@ public void start( .metadata(upgradeMetadataForNode(metadata, metadataIndexUpgradeService, metadataUpgrader)) .build() ); + if (IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING.get(clusterState.metadata().settings())) { + logger.info("After clusterState build: remote store is enabled while bootstrap"); + } else { + logger.info("After clusterState build: remote store is NOT enabled while bootstrap"); + } if (DiscoveryNode.isClusterManagerNode(settings)) { persistedState = new LucenePersistedState(persistedClusterStateService, currentTerm, clusterState); + remotePersistedState = new RemotePersistedState(remoteClusterStateService, currentTerm, clusterState); } else { persistedState = new AsyncLucenePersistedState( settings, @@ -182,6 +205,7 @@ public void start( } this.persistedState.set(persistedState); + this.remotePersistedState.set(remotePersistedState); } catch (IOException e) { throw new OpenSearchException("failed to load metadata", e); } @@ -599,4 +623,82 @@ public void close() throws IOException { IOUtils.close(persistenceWriter.getAndSet(null)); } } + + 
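+    /**
+     * {@link PersistedState} implementation backed by the remote store (draft). As wired in
+     * {@code start(...)} above, it is only created on cluster-manager-eligible nodes, uploads the
+     * accepted state through {@link RemoteClusterStateService}, and falls back to a full rewrite
+     * after any failed write (see {@code writeNextStateFully}).
+     */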
+    public static class RemotePersistedState implements PersistedState {
+
+        // todo check diff between currentTerm and clusterState term
+        private long currentTerm;
+        private ClusterState lastAcceptedState;
+        private final RemoteClusterStateService remoteClusterStateService;
+        private boolean writeNextStateFully;
+
+        public RemotePersistedState(final RemoteClusterStateService remoteClusterStateService, final long currentTerm, final ClusterState lastAcceptedState) {
+            this.remoteClusterStateService = remoteClusterStateService;
+            this.currentTerm = currentTerm;
+            this.lastAcceptedState = lastAcceptedState;
+
+            // todo write state to remote only on the active cluster-manager
+        }
+
+        @Override
+        public long getCurrentTerm() {
+            return currentTerm;
+        }
+
+        @Override
+        public ClusterState getLastAcceptedState() {
+            return lastAcceptedState;
+        }
+
+        @Override
+        public void setCurrentTerm(long currentTerm) {
+            // todo is synchronization needed ?
+            try {
+                if (writeNextStateFully) {
+                    remoteClusterStateService.writeFullStateAndCommit(currentTerm, lastAcceptedState);
+                    writeNextStateFully = false;
+                } else {
+                    remoteClusterStateService.writeIncrementalTermUpdateAndCommit(currentTerm, lastAcceptedState.version());
+                }
+            } catch (Exception e) {
+                handleExceptionOnWrite(e);
+            }
+            this.currentTerm = currentTerm;
+        }
+
+        @Override
+        public void setLastAcceptedState(ClusterState clusterState) {
+            try {
+//                if (writeNextStateFully) {
+                remoteClusterStateService.writeFullStateAndCommit(currentTerm, clusterState);
+//                    writeNextStateFully = false;
+//                } else {
+//                    if (clusterState.term() != lastAcceptedState.term()) {
+//                        assert clusterState.term() > lastAcceptedState.term() : clusterState.term() + " vs " + lastAcceptedState.term();
+//                        remoteClusterStateService.writeFullStateAndCommit(currentTerm, clusterState);
+//                    } else {
+//                        remoteClusterStateService.writeIncrementalStateAndCommit(currentTerm, lastAcceptedState, clusterState);
+//                    }
+//                }
+            } catch (Exception e) {
+                handleExceptionOnWrite(e);
+            }
+            // keep the in-memory copy in sync so getLastAcceptedState() reflects the state just written
+            this.lastAcceptedState = clusterState;
+        }
+
+        @Override
+        public void markLastAcceptedStateAsCommitted() {
+            // todo is custom implementation needed?
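+            // note: the interface default promotes the last-accepted voting configuration and cluster UUID to
+            // "committed" and, when anything changed, calls setLastAcceptedState(...) again, which for this
+            // implementation means another full remote upload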
+ PersistedState.super.markLastAcceptedStateAsCommitted(); + } + + @Override + public void close() throws IOException { + PersistedState.super.close(); + } + + private void handleExceptionOnWrite(Exception e) { + writeNextStateFully = true; + throw ExceptionsHelper.convertToRuntime(e); + } + } } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index d3655671b516d..40a57d498fc92 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.util.Constants; import org.opensearch.ExceptionsHelper; +import org.opensearch.cluster.store.RemoteClusterStateService; import org.opensearch.common.SetOnce; import org.opensearch.common.settings.SettingsException; import org.opensearch.common.unit.ByteSizeUnit; @@ -668,6 +669,8 @@ protected Node( threadPool::relativeTimeInMillis ); + final RemoteClusterStateService remoteClusterStateService = new RemoteClusterStateService(repositoriesServiceReference::get, clusterService.getClusterSettings()); + // collect engine factory providers from plugins final Collection enginePlugins = pluginsService.filterPlugins(EnginePlugin.class); final Collection>> engineFactoryProviders = enginePlugins.stream() @@ -969,7 +972,8 @@ protected Node( environment.configDir(), gatewayMetaState, rerouteService, - fsHealthService + fsHealthService, + remoteClusterStateService ); final SearchPipelineService searchPipelineService = new SearchPipelineService( clusterService, @@ -1091,6 +1095,7 @@ protected Node( b.bind(MetadataUpgrader.class).toInstance(metadataUpgrader); b.bind(MetaStateService.class).toInstance(metaStateService); b.bind(PersistedClusterStateService.class).toInstance(lucenePersistedStateFactory); + b.bind(RemoteClusterStateService.class).toInstance(remoteClusterStateService); b.bind(IndicesService.class).toInstance(indicesService); b.bind(AliasValidator.class).toInstance(aliasValidator); b.bind(MetadataCreateIndexService.class).toInstance(metadataCreateIndexService); @@ -1292,7 +1297,8 @@ public Node start() throws NodeValidationException { injector.getInstance(MetaStateService.class), injector.getInstance(MetadataIndexUpgradeService.class), injector.getInstance(MetadataUpgrader.class), - injector.getInstance(PersistedClusterStateService.class) + injector.getInstance(PersistedClusterStateService.class), + injector.getInstance(RemoteClusterStateService.class) ); if (Assertions.ENABLED) { try { diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index f04bf83c2f1d1..86801e0bb2f2d 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -64,6 +64,7 @@ import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.cluster.store.ClusterMetadataMarker; import org.opensearch.common.Nullable; import org.opensearch.common.Numbers; import org.opensearch.common.SetOnce; @@ -196,6 +197,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp public static final String METADATA_NAME_FORMAT = METADATA_PREFIX + "%s.dat"; + public static final 
String METADATA_MARKER_NAME_FORMAT = "%s"; + public static final String SNAPSHOT_NAME_FORMAT = SNAPSHOT_PREFIX + "%s.dat"; public static final String SHALLOW_SNAPSHOT_NAME_FORMAT = SHALLOW_SNAPSHOT_PREFIX + "%s.dat"; @@ -300,12 +303,19 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp Metadata::fromXContent ); + //TODO name format to be changed public static final ChecksumBlobStoreFormat INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( "index-metadata", METADATA_NAME_FORMAT, IndexMetadata::fromXContent ); + public static final ChecksumBlobStoreFormat CLUSTER_METADATA_MARKER_FORMAT = new ChecksumBlobStoreFormat<>( + "cluster-metadata-marker", + METADATA_MARKER_NAME_FORMAT, + ClusterMetadataMarker::fromXContent + ); + private static final String SNAPSHOT_CODEC = "snapshot"; public static final ChecksumBlobStoreFormat SNAPSHOT_FORMAT = new ChecksumBlobStoreFormat<>( @@ -1522,6 +1532,16 @@ public BlobContainer shardContainer(IndexId indexId, int shardId) { return blobStore().blobContainer(indicesPath().add(indexId.getId()).add(Integer.toString(shardId))); } + public BlobContainer indexMetadataContainer(String clusterName, String clusterUUID, String indexUUID) { + //123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX + return blobStore().blobContainer(basePath().add(clusterName).add("cluster-state").add(clusterUUID).add("index").add(indexUUID)); + } + + public BlobContainer markerContainer(String clusterName, String clusterUUID) { + //123456789012_test-cluster/cluster-state/dsgYj10Nkso7/marker + return blobStore().blobContainer(basePath().add(clusterName).add("cluster-state").add(clusterUUID).add("marker")); + } + /** * Configures RateLimiter based on repository and global settings * @@ -2868,6 +2888,18 @@ public void verify(String seed, DiscoveryNode localNode) { } } + public String writeIndexMetadata(String clusterName, String clusterUUID, IndexMetadata indexMetadata, String fileName) throws IOException { + BlobContainer indexMetadataContainer = indexMetadataContainer(clusterName, clusterUUID, indexMetadata.getIndexUUID()); + INDEX_METADATA_FORMAT.write(indexMetadata, indexMetadataContainer, fileName, compressor); + // returning full path + return indexMetadataContainer.path().buildAsString() + fileName; + } + + public void writeMetadataMarker(String clusterName, String clusterUUID, ClusterMetadataMarker marker, String fileName) throws IOException { + BlobContainer metadataMarkerContainer = markerContainer(clusterName, clusterUUID); + CLUSTER_METADATA_MARKER_FORMAT.write(marker, metadataMarkerContainer, fileName, compressor); + } + @Override public String toString() { return "BlobStoreRepository[" + "[" + metadata.name() + "], [" + blobStore.get() + ']' + ']';
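One detail that is easy to miss when reviewing the naming scheme: the marker file names produced by RemoteClusterStateService#getMarkerFileName earlier in this change encode term, version and timestamp as Long.MAX_VALUE minus the value, so the newest marker gets the lexicographically smallest blob name and a name-sorted listing of the marker container yields the latest state first. Below is a small, self-contained sketch of that ordering property; it is plain Java with invented class and helper names, not OpenSearch code.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class MarkerNameOrderingSketch {

    private static final String DELIMITER = "__";

    // Same idea as getMarkerFileName: invert term/version/timestamp so the newest
    // marker has the lexicographically smallest name.
    static String markerFileName(long term, long version, long timestampMillis) {
        // All three operands stay close to Long.MAX_VALUE, so the decimal strings keep the
        // same digit count and lexicographic order matches numeric order.
        return String.join(
            DELIMITER,
            String.valueOf(Long.MAX_VALUE - term),
            String.valueOf(Long.MAX_VALUE - version),
            String.valueOf(Long.MAX_VALUE - timestampMillis),
            "marker"
        );
    }

    public static void main(String[] args) {
        List<String> blobNames = new ArrayList<>();
        blobNames.add(markerFileName(4, 120, 1_690_947_000_000L));
        blobNames.add(markerFileName(5, 121, 1_690_947_100_000L)); // newest
        blobNames.add(markerFileName(4, 119, 1_690_946_900_000L));

        // A blob listing sorted ascending by name puts the most recent marker first,
        // so "read latest state" becomes a prefix listing plus picking the first entry.
        Collections.sort(blobNames);
        System.out.println("latest marker: " + blobNames.get(0));
    }
}

Presumably this is what getLatestClusterState (still a todo above) will rely on: list the marker container sorted by name and read only the first blob.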