From e64941a12ba77d739049005aabec5b34139e61a4 Mon Sep 17 00:00:00 2001 From: Sooraj Sinha Date: Wed, 22 May 2024 23:43:53 +0530 Subject: [PATCH] Remote state interfaces Signed-off-by: Sooraj Sinha --- .../remote/AbstractRemoteBlobStoreObject.java | 126 ++++++++++++++++++ .../gateway/remote/BlobPathParameters.java | 35 +++++ .../gateway/remote/RemoteObject.java | 36 +++++ 3 files changed, 197 insertions(+) create mode 100644 server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java create mode 100644 server/src/main/java/org/opensearch/gateway/remote/BlobPathParameters.java create mode 100644 server/src/main/java/org/opensearch/gateway/remote/RemoteObject.java diff --git a/server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java b/server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java new file mode 100644 index 0000000000000..ebb7f98b7bb43 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java @@ -0,0 +1,126 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.opensearch.common.CheckedRunnable; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.compress.Compressor; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Base64; +import java.util.concurrent.ExecutorService; + +/** + * An extension of {@link RemoteObject} class which caters to the use case of writing to and reading from a blob storage + * + * @param The class type which can be uploaded to or downloaded from a blob storage. + */ +public abstract class AbstractRemoteBlobStoreObject implements RemoteObject { + + public static final String PATH_DELIMITER = "/"; + + private final BlobStoreTransferService transferService; + private final BlobStoreRepository blobStoreRepository; + private final String clusterName; + private final ExecutorService executorService; + + public AbstractRemoteBlobStoreObject( + BlobStoreTransferService blobStoreTransferService, + BlobStoreRepository blobStoreRepository, + String clusterName, + ThreadPool threadPool + ) { + this.transferService = blobStoreTransferService; + this.blobStoreRepository = blobStoreRepository; + this.clusterName = clusterName; + this.executorService = threadPool.executor(ThreadPool.Names.GENERIC); + } + + public abstract BlobPathParameters getBlobPathParameters(); + + public abstract String getFullBlobName(); + + public String getBlobFileName() { + if (getFullBlobName() == null) { + generateBlobFileName(); + } + String[] pathTokens = getFullBlobName().split(PATH_DELIMITER); + return getFullBlobName().split(PATH_DELIMITER)[pathTokens.length - 1]; + } + + public abstract String generateBlobFileName(); + + public abstract UploadedMetadata getUploadedMetadata(); + + @Override + public CheckedRunnable writeAsync(ActionListener listener) { + return () -> { + assert get() != null; + // TODO add implementation + }; + } + + @Override + public T read() throws IOException { + assert getFullBlobName() != null; + return deserialize(transferService.downloadBlob(getBlobPathForDownload(), getBlobFileName())); + } + + @Override + public void readAsync(ActionListener listener) { + executorService.execute(() -> { + try { + listener.onResponse(read()); + } catch (Exception e) { + listener.onFailure(e); + } + }); + } + + public BlobPath getBlobPathForUpload() { + BlobPath blobPath = blobStoreRepository.basePath().add(encodeString(clusterName)).add("cluster-state").add(clusterUUID()); + for (String token : getBlobPathParameters().getPathTokens()) { + blobPath = blobPath.add(token); + } + return blobPath; + } + + public BlobPath getBlobPathForDownload() { + String[] pathTokens = extractBlobPathTokens(getFullBlobName()); + BlobPath blobPath = blobStoreRepository.basePath(); + for (String token : pathTokens) { + blobPath = blobPath.add(token); + } + return blobPath; + } + + protected Compressor getCompressor() { + return blobStoreRepository.getCompressor(); + } + + protected BlobStoreRepository getBlobStoreRepository() { + return this.blobStoreRepository; + } + + private static String[] extractBlobPathTokens(String blobName) { + String[] blobNameTokens = blobName.split(PATH_DELIMITER); + return Arrays.copyOfRange(blobNameTokens, 0, blobNameTokens.length - 1); + } + + private static String encodeString(String content) { + return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8)); + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/BlobPathParameters.java b/server/src/main/java/org/opensearch/gateway/remote/BlobPathParameters.java new file mode 100644 index 0000000000000..15a27555f7a9b --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/BlobPathParameters.java @@ -0,0 +1,35 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import java.util.List; + +/** + * Parameters which can be used to construct a blob path + * + * @opensearch.internal + */ +public class BlobPathParameters { + + private List pathTokens; + private String filePrefix; + + public BlobPathParameters(List pathTokens, String filePrefix) { + this.pathTokens = pathTokens; + this.filePrefix = filePrefix; + } + + public List getPathTokens() { + return pathTokens; + } + + public String getFilePrefix() { + return filePrefix; + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteObject.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteObject.java new file mode 100644 index 0000000000000..11850fa2b0c18 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteObject.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.opensearch.common.CheckedRunnable; +import org.opensearch.core.action.ActionListener; + +import java.io.IOException; +import java.io.InputStream; + +/** + * An interface to read/write and object from/to a remote storage. This interface is agnostic of the remote storage type. + * @param The object type which can be upload to or download from remote storage. + */ +public interface RemoteObject { + public T get(); + + public String clusterUUID(); + + public InputStream serialize() throws IOException; + + public T deserialize(InputStream inputStream) throws IOException; + + public CheckedRunnable writeAsync(ActionListener listener); + + public T read() throws IOException; + + public void readAsync(ActionListener listener); + +}