From 4a0feeef41fd0efee342d31e5e935c31af332cf9 Mon Sep 17 00:00:00 2001 From: Varun Bansal Date: Fri, 15 Mar 2024 11:29:33 +0530 Subject: [PATCH] handle unexpected exception on success callback of translog upload (#12577) * handle unexpected exception on success callback of translog upload Signed-off-by: Varun Bansal --- .../transfer/FileTransferTracker.java | 15 ++++++++-- .../transfer/FileTransferTrackerTests.java | 30 +++++++++++++++++++ 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java index 9c2304f809f46..f3c17cdaa0054 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java @@ -8,6 +8,8 @@ package org.opensearch.index.translog.transfer; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.logging.Loggers; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.remote.RemoteTranslogTransferTracker; import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; @@ -33,11 +35,13 @@ public class FileTransferTracker implements FileTransferListener { private final RemoteTranslogTransferTracker remoteTranslogTransferTracker; private Map bytesForTlogCkpFileToUpload; private long fileTransferStartTime = -1; + private final Logger logger; public FileTransferTracker(ShardId shardId, RemoteTranslogTransferTracker remoteTranslogTransferTracker) { this.shardId = shardId; this.fileTransferTracker = new ConcurrentHashMap<>(); this.remoteTranslogTransferTracker = remoteTranslogTransferTracker; + this.logger = Loggers.getLogger(getClass(), shardId); } void recordFileTransferStartTime(long uploadStartTime) { @@ -64,9 +68,14 @@ long getTotalBytesToUpload() { @Override public void onSuccess(TransferFileSnapshot fileSnapshot) { - long durationInMillis = (System.nanoTime() - fileTransferStartTime) / 1_000_000L; - remoteTranslogTransferTracker.addUploadTimeInMillis(durationInMillis); - remoteTranslogTransferTracker.addUploadBytesSucceeded(bytesForTlogCkpFileToUpload.get(fileSnapshot.getName())); + try { + long durationInMillis = (System.nanoTime() - fileTransferStartTime) / 1_000_000L; + remoteTranslogTransferTracker.addUploadTimeInMillis(durationInMillis); + remoteTranslogTransferTracker.addUploadBytesSucceeded(bytesForTlogCkpFileToUpload.get(fileSnapshot.getName())); + } catch (Exception ex) { + logger.error("Failure to update translog upload success stats", ex); + } + add(fileSnapshot.getName(), TransferState.SUCCESS); } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/FileTransferTrackerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/FileTransferTrackerTests.java index b96ada1f6bbff..9665b8e6fd646 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/FileTransferTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/FileTransferTrackerTests.java @@ -20,6 +20,10 @@ import java.util.List; import java.util.Set; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; + public class FileTransferTrackerTests extends OpenSearchTestCase { protected final ShardId shardId = new ShardId("index", "_na_", 1); @@ -94,6 +98,32 @@ public void testOnFailure() throws IOException { } } + public void testOnSuccessStatsFailure() throws IOException { + RemoteTranslogTransferTracker localRemoteTranslogTransferTracker = spy(remoteTranslogTransferTracker); + doAnswer((count) -> { throw new NullPointerException("Error while updating stats"); }).when(localRemoteTranslogTransferTracker) + .addUploadBytesSucceeded(anyLong()); + + FileTransferTracker localFileTransferTracker = new FileTransferTracker(shardId, localRemoteTranslogTransferTracker); + + Path testFile = createTempFile(); + int fileSize = 128; + Files.write(testFile, randomByteArrayOfLength(fileSize), StandardOpenOption.APPEND); + try ( + FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot( + testFile, + randomNonNegativeLong(), + null + ); + ) { + Set toUpload = new HashSet<>(2); + toUpload.add(transferFileSnapshot); + localFileTransferTracker.recordBytesForFiles(toUpload); + localRemoteTranslogTransferTracker.addUploadBytesStarted(fileSize); + localFileTransferTracker.onSuccess(transferFileSnapshot); + assertEquals(localFileTransferTracker.allUploaded().size(), 1); + } + } + public void testUploaded() throws IOException { Path testFile = createTempFile(); int fileSize = 128;