diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java index 2e25a532b5abf..402f59b5499a6 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java @@ -77,6 +77,19 @@ public interface BlobContainer { */ InputStream readBlob(String blobName) throws IOException; + /** + * Creates a new {@link BlobDownloadResponse} for the given blob name. + * + * @param blobName + * The name of the blob to get an {@link InputStream} for. + * @return The {@code InputStream} to read the blob. + * @throws NoSuchFileException if the blob does not exist + * @throws IOException if the blob can not be read. + */ + default BlobDownloadResponse readBlobWithMetadata(String blobName) throws IOException{ + return null; + }; + /** * Creates a new {@link InputStream} that can be used to read the given blob starting from * a specific {@code position} in the blob. The {@code length} is an indication of the @@ -128,6 +141,27 @@ default long readBlobPreferredLength() { */ void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException; + /** + * Reads blob content from the input stream and writes it to the container in a new blob with the given name, and metadata. + * This method assumes the container does not already contain a blob of the same blobName. If a blob by the + * same name already exists, the operation will fail and an {@link IOException} will be thrown. + * + * @param blobName + * The name of the blob to write the contents of the input stream to. + * @param inputStream + * The input stream from which to retrieve the bytes to write to the blob. + * @param metadata + * The metadata to be associate with the blob upload. + * @param blobSize + * The size of the blob to be written, in bytes. It is implementation dependent whether + * this value is used in writing the blob to the repository. + * @param failIfAlreadyExists + * whether to throw a FileAlreadyExistsException if the given blob already exists + * @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists + * @throws IOException if the input stream could not be read, or the target blob could not be written to. + */ + default void writeBlobWithMetadata(String blobName, InputStream inputStream, Map metadata, long blobSize, boolean failIfAlreadyExists) throws IOException {}; + /** * Reads blob content from the input stream and writes it to the container in a new blob with the given name, * using an atomic write operation if the implementation supports it. @@ -149,6 +183,29 @@ default long readBlobPreferredLength() { */ void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException; + /** + * Reads blob content from the input stream and writes it to the container in a new blob with the given name,and metadata + * using an atomic write operation if the implementation supports it. + *

+ * This method assumes the container does not already contain a blob of the same blobName. If a blob by the + * same name already exists, the operation will fail and an {@link IOException} will be thrown. + * + * @param blobName + * The name of the blob to write the contents of the input stream to. + * @param inputStream + * The input stream from which to retrieve the bytes to write to the blob. + * @param metadata + * The metadata to be associate with the blob upload. + * @param blobSize + * The size of the blob to be written, in bytes. It is implementation dependent whether + * this value is used in writing the blob to the repository. + * @param failIfAlreadyExists + * whether to throw a FileAlreadyExistsException if the given blob already exists + * @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists + * @throws IOException if the input stream could not be read, or the target blob could not be written to. + */ + default void writeBlobAtomicWithMetadata(String blobName, InputStream inputStream, Map metadata, long blobSize, boolean failIfAlreadyExists) throws IOException {}; + /** * Deletes this container and all its contents from the repository. * diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java b/server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java new file mode 100644 index 0000000000000..70b3e3ec76a5e --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java @@ -0,0 +1,45 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore; + +import java.io.InputStream; +import java.util.Map; + +/** + * A class for blob download response + * + * @opensearch.internal + */ +public class BlobDownloadResponse { + + /** + * Downloaded blob InputStream + */ + private InputStream inputStream; + + /** + * Metadata of the downloaded blob + */ + private Map metadata; + + public InputStream getInputStream() { + return inputStream; + } + + public Map getMetadata() { + return metadata; + } + + public BlobDownloadResponse(InputStream inputStream, Map metadata){ + this.inputStream = inputStream; + this.metadata = metadata; + } + + +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index 82dd6301ef79f..0e582d5739efa 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -17,6 +17,7 @@ import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.blobstore.BlobDownloadResponse; import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; @@ -164,6 +165,11 @@ public InputStream downloadBlob(Iterable path, String fileName) throws I return blobStore.blobContainer((BlobPath) path).readBlob(fileName); } + @Override + public BlobDownloadResponse downloadBlobWithMetadata(Iterable path, String fileName) throws IOException { + return blobStore.blobContainer((BlobPath) path).readBlobWithMetadata(fileName); + } + @Override public void deleteBlobs(Iterable path, List fileNames) throws IOException { blobStore.blobContainer((BlobPath) path).deleteBlobsIgnoringIfNotExists(fileNames); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index cfe833dde87eb..a11532dc69f1a 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -8,6 +8,7 @@ package org.opensearch.index.translog.transfer; +import org.opensearch.common.blobstore.BlobDownloadResponse; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.stream.write.WritePriority; @@ -125,6 +126,15 @@ void uploadBlobs( */ InputStream downloadBlob(Iterable path, String fileName) throws IOException; + /** + * + * @param path the remote path from where download should be made + * @param fileName the name of the file + * @return {@link BlobDownloadResponse} of the remote file + * @throws IOException the exception while reading the data + */ + BlobDownloadResponse downloadBlobWithMetadata(Iterable path, String fileName) throws IOException; + void listAllInSortedOrder(Iterable path, String filenamePrefix, int limit, ActionListener> listener); void listAllInSortedOrderAsync(