Skip to content

Commit

Permalink
Add unit tests for uploadBlobAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
soosinha committed Jun 5, 2024
1 parent 148eca5 commit f47c648
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void writeAsync(final U entity, final ActionListener<Void> listener) {
try (InputStream inputStream = entity.serialize()) {
BlobPath blobPath = getBlobPathForUpload(entity);
entity.setFullBlobName(blobPath);
transferService.uploadBlob(inputStream, getBlobPathForUpload(entity), entity.getBlobFileName(), WritePriority.URGENT, listener);
transferService.uploadBlobAsync(inputStream, getBlobPathForUpload(entity), entity.getBlobFileName(), WritePriority.URGENT, listener);
}
} catch (Exception e) {
listener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.blobstore.FetchBlobResult;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.transfer.RemoteTransferContainer;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeFileInputStream;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.index.store.exception.ChecksumCombinationException;
import org.opensearch.index.translog.ChannelFactory;
import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
Expand Down Expand Up @@ -63,7 +61,7 @@ public BlobStoreTransferService(BlobStore blobStore, ThreadPool threadPool) {
}

@Override
public void uploadBlob(
public void uploadBlobAsync(
String threadPoolName,
final TransferFileSnapshot fileSnapshot,
Iterable<String> remoteTransferPath,
Expand All @@ -74,7 +72,7 @@ public void uploadBlob(
BlobPath blobPath = (BlobPath) remoteTransferPath;
threadPool.executor(threadPoolName).execute(ActionRunnable.wrap(listener, l -> {
try {
uploadBlob(fileSnapshot, blobPath, writePriority);
uploadBlobAsync(fileSnapshot, blobPath, writePriority);
l.onResponse(fileSnapshot);
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), e);
Expand All @@ -84,7 +82,7 @@ public void uploadBlob(
}

@Override
public void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable<String> remoteTransferPath, WritePriority writePriority)
public void uploadBlobAsync(final TransferFileSnapshot fileSnapshot, Iterable<String> remoteTransferPath, WritePriority writePriority)
throws IOException {
BlobPath blobPath = (BlobPath) remoteTransferPath;
try (InputStream inputStream = fileSnapshot.inputStream()) {
Expand All @@ -102,16 +100,16 @@ public void uploadBlobs(
fileSnapshots.forEach(fileSnapshot -> {
BlobPath blobPath = blobPaths.get(fileSnapshot.getPrimaryTerm());
if (!(blobStore.blobContainer(blobPath) instanceof AsyncMultiStreamBlobContainer)) {
uploadBlob(ThreadPool.Names.TRANSLOG_TRANSFER, fileSnapshot, blobPath, listener, writePriority);
uploadBlobAsync(ThreadPool.Names.TRANSLOG_TRANSFER, fileSnapshot, blobPath, listener, writePriority);
} else {
uploadBlob(fileSnapshot, listener, blobPath, writePriority);
uploadBlobAsync(fileSnapshot, listener, blobPath, writePriority);
}
});

}

@Override
public void uploadBlob(InputStream inputStream, Iterable<String> remotePath, String blobName, WritePriority writePriority, ActionListener<Void> listener) throws IOException {
public void uploadBlobAsync(InputStream inputStream, Iterable<String> remotePath, String blobName, WritePriority writePriority, ActionListener<Void> listener) throws IOException {
assert remotePath instanceof BlobPath;
BlobPath blobPath = (BlobPath) remotePath;
final BlobContainer blobContainer = blobStore.blobContainer(blobPath);
Expand All @@ -135,24 +133,19 @@ public void uploadBlob(InputStream inputStream, Iterable<String> remotePath, Str
);
}

try (
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
blobName,
blobName,
bytes.length,
true,
writePriority,
(size, position) -> new OffsetRangeIndexInputStream(input, size, position),
expectedChecksum,
((AsyncMultiStreamBlobContainer) blobContainer).remoteIntegrityCheckSupported()
)
) {
((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(remoteTransferContainer.createWriteContext(), listener);
}
asyncBlobUpload(blobName,
blobName,
bytes.length,
blobPath,
writePriority,
(size, position) -> new OffsetRangeIndexInputStream(input, size, position),
expectedChecksum,
listener
);
}
}

private void uploadBlob(
private void uploadBlobAsync(
TransferFileSnapshot fileSnapshot,
ActionListener<TransferFileSnapshot> listener,
BlobPath blobPath,
Expand All @@ -165,36 +158,21 @@ private void uploadBlob(
try (FileChannel channel = channelFactory.open(fileSnapshot.getPath(), StandardOpenOption.READ)) {
contentLength = channel.size();
}
boolean remoteIntegrityEnabled = false;
BlobContainer blobContainer = blobStore.blobContainer(blobPath);
if (blobContainer instanceof AsyncMultiStreamBlobContainer) {
remoteIntegrityEnabled = ((AsyncMultiStreamBlobContainer) blobContainer).remoteIntegrityCheckSupported();
}
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
fileSnapshot.getName(),
fileSnapshot.getName(),
contentLength,
true,
writePriority,
(size, position) -> new OffsetRangeFileInputStream(fileSnapshot.getPath(), size, position),
Objects.requireNonNull(fileSnapshot.getChecksum()),
remoteIntegrityEnabled
);
ActionListener<Void> completionListener = ActionListener.wrap(resp -> listener.onResponse(fileSnapshot), ex -> {
logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), ex);
listener.onFailure(new FileTransferException(fileSnapshot, ex));
});

completionListener = ActionListener.runBefore(completionListener, () -> {
try {
remoteTransferContainer.close();
} catch (Exception e) {
logger.warn("Error occurred while closing streams", e);
}
});

WriteContext writeContext = remoteTransferContainer.createWriteContext();
((AsyncMultiStreamBlobContainer) blobStore.blobContainer(blobPath)).asyncBlobUpload(writeContext, completionListener);
Objects.requireNonNull(fileSnapshot.getChecksum());
asyncBlobUpload(fileSnapshot.getName(),
fileSnapshot.getName(),
contentLength,
blobPath,
writePriority,
(size, position) -> new OffsetRangeFileInputStream(fileSnapshot.getPath(), size, position),
fileSnapshot.getChecksum(),
completionListener
);

} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), e);
Expand All @@ -209,6 +187,24 @@ private void uploadBlob(

}

private void asyncBlobUpload(String fileName, String remoteFileName, long contentLength, BlobPath blobPath, WritePriority writePriority, RemoteTransferContainer.OffsetRangeInputStreamSupplier inputStreamSupplier, long expectedChecksum, ActionListener<Void> completionListener) throws IOException {
BlobContainer blobContainer = blobStore.blobContainer(blobPath);
assert blobContainer instanceof AsyncMultiStreamBlobContainer;
boolean remoteIntegrityEnabled = ((AsyncMultiStreamBlobContainer) blobContainer).remoteIntegrityCheckSupported();
try (RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
fileName,
remoteFileName,
contentLength,
true,
writePriority,
inputStreamSupplier,
expectedChecksum,
remoteIntegrityEnabled
)) {
((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(remoteTransferContainer.createWriteContext(), completionListener);
}
}

@Override
public InputStream downloadBlob(Iterable<String> path, String fileName) throws IOException {
return blobStore.blobContainer((BlobPath) path).readBlob(fileName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.opensearch.common.blobstore.FetchBlobResult;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;

import java.io.IOException;
Expand All @@ -37,7 +36,7 @@ public interface TransferService {
* @param remotePath the remote path where upload should be made
* @param listener the callback to be invoked once upload completes successfully/fails
*/
void uploadBlob(
void uploadBlobAsync(
String threadPoolName,
final TransferFileSnapshot fileSnapshot,
Iterable<String> remotePath,
Expand Down Expand Up @@ -65,8 +64,8 @@ void uploadBlobs(
* @param writePriority Priority by which content needs to be written.
* @throws IOException the exception while transferring the data
*/
void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable<String> remotePath, WritePriority writePriority) throws IOException;
void uploadBlob(InputStream inputStream, Iterable<String> remotePath, String blobName, WritePriority writePriority, ActionListener<Void> listener) throws IOException;
void uploadBlobAsync(final TransferFileSnapshot fileSnapshot, Iterable<String> remotePath, WritePriority writePriority) throws IOException;
void uploadBlobAsync(InputStream inputStream, Iterable<String> remotePath, String blobName, WritePriority writePriority, ActionListener<Void> listener) throws IOException;

void deleteBlobs(Iterable<String> path, List<String> fileNames) throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans
remoteTranslogTransferTracker.addUploadBytesStarted(metadataBytesToUpload);
metadataUploadStartTime = System.nanoTime();
try {
transferService.uploadBlob(tlogMetadata, remoteMetadataTransferPath, WritePriority.HIGH);
transferService.uploadBlobAsync(tlogMetadata, remoteMetadataTransferPath, WritePriority.HIGH);
} catch (Exception exception) {
remoteTranslogTransferTracker.addUploadTimeInMillis((System.nanoTime() - metadataUploadStartTime) / 1_000_000L);
remoteTranslogTransferTracker.addUploadBytesFailed(metadataBytesToUpload);
Expand Down
Loading

0 comments on commit f47c648

Please sign in to comment.