Skip to content

Commit

Permalink
[Snapshot V2] Support pinned timestamp in delete flow (#15256)
Browse files Browse the repository at this point in the history
Signed-off-by: Anshu Agarwal <[email protected]>
  • Loading branch information
anshu1106 authored Sep 4, 2024
1 parent 0753461 commit 5bf34d2
Show file tree
Hide file tree
Showing 8 changed files with 790 additions and 31 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.repositories.RepositoriesService;
Expand Down Expand Up @@ -97,6 +98,8 @@ public final class TransportCleanupRepositoryAction extends TransportClusterMana

private final RemoteStoreLockManagerFactory remoteStoreLockManagerFactory;

private final RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory;

@Override
protected String executor() {
return ThreadPool.Names.SAME;
Expand Down Expand Up @@ -124,6 +127,11 @@ public TransportCleanupRepositoryAction(
);
this.repositoriesService = repositoriesService;
this.snapshotsService = snapshotsService;
this.remoteSegmentStoreDirectoryFactory = new RemoteSegmentStoreDirectoryFactory(
() -> repositoriesService,
threadPool,
remoteStoreSettings.getSegmentsPathFixedPrefix()
);
this.remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(
() -> repositoriesService,
remoteStoreSettings.getSegmentsPathFixedPrefix()
Expand Down Expand Up @@ -277,6 +285,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
repositoryStateId,
snapshotsService.minCompatibleVersion(newState.nodes().getMinNodeVersion(), repositoryData, null),
remoteStoreLockManagerFactory,
remoteSegmentStoreDirectoryFactory,
ActionListener.wrap(result -> after(null, result), e -> after(e, null))
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.collect.Tuple;
Expand Down Expand Up @@ -42,6 +43,7 @@
*
* @opensearch.internal
*/
@ExperimentalApi
public class RemoteStorePinnedTimestampService implements Closeable {
private static final Logger logger = LogManager.getLogger(RemoteStorePinnedTimestampService.class);
private static Tuple<Long, Set<Long>> pinnedTimestampsSet = new Tuple<>(-1L, Set.of());
Expand Down
60 changes: 55 additions & 5 deletions server/src/main/java/org/opensearch/repositories/Repository.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.snapshots.SnapshotId;
import org.opensearch.snapshots.SnapshotInfo;

Expand Down Expand Up @@ -207,11 +209,59 @@ void deleteSnapshots(
/**
* Deletes snapshots and releases respective lock files from remote store repository.
*
* @param snapshotIds snapshot ids
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot deletion began
* @param repositoryMetaVersion version of the updated repository metadata to write
* @param remoteStoreLockManagerFactory RemoteStoreLockManagerFactory to be used for cleaning up remote store lock files
* @param listener completion listener
* @param snapshotIds snapshot ids
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot deletion began
* @param repositoryMetaVersion version of the updated repository metadata to write
* @param remoteStoreLockManagerFactory RemoteStoreLockManagerFactory to be used for cleaning up remote store lock files
* @param remoteSegmentStoreDirectoryFactory RemoteSegmentStoreDirectoryFactory to be used for cleaning up remote store segment files
* @param remoteStorePinnedTimestampService service for pinning and unpinning of the timestamp
* @param snapshotIdsPinnedTimestampMap map of snapshots ids and the pinned timestamp
* @param isShallowSnapshotV2 true for shallow snapshots v2
* @param listener completion listener
*/
default void deleteSnapshotsInternal(
Collection<SnapshotId> snapshotIds,
long repositoryStateId,
Version repositoryMetaVersion,
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory,
RemoteStorePinnedTimestampService remoteStorePinnedTimestampService,
Map<SnapshotId, Long> snapshotIdsPinnedTimestampMap,
boolean isShallowSnapshotV2,
ActionListener<RepositoryData> listener
) {
throw new UnsupportedOperationException();
}

/**
* Deletes snapshots and unpin the snapshot timestamp using remoteStorePinnedTimestampService
*
* @param snapshotsWithPinnedTimestamp map of snapshot ids and the pinned timestamps
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot deletion began
* @param repositoryMetaVersion version of the updated repository metadata to write
* @param remoteSegmentStoreDirectoryFactory RemoteSegmentStoreDirectoryFactory to be used for cleaning up remote store segment files
* @param remoteStorePinnedTimestampService service for pinning and unpinning of the timestamp
* @param listener completion listener
*/
default void deleteSnapshotsWithPinnedTimestamp(
Map<SnapshotId, Long> snapshotsWithPinnedTimestamp,
long repositoryStateId,
Version repositoryMetaVersion,
RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory,
RemoteStorePinnedTimestampService remoteStorePinnedTimestampService,
ActionListener<RepositoryData> listener
) {
throw new UnsupportedOperationException();
}

/**
* Deletes snapshots and releases respective lock files from remote store repository
*
* @param snapshotIds
* @param repositoryStateId
* @param repositoryMetaVersion
* @param remoteStoreLockManagerFactory
* @param listener
*/
default void deleteSnapshotsAndReleaseLockFiles(
Collection<SnapshotId> snapshotIds,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ public final class RepositoryData {
* The indices found in the repository across all snapshots, as a name to {@link IndexId} mapping
*/
private final Map<String, IndexId> indices;

public Map<IndexId, List<SnapshotId>> getIndexSnapshots() {
return indexSnapshots;
}

/**
* The snapshots that each index belongs to.
*/
Expand Down
Loading

0 comments on commit 5bf34d2

Please sign in to comment.