From 96630f036f80708126ef4a2459577dab629220ed Mon Sep 17 00:00:00 2001 From: Gaurav Bafna <85113518+gbbafna@users.noreply.github.com> Date: Fri, 28 Jul 2023 12:53:09 +0530 Subject: [PATCH] Add metadata prefix to Remote Translog Metadata file (#8914) Signed-off-by: Gaurav Bafna --- .../transfer/BlobStoreTransferService.java | 7 ++-- .../translog/transfer/TransferService.java | 12 +++++-- .../transfer/TranslogTransferManager.java | 8 ++++- .../transfer/TranslogTransferMetadata.java | 3 ++ .../TranslogTransferManagerTests.java | 32 +++++++++++++------ .../blobstore/BlobStoreRepositoryTests.java | 2 -- 6 files changed, 45 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index 974e8af42b939..95424d86fba34 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -213,17 +213,18 @@ public void listFoldersAsync(String threadpoolName, Iterable path, Actio }); } - public void listAllInSortedOrder(Iterable path, int limit, ActionListener> listener) { - blobStore.blobContainer((BlobPath) path).listBlobsByPrefixInSortedOrder("", limit, LEXICOGRAPHIC, listener); + public void listAllInSortedOrder(Iterable path, String filenamePrefix, int limit, ActionListener> listener) { + blobStore.blobContainer((BlobPath) path).listBlobsByPrefixInSortedOrder(filenamePrefix, limit, LEXICOGRAPHIC, listener); } public void listAllInSortedOrderAsync( String threadpoolName, Iterable path, + String filenamePrefix, int limit, ActionListener> listener ) { - threadPool.executor(threadpoolName).execute(() -> { listAllInSortedOrder(path, limit, listener); }); + threadPool.executor(threadpoolName).execute(() -> { listAllInSortedOrder(path, filenamePrefix, limit, listener); }); } } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index a240fd38cda11..885a2e32e915d 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -125,8 +125,14 @@ void uploadBlobs( */ InputStream downloadBlob(Iterable path, String fileName) throws IOException; - void listAllInSortedOrder(Iterable path, int limit, ActionListener> listener); - - void listAllInSortedOrderAsync(String threadpoolName, Iterable path, int limit, ActionListener> listener); + void listAllInSortedOrder(Iterable path, String filenamePrefix, int limit, ActionListener> listener); + + void listAllInSortedOrderAsync( + String threadpoolName, + Iterable path, + String filenamePrefix, + int limit, + ActionListener> listener + ); } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index e2bb5f74df234..850a0c1a6574c 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -210,7 +210,12 @@ public TranslogTransferMetadata readMetadata() throws IOException { ); try { - transferService.listAllInSortedOrder(remoteMetadataTransferPath, 1, latchedActionListener); + transferService.listAllInSortedOrder( + remoteMetadataTransferPath, + TranslogTransferMetadata.METADATA_PREFIX, + 1, + latchedActionListener + ); latch.await(); } catch (InterruptedException e) { throw new IOException("Exception while reading/downloading metadafile", e); @@ -367,6 +372,7 @@ public void deleteStaleTranslogMetadataFilesAsync(Runnable onCompletion) { transferService.listAllInSortedOrderAsync( ThreadPool.Names.REMOTE_PURGE, remoteMetadataTransferPath, + TranslogTransferMetadata.METADATA_PREFIX, Integer.MAX_VALUE, new ActionListener<>() { @Override diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java index 75d6549b23f1e..a8b3404d3f2ce 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java @@ -36,6 +36,8 @@ public class TranslogTransferMetadata { public static final String METADATA_SEPARATOR = "__"; + public static final String METADATA_PREFIX = "metadata"; + static final int BUFFER_SIZE = 4096; static final int CURRENT_VERSION = 1; @@ -83,6 +85,7 @@ public String getFileName() { return String.join( METADATA_SEPARATOR, Arrays.asList( + METADATA_PREFIX, RemoteStoreUtils.invertLong(primaryTerm), RemoteStoreUtils.invertLong(generation), RemoteStoreUtils.invertLong(createdAt), diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index b7091f3f4f8a6..3d622d6bdf8b8 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -44,6 +44,7 @@ import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.anySet; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; @@ -205,11 +206,12 @@ public void testReadMetadataNoFile() throws IOException { null ); doAnswer(invocation -> { - LatchedActionListener> latchedActionListener = invocation.getArgument(2); + LatchedActionListener> latchedActionListener = invocation.getArgument(3); List bmList = new LinkedList<>(); latchedActionListener.onResponse(bmList); return null; - }).when(transferService).listAllInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class)); + }).when(transferService) + .listAllInSortedOrder(any(BlobPath.class), eq(TranslogTransferMetadata.METADATA_PREFIX), anyInt(), any(ActionListener.class)); assertNull(translogTransferManager.readMetadata()); } @@ -225,12 +227,13 @@ public void testReadMetadataSingleFile() throws IOException { TranslogTransferMetadata tm = new TranslogTransferMetadata(1, 1, 1, 2); String mdFilename = tm.getFileName(); doAnswer(invocation -> { - LatchedActionListener> latchedActionListener = invocation.getArgument(2); + LatchedActionListener> latchedActionListener = invocation.getArgument(3); List bmList = new LinkedList<>(); bmList.add(new PlainBlobMetadata(mdFilename, 1)); latchedActionListener.onResponse(bmList); return null; - }).when(transferService).listAllInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class)); + }).when(transferService) + .listAllInSortedOrder(any(BlobPath.class), eq(TranslogTransferMetadata.METADATA_PREFIX), anyInt(), any(ActionListener.class)); TranslogTransferMetadata metadata = createTransferSnapshot().getTranslogTransferMetadata(); when(transferService.downloadBlob(any(BlobPath.class), eq(mdFilename))).thenReturn( @@ -252,12 +255,13 @@ public void testReadMetadataReadException() throws IOException { String mdFilename = tm.getFileName(); doAnswer(invocation -> { - LatchedActionListener> latchedActionListener = invocation.getArgument(2); + LatchedActionListener> latchedActionListener = invocation.getArgument(3); List bmList = new LinkedList<>(); bmList.add(new PlainBlobMetadata(mdFilename, 1)); latchedActionListener.onResponse(bmList); return null; - }).when(transferService).listAllInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class)); + }).when(transferService) + .listAllInSortedOrder(any(BlobPath.class), eq(TranslogTransferMetadata.METADATA_PREFIX), anyInt(), any(ActionListener.class)); when(transferService.downloadBlob(any(BlobPath.class), eq(mdFilename))).thenThrow(new IOException("Something went wrong")); @@ -283,10 +287,11 @@ public void testReadMetadataListException() throws IOException { ); doAnswer(invocation -> { - LatchedActionListener> latchedActionListener = invocation.getArgument(2); + LatchedActionListener> latchedActionListener = invocation.getArgument(3); latchedActionListener.onFailure(new IOException("Issue while listing")); return null; - }).when(transferService).listAllInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class)); + }).when(transferService) + .listAllInSortedOrder(any(BlobPath.class), eq(TranslogTransferMetadata.METADATA_PREFIX), anyInt(), any(ActionListener.class)); when(transferService.downloadBlob(any(BlobPath.class), any(String.class))).thenThrow(new IOException("Something went wrong")); @@ -416,7 +421,7 @@ public void testDeleteStaleTranslogMetadata() { String tm2 = new TranslogTransferMetadata(1, 2, 1, 2).getFileName(); String tm3 = new TranslogTransferMetadata(2, 3, 1, 2).getFileName(); doAnswer(invocation -> { - ActionListener> actionListener = invocation.getArgument(3); + ActionListener> actionListener = invocation.getArgument(4); List bmList = new LinkedList<>(); bmList.add(new PlainBlobMetadata(tm1, 1)); bmList.add(new PlainBlobMetadata(tm2, 1)); @@ -424,12 +429,19 @@ public void testDeleteStaleTranslogMetadata() { actionListener.onResponse(bmList); return null; }).when(transferService) - .listAllInSortedOrderAsync(eq(ThreadPool.Names.REMOTE_PURGE), any(BlobPath.class), anyInt(), any(ActionListener.class)); + .listAllInSortedOrderAsync( + eq(ThreadPool.Names.REMOTE_PURGE), + any(BlobPath.class), + anyString(), + anyInt(), + any(ActionListener.class) + ); List files = List.of(tm2, tm3); translogTransferManager.deleteStaleTranslogMetadataFilesAsync(() -> { verify(transferService).listAllInSortedOrderAsync( eq(ThreadPool.Names.REMOTE_PURGE), any(BlobPath.class), + eq(TranslogTransferMetadata.METADATA_PREFIX), eq(Integer.MAX_VALUE), any() ); diff --git a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java index 28513f279f8ad..9d711d464754c 100644 --- a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -32,7 +32,6 @@ package org.opensearch.repositories.blobstore; -import org.apache.lucene.tests.util.LuceneTestCase; import org.opensearch.Version; import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse; import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; @@ -91,7 +90,6 @@ /** * Tests for the {@link BlobStoreRepository} and its subclasses. */ -@LuceneTestCase.SuppressFileSystems("ExtrasFS") public class BlobStoreRepositoryTests extends OpenSearchSingleNodeTestCase { static final String REPO_TYPE = "fsLike";