Skip to content

Commit

Permalink
Upload all index metadata to remote
Browse files Browse the repository at this point in the history
  • Loading branch information
soosinha committed Aug 8, 2023
1 parent a3b515a commit 327dc4c
Show file tree
Hide file tree
Showing 8 changed files with 460 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class CoordinationState {

// persisted state
private final PersistedState persistedState;
private final PersistedState remotePersistedState;

// transient state
private VoteCollection joinVotes;
Expand All @@ -75,7 +76,7 @@ public class CoordinationState {
private VotingConfiguration lastPublishedConfiguration;
private VoteCollection publishVotes;

public CoordinationState(DiscoveryNode localNode, PersistedState persistedState, ElectionStrategy electionStrategy) {
public CoordinationState(DiscoveryNode localNode, PersistedState persistedState, ElectionStrategy electionStrategy, PersistedState remotePersistedState) {
this.localNode = localNode;

// persisted state
Expand All @@ -89,6 +90,7 @@ public CoordinationState(DiscoveryNode localNode, PersistedState persistedState,
this.lastPublishedVersion = 0L;
this.lastPublishedConfiguration = persistedState.getLastAcceptedState().getLastAcceptedConfiguration();
this.publishVotes = new VoteCollection();
this.remotePersistedState = remotePersistedState;
}

public long getCurrentTerm() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.opensearch.cluster.coordination.ClusterFormationFailureHelper.ClusterFormationState;
import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfigExclusion;
import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfiguration;
import org.opensearch.cluster.coordination.CoordinationState.PersistedState;
import org.opensearch.cluster.coordination.CoordinationState.VoteCollection;
import org.opensearch.cluster.coordination.FollowersChecker.FollowerCheckRequest;
import org.opensearch.cluster.coordination.JoinHelper.InitialJoinAccumulator;
Expand All @@ -57,6 +58,7 @@
import org.opensearch.cluster.service.ClusterApplier;
import org.opensearch.cluster.service.ClusterApplier.ClusterApplyListener;
import org.opensearch.cluster.service.ClusterManagerService;
import org.opensearch.cluster.store.RemoteClusterStateService;
import org.opensearch.common.Booleans;
import org.opensearch.common.Nullable;
import org.opensearch.common.Priority;
Expand Down Expand Up @@ -181,6 +183,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private JoinHelper.JoinAccumulator joinAccumulator;
private Optional<CoordinatorPublication> currentPublication = Optional.empty();
private final NodeHealthService nodeHealthService;
private final RemoteClusterStateService remoteClusterStateService;
private final Supplier<CoordinationState.PersistedState> remotePersistedStateSupplier;

/**
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
Expand All @@ -201,7 +205,9 @@ public Coordinator(
Random random,
RerouteService rerouteService,
ElectionStrategy electionStrategy,
NodeHealthService nodeHealthService
NodeHealthService nodeHealthService,
RemoteClusterStateService remoteClusterStateService,
Supplier<CoordinationState.PersistedState> remotePersistedStateSupplier
) {
this.settings = settings;
this.transportService = transportService;
Expand Down Expand Up @@ -286,6 +292,8 @@ public Coordinator(
joinHelper::logLastFailedJoinAttempt
);
this.nodeHealthService = nodeHealthService;
this.remoteClusterStateService = remoteClusterStateService;
this.remotePersistedStateSupplier = remotePersistedStateSupplier;
this.localNodeCommissioned = true;
}

Expand Down Expand Up @@ -821,7 +829,7 @@ boolean publicationInProgress() {
protected void doStart() {
synchronized (mutex) {
CoordinationState.PersistedState persistedState = persistedStateSupplier.get();
coordinationState.set(new CoordinationState(getLocalNode(), persistedState, electionStrategy));
coordinationState.set(new CoordinationState(getLocalNode(), persistedState, electionStrategy, remotePersistedStateSupplier.get()));
peerFinder.setCurrentTerm(getCurrentTerm());
configuredHostsResolver.start();
final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
Expand Down Expand Up @@ -1308,6 +1316,13 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
leaderChecker.setCurrentNodes(publishNodes);
followersChecker.setCurrentNodes(publishNodes);
lagDetector.setTrackedNodes(publishNodes);

PersistedState remotePersistedState = remotePersistedStateSupplier.get();
if (remotePersistedState == null) {
logger.error("remote persisted state is null");
} else {
remotePersistedState.setLastAcceptedState(clusterState);
}
publication.start(followersChecker.getFaultyNodes());
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package org.opensearch.cluster.store;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.core.ParseField;
import org.opensearch.core.xcontent.ConstructingObjectParser;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;

public class ClusterMetadataMarker implements Writeable, ToXContentFragment {

private static final ParseField INDICES_FIELD = new ParseField("indices");
private static final ParseField TERM_FIELD = new ParseField("term");
private static final ParseField VERSION_FIELD = new ParseField("version");
private static final ParseField CLUSTER_UUID_FIELD = new ParseField("cluster_uuid");
private static final ParseField STATE_UUID_FIELD = new ParseField("state_uuid");

private static Map<String, UploadedIndexMetadata> indices(Object[] fields) {
return new HashMap<>((Map<String, UploadedIndexMetadata>) fields[0]);
}

private static long term(Object[] fields) {
return (long) fields[1];
}

private static long version(Object[] fields) {
return (long) fields[2];
}

private static String clusterUUID(Object[] fields) {
return (String) fields[3];
}

private static String stateUUID(Object[] fields) {
return (String) fields[4];
}

private static final ConstructingObjectParser<ClusterMetadataMarker, Void> PARSER = new ConstructingObjectParser<>(
"cluster_metadata_marker",
fields -> new ClusterMetadataMarker(indices(fields), term(fields), version(fields), clusterUUID(fields), stateUUID(fields))
);

private final Map<String, UploadedIndexMetadata> indices;
private final long term;
private final long version;
private final String clusterUUID;
private final String stateUUID;

public Map<String, UploadedIndexMetadata> getIndices() {
return indices;
}

public long getTerm() {
return term;
}

public long getVersion() {
return version;
}

public String getClusterUUID() {
return clusterUUID;
}

public String getStateUUID() {
return stateUUID;
}

public ClusterMetadataMarker(Map<String, UploadedIndexMetadata> indices, long term, long version, String clusterUUID, String stateUUID) {
this.indices = Collections.unmodifiableMap(indices);
this.term = term;
this.version = version;
this.clusterUUID = clusterUUID;
this.stateUUID = stateUUID;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.field(INDICES_FIELD.getPreferredName(), getIndices())
.field(TERM_FIELD.getPreferredName(), getTerm())
.field(VERSION_FIELD.getPreferredName(), getVersion())
.field(CLUSTER_UUID_FIELD.getPreferredName(), getClusterUUID())
.field(STATE_UUID_FIELD.getPreferredName(), getStateUUID());
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMapWithConsistentOrder(indices);
out.writeVLong(term);
out.writeVLong(version);
out.writeString(clusterUUID);
out.writeString(stateUUID);
}

public static ClusterMetadataMarker fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}

public static class Builder {

private final Map<String, UploadedIndexMetadata> indices;
private long term;
private long version;
private String clusterUUID;
private String stateUUID;


public void term(long term) {
this.term = term;
}

public void version(long version) {
this.version = version;
}

public void clusterUUID(String clusterUUID) {
this.clusterUUID = clusterUUID;
}

public void stateUUID(String stateUUID) {
this.stateUUID = stateUUID;
}

public Map<String, UploadedIndexMetadata> getIndices() {
return indices;
}

public Builder() {
indices = new HashMap<>();
}

public ClusterMetadataMarker build() {
return new ClusterMetadataMarker(indices, term, version, clusterUUID, stateUUID);
}

}

public static class UploadedIndexMetadata implements Writeable, ToXContentFragment {

private static final ParseField UPLOADED_FILENAME_FIELD = new ParseField("uploaded_filename");

private static String uploadedFilename(Object[] fields) {
return (String) fields[0];
}

private static final ConstructingObjectParser<UploadedIndexMetadata, Void> PARSER = new ConstructingObjectParser<>("uploaded_index_metadata",
fields -> new UploadedIndexMetadata(uploadedFilename(fields)));

private final String uploadedFilename;

public UploadedIndexMetadata(String uploadedFileName) {
this.uploadedFilename = uploadedFileName;
}

public String getUploadedFilename() {
return uploadedFilename;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilename());
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(uploadedFilename);
}

public static UploadedIndexMetadata fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package org.opensearch.cluster.store;

import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_STORE_REPOSITORY_SETTING;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.indices.IndicesService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

public class RemoteClusterStateService {

private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class);

private static final String DELIMITER = "__";

private final Supplier<RepositoriesService> repositoriesService;
private final ClusterSettings clusterSettings;
private BlobStoreRepository blobStoreRepository;

public RemoteClusterStateService(Supplier<RepositoriesService> repositoriesService, ClusterSettings clusterSettings) {
this.repositoriesService = repositoriesService;
this.clusterSettings = clusterSettings;
}

public void writeFullStateAndCommit(long currentTerm, ClusterState clusterState) throws IOException {
if (!clusterState.nodes().isLocalNodeElectedClusterManager()) {
logger.error("local node is not electer cluster manager. Exiting");
return;
}
setRepository();
if (blobStoreRepository == null) {
logger.error("Unable to set repository");
return;
}

Map<String, String> indexMetadataKeys = new HashMap<>();
for (IndexMetadata indexMetadata : clusterState.metadata().indices().values()) {
//123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX/metadata_4_1690947200
String indexMetadataKey = blobStoreRepository.writeIndexMetadata(clusterState.getClusterName().value(), clusterState.getMetadata().clusterUUID(),
indexMetadata, indexMetadataFileName(indexMetadata));
indexMetadataKeys.put(indexMetadata.getIndex().getName(), indexMetadataKey);
}
uploadMarker(clusterState, indexMetadataKeys);
}

private void setRepository() {
try {
if (blobStoreRepository != null) {
return;
}
if (clusterSettings.get(IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING)) {
String remoteStoreRepo = clusterSettings.get(CLUSTER_REMOTE_STORE_REPOSITORY_SETTING);
Repository repository = repositoriesService.get().repository(remoteStoreRepo);
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
blobStoreRepository = (BlobStoreRepository) repository;
} else {
logger.info("remote store is not enabled");
}
} catch (Exception e) {
logger.error("set repo exception", e);
}
}

public void writeIncrementalStateAndCommit(long currentTerm, ClusterState previousClusterState, ClusterState clusterState) {
//todo
}

// why do we need this ?
public void writeIncrementalTermUpdateAndCommit(long currentTerm, long lastAcceptedVersion) {
//todo
}

public ClusterState getLatestClusterState(String clusterUUID) {
//todo
return null;
}

//todo exception handling
public void uploadMarker(ClusterState clusterState, Map<String, String> indexMetadataKeys) throws IOException {
synchronized (this) {
String markerFileName = getMarkerFileName(clusterState.term(), clusterState.version());
Map<String, ClusterMetadataMarker.UploadedIndexMetadata> uploadedIndices = indexMetadataKeys.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> new ClusterMetadataMarker.UploadedIndexMetadata(e.getValue())));
ClusterMetadataMarker marker = new ClusterMetadataMarker(uploadedIndices, clusterState.term(), clusterState.getVersion(),
clusterState.metadata().clusterUUID(),
clusterState.stateUUID());
blobStoreRepository.writeMetadataMarker(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), marker, markerFileName);
}
}

private static String getMarkerFileName(long term, long version) {
//123456789012_test-cluster/cluster-state/dsgYj10Nkso7/marker/2147483642_2147483637_456536447_marker
return String.join(DELIMITER, String.valueOf(Long.MAX_VALUE - term), String.valueOf(Long.MAX_VALUE - version),
String.valueOf(Long.MAX_VALUE - System.currentTimeMillis()), "marker");
}


private static String indexMetadataFileName(IndexMetadata indexMetadata) {
return String.join(DELIMITER, "metadata", String.valueOf(indexMetadata.getVersion()), String.valueOf(System.currentTimeMillis()));
}


}
Loading

0 comments on commit 327dc4c

Please sign in to comment.