Skip to content

Commit

Permalink
Add metadata prefix to Remote Translog Metadata file (#8914)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna authored Jul 28, 2023
1 parent c43743d commit 96630f0
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,17 +213,18 @@ public void listFoldersAsync(String threadpoolName, Iterable<String> path, Actio
});
}

public void listAllInSortedOrder(Iterable<String> path, int limit, ActionListener<List<BlobMetadata>> listener) {
blobStore.blobContainer((BlobPath) path).listBlobsByPrefixInSortedOrder("", limit, LEXICOGRAPHIC, listener);
public void listAllInSortedOrder(Iterable<String> path, String filenamePrefix, int limit, ActionListener<List<BlobMetadata>> listener) {
blobStore.blobContainer((BlobPath) path).listBlobsByPrefixInSortedOrder(filenamePrefix, limit, LEXICOGRAPHIC, listener);
}

public void listAllInSortedOrderAsync(
String threadpoolName,
Iterable<String> path,
String filenamePrefix,
int limit,
ActionListener<List<BlobMetadata>> listener
) {
threadPool.executor(threadpoolName).execute(() -> { listAllInSortedOrder(path, limit, listener); });
threadPool.executor(threadpoolName).execute(() -> { listAllInSortedOrder(path, filenamePrefix, limit, listener); });
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,14 @@ void uploadBlobs(
*/
InputStream downloadBlob(Iterable<String> path, String fileName) throws IOException;

void listAllInSortedOrder(Iterable<String> path, int limit, ActionListener<List<BlobMetadata>> listener);

void listAllInSortedOrderAsync(String threadpoolName, Iterable<String> path, int limit, ActionListener<List<BlobMetadata>> listener);
void listAllInSortedOrder(Iterable<String> path, String filenamePrefix, int limit, ActionListener<List<BlobMetadata>> listener);

void listAllInSortedOrderAsync(
String threadpoolName,
Iterable<String> path,
String filenamePrefix,
int limit,
ActionListener<List<BlobMetadata>> listener
);

}
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,12 @@ public TranslogTransferMetadata readMetadata() throws IOException {
);

try {
transferService.listAllInSortedOrder(remoteMetadataTransferPath, 1, latchedActionListener);
transferService.listAllInSortedOrder(
remoteMetadataTransferPath,
TranslogTransferMetadata.METADATA_PREFIX,
1,
latchedActionListener
);
latch.await();
} catch (InterruptedException e) {
throw new IOException("Exception while reading/downloading metadafile", e);
Expand Down Expand Up @@ -367,6 +372,7 @@ public void deleteStaleTranslogMetadataFilesAsync(Runnable onCompletion) {
transferService.listAllInSortedOrderAsync(
ThreadPool.Names.REMOTE_PURGE,
remoteMetadataTransferPath,
TranslogTransferMetadata.METADATA_PREFIX,
Integer.MAX_VALUE,
new ActionListener<>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class TranslogTransferMetadata {

public static final String METADATA_SEPARATOR = "__";

public static final String METADATA_PREFIX = "metadata";

static final int BUFFER_SIZE = 4096;

static final int CURRENT_VERSION = 1;
Expand Down Expand Up @@ -83,6 +85,7 @@ public String getFileName() {
return String.join(
METADATA_SEPARATOR,
Arrays.asList(
METADATA_PREFIX,
RemoteStoreUtils.invertLong(primaryTerm),
RemoteStoreUtils.invertLong(generation),
RemoteStoreUtils.invertLong(createdAt),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
Expand Down Expand Up @@ -205,11 +206,12 @@ public void testReadMetadataNoFile() throws IOException {
null
);
doAnswer(invocation -> {
LatchedActionListener<List<BlobMetadata>> latchedActionListener = invocation.getArgument(2);
LatchedActionListener<List<BlobMetadata>> latchedActionListener = invocation.getArgument(3);
List<BlobMetadata> bmList = new LinkedList<>();
latchedActionListener.onResponse(bmList);
return null;
}).when(transferService).listAllInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class));
}).when(transferService)
.listAllInSortedOrder(any(BlobPath.class), eq(TranslogTransferMetadata.METADATA_PREFIX), anyInt(), any(ActionListener.class));

assertNull(translogTransferManager.readMetadata());
}
Expand All @@ -225,12 +227,13 @@ public void testReadMetadataSingleFile() throws IOException {
TranslogTransferMetadata tm = new TranslogTransferMetadata(1, 1, 1, 2);
String mdFilename = tm.getFileName();
doAnswer(invocation -> {
LatchedActionListener<List<BlobMetadata>> latchedActionListener = invocation.getArgument(2);
LatchedActionListener<List<BlobMetadata>> latchedActionListener = invocation.getArgument(3);
List<BlobMetadata> bmList = new LinkedList<>();
bmList.add(new PlainBlobMetadata(mdFilename, 1));
latchedActionListener.onResponse(bmList);
return null;
}).when(transferService).listAllInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class));
}).when(transferService)
.listAllInSortedOrder(any(BlobPath.class), eq(TranslogTransferMetadata.METADATA_PREFIX), anyInt(), any(ActionListener.class));

TranslogTransferMetadata metadata = createTransferSnapshot().getTranslogTransferMetadata();
when(transferService.downloadBlob(any(BlobPath.class), eq(mdFilename))).thenReturn(
Expand All @@ -252,12 +255,13 @@ public void testReadMetadataReadException() throws IOException {
String mdFilename = tm.getFileName();

doAnswer(invocation -> {
LatchedActionListener<List<BlobMetadata>> latchedActionListener = invocation.getArgument(2);
LatchedActionListener<List<BlobMetadata>> latchedActionListener = invocation.getArgument(3);
List<BlobMetadata> bmList = new LinkedList<>();
bmList.add(new PlainBlobMetadata(mdFilename, 1));
latchedActionListener.onResponse(bmList);
return null;
}).when(transferService).listAllInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class));
}).when(transferService)
.listAllInSortedOrder(any(BlobPath.class), eq(TranslogTransferMetadata.METADATA_PREFIX), anyInt(), any(ActionListener.class));

when(transferService.downloadBlob(any(BlobPath.class), eq(mdFilename))).thenThrow(new IOException("Something went wrong"));

Expand All @@ -283,10 +287,11 @@ public void testReadMetadataListException() throws IOException {
);

doAnswer(invocation -> {
LatchedActionListener<List<BlobMetadata>> latchedActionListener = invocation.getArgument(2);
LatchedActionListener<List<BlobMetadata>> latchedActionListener = invocation.getArgument(3);
latchedActionListener.onFailure(new IOException("Issue while listing"));
return null;
}).when(transferService).listAllInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class));
}).when(transferService)
.listAllInSortedOrder(any(BlobPath.class), eq(TranslogTransferMetadata.METADATA_PREFIX), anyInt(), any(ActionListener.class));

when(transferService.downloadBlob(any(BlobPath.class), any(String.class))).thenThrow(new IOException("Something went wrong"));

Expand Down Expand Up @@ -416,20 +421,27 @@ public void testDeleteStaleTranslogMetadata() {
String tm2 = new TranslogTransferMetadata(1, 2, 1, 2).getFileName();
String tm3 = new TranslogTransferMetadata(2, 3, 1, 2).getFileName();
doAnswer(invocation -> {
ActionListener<List<BlobMetadata>> actionListener = invocation.getArgument(3);
ActionListener<List<BlobMetadata>> actionListener = invocation.getArgument(4);
List<BlobMetadata> bmList = new LinkedList<>();
bmList.add(new PlainBlobMetadata(tm1, 1));
bmList.add(new PlainBlobMetadata(tm2, 1));
bmList.add(new PlainBlobMetadata(tm3, 1));
actionListener.onResponse(bmList);
return null;
}).when(transferService)
.listAllInSortedOrderAsync(eq(ThreadPool.Names.REMOTE_PURGE), any(BlobPath.class), anyInt(), any(ActionListener.class));
.listAllInSortedOrderAsync(
eq(ThreadPool.Names.REMOTE_PURGE),
any(BlobPath.class),
anyString(),
anyInt(),
any(ActionListener.class)
);
List<String> files = List.of(tm2, tm3);
translogTransferManager.deleteStaleTranslogMetadataFilesAsync(() -> {
verify(transferService).listAllInSortedOrderAsync(
eq(ThreadPool.Names.REMOTE_PURGE),
any(BlobPath.class),
eq(TranslogTransferMetadata.METADATA_PREFIX),
eq(Integer.MAX_VALUE),
any()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

package org.opensearch.repositories.blobstore;

import org.apache.lucene.tests.util.LuceneTestCase;
import org.opensearch.Version;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
Expand Down Expand Up @@ -91,7 +90,6 @@
/**
* Tests for the {@link BlobStoreRepository} and its subclasses.
*/
@LuceneTestCase.SuppressFileSystems("ExtrasFS")
public class BlobStoreRepositoryTests extends OpenSearchSingleNodeTestCase {

static final String REPO_TYPE = "fsLike";
Expand Down

0 comments on commit 96630f0

Please sign in to comment.