Skip to content

Commit

Permalink
Remote state interfaces
Browse files Browse the repository at this point in the history
Signed-off-by: Sooraj Sinha <[email protected]>
  • Loading branch information
soosinha committed May 23, 2024
1 parent 66df930 commit e64941a
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote;

import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.compress.Compressor;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.concurrent.ExecutorService;

/**
* An extension of {@link RemoteObject} class which caters to the use case of writing to and reading from a blob storage
*
* @param <T> The class type which can be uploaded to or downloaded from a blob storage.
*/
public abstract class AbstractRemoteBlobStoreObject<T> implements RemoteObject<T> {

Check warning on line 31 in server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java#L31

Added line #L31 was not covered by tests

public static final String PATH_DELIMITER = "/";

private final BlobStoreTransferService transferService;
private final BlobStoreRepository blobStoreRepository;
private final String clusterName;
private final ExecutorService executorService;

public AbstractRemoteBlobStoreObject(
BlobStoreTransferService blobStoreTransferService,
BlobStoreRepository blobStoreRepository,
String clusterName,
ThreadPool threadPool
) {
this.transferService = blobStoreTransferService;
this.blobStoreRepository = blobStoreRepository;
this.clusterName = clusterName;
this.executorService = threadPool.executor(ThreadPool.Names.GENERIC);
}

Check warning on line 50 in server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java#L45-L50

Added lines #L45 - L50 were not covered by tests

public abstract BlobPathParameters getBlobPathParameters();

public abstract String getFullBlobName();

public String getBlobFileName() {
if (getFullBlobName() == null) {
generateBlobFileName();

Check warning on line 58 in server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java#L58

Added line #L58 was not covered by tests
}
String[] pathTokens = getFullBlobName().split(PATH_DELIMITER);
return getFullBlobName().split(PATH_DELIMITER)[pathTokens.length - 1];

Check warning on line 61 in server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java#L60-L61

Added lines #L60 - L61 were not covered by tests
}

public abstract String generateBlobFileName();

public abstract UploadedMetadata getUploadedMetadata();

@Override
public CheckedRunnable<IOException> writeAsync(ActionListener<Void> listener) {
return () -> {

Check warning on line 70 in server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java#L70

Added line #L70 was not covered by tests
assert get() != null;
// TODO add implementation
};

Check warning on line 73 in server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java#L73

Added line #L73 was not covered by tests
}

@Override
public T read() throws IOException {
assert getFullBlobName() != null;
return deserialize(transferService.downloadBlob(getBlobPathForDownload(), getBlobFileName()));

Check warning on line 79 in server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java#L79

Added line #L79 was not covered by tests
}

@Override
public void readAsync(ActionListener<T> listener) {
executorService.execute(() -> {

Check warning on line 84 in server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java#L84

Added line #L84 was not covered by tests
try {
listener.onResponse(read());
} catch (Exception e) {
listener.onFailure(e);
}
});
}

Check warning on line 91 in server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java#L86-L91

Added lines #L86 - L91 were not covered by tests

public BlobPath getBlobPathForUpload() {
BlobPath blobPath = blobStoreRepository.basePath().add(encodeString(clusterName)).add("cluster-state").add(clusterUUID());

Check warning on line 94 in server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java#L94

Added line #L94 was not covered by tests
for (String token : getBlobPathParameters().getPathTokens()) {
blobPath = blobPath.add(token);
}
return blobPath;

Check warning on line 98 in server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java#L96-L98

Added lines #L96 - L98 were not covered by tests
}

public BlobPath getBlobPathForDownload() {
String[] pathTokens = extractBlobPathTokens(getFullBlobName());
BlobPath blobPath = blobStoreRepository.basePath();

Check warning on line 103 in server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java#L102-L103

Added lines #L102 - L103 were not covered by tests
for (String token : pathTokens) {
blobPath = blobPath.add(token);

Check warning on line 105 in server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java#L105

Added line #L105 was not covered by tests
}
return blobPath;

Check warning on line 107 in server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java#L107

Added line #L107 was not covered by tests
}

protected Compressor getCompressor() {
return blobStoreRepository.getCompressor();

Check warning on line 111 in server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java#L111

Added line #L111 was not covered by tests
}

protected BlobStoreRepository getBlobStoreRepository() {
return this.blobStoreRepository;

Check warning on line 115 in server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java#L115

Added line #L115 was not covered by tests
}

private static String[] extractBlobPathTokens(String blobName) {
String[] blobNameTokens = blobName.split(PATH_DELIMITER);
return Arrays.copyOfRange(blobNameTokens, 0, blobNameTokens.length - 1);

Check warning on line 120 in server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java#L119-L120

Added lines #L119 - L120 were not covered by tests
}

private static String encodeString(String content) {
return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8));

Check warning on line 124 in server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java#L124

Added line #L124 was not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote;

import java.util.List;

/**
* Parameters which can be used to construct a blob path
*
* @opensearch.internal
*/
public class BlobPathParameters {

private List<String> pathTokens;
private String filePrefix;

public BlobPathParameters(List<String> pathTokens, String filePrefix) {
this.pathTokens = pathTokens;
this.filePrefix = filePrefix;
}

Check warning on line 26 in server/src/main/java/org/opensearch/gateway/remote/BlobPathParameters.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/BlobPathParameters.java#L23-L26

Added lines #L23 - L26 were not covered by tests

public List<String> getPathTokens() {
return pathTokens;

Check warning on line 29 in server/src/main/java/org/opensearch/gateway/remote/BlobPathParameters.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/BlobPathParameters.java#L29

Added line #L29 was not covered by tests
}

public String getFilePrefix() {
return filePrefix;

Check warning on line 33 in server/src/main/java/org/opensearch/gateway/remote/BlobPathParameters.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/BlobPathParameters.java#L33

Added line #L33 was not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote;

import org.opensearch.common.CheckedRunnable;
import org.opensearch.core.action.ActionListener;

import java.io.IOException;
import java.io.InputStream;

/**
* An interface to read/write and object from/to a remote storage. This interface is agnostic of the remote storage type.
* @param <T> The object type which can be upload to or download from remote storage.
*/
public interface RemoteObject<T> {
public T get();

public String clusterUUID();

public InputStream serialize() throws IOException;

public T deserialize(InputStream inputStream) throws IOException;

public CheckedRunnable<IOException> writeAsync(ActionListener<Void> listener);

public T read() throws IOException;

public void readAsync(ActionListener<T> listener);

}

0 comments on commit e64941a

Please sign in to comment.