From 0bdc6afd9801494612b79d6a4329b43d2b687557 Mon Sep 17 00:00:00 2001 From: prayascoriolis Date: Thu, 14 Nov 2024 17:01:58 +0530 Subject: [PATCH] Check the latest state of uploaded data & add unit tests. Signed-off-by: Prayas Kumar --- .../opensearch/index/shard/IndexShard.java | 13 ++++- .../index/shard/IndexShardTests.java | 47 ++++++++++++++++++- 2 files changed, 57 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 1e36e1f405135..2e32beeb36482 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -5171,7 +5171,18 @@ public void syncSegmentsFromGivenRemoteSegmentStore( Map uploadedSegments = sourceRemoteDirectory .getSegmentsUploadedToRemoteStore(); // Checking for existing remote segments in remote - boolean hasPreexistingRemoteData = remoteDirectory != null && !remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty(); + boolean hasPreexistingRemoteData = false; + if (remoteDirectory != null && !remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty()) { + Map existingSegments = + remoteDirectory.getSegmentsUploadedToRemoteStore(); + // Compare metadata to ensure we have latest segments + hasPreexistingRemoteData = uploadedSegments.entrySet().stream().allMatch(entry -> { + RemoteSegmentStoreDirectory.UploadedSegmentMetadata existing = existingSegments.get(entry.getKey()); + return existing != null && + existing.getChecksum().equals(entry.getValue().getChecksum()) && + existing.getLength() >= entry.getValue().getLength(); + }); + } store.incRef(); try { final Directory storeDirectory; diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 96794a83ef762..e7893447e18f5 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -2886,8 +2886,8 @@ public void testSyncSegmentsFromGivenRemoteSegmentStore() throws IOException { Settings.builder() .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) - .put(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, remoteStorePath + "__test1") - .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, remoteStorePath + "__test1") + .put(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, remoteStorePath + "__source") + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, remoteStorePath + "__source") .build(), new InternalEngineFactory() ); @@ -2914,6 +2914,49 @@ public void testSyncSegmentsFromGivenRemoteSegmentStore() throws IOException { closeShards(target); } + public void testSyncSegmentsBetweenSnapshotAndRemoteStore() throws IOException { + String remoteStorePath = createTempDir().toString(); + // Create source shard and take snapshot + IndexShard source = newStartedShard( + true, + Settings.builder() + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, remoteStorePath + "__snapshot") + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, remoteStorePath + "__snapshot") + .build(), + new InternalEngineFactory() + ); + // Index documents and create snapshot + indexDoc(source, "_doc", "1"); + indexDoc(source, "_doc", "2"); + source.refresh("test"); + assertTrue("At lease one remote sync should have been completed", source.isRemoteSegmentStoreInSync()); + assertDocs(source, "1", "2"); + Collection snapshotSegments = SegmentInfos.readLatestCommit(source.store().directory()).files(false); + // Create remote store directory from snapshot + RemoteSegmentStoreDirectory snapshotRemoteDir = createRemoteSegmentStoreDirectory( + source.shardId(), + PathUtils.get(remoteStorePath) + ); + // Get source remote store directory with uploaded segment + source.syncSegmentsFromGivenRemoteSegmentStore(false, snapshotRemoteDir, null, false); + // Verify first restore uploads segments + Map firstRestoreFiles = snapshotRemoteDir.getSegmentsUploadedToRemoteStore(); + assertTrue("All segments should be uploaded", firstRestoreFiles.keySet().containsAll(snapshotSegments)); + // Verify second restore finds existing segments + Map secondRestoreFiles = + snapshotRemoteDir.getSegmentsUploadedToRemoteStore(); + assertTrue("Second restore should not upload segments", secondRestoreFiles.isEmpty()); + closeShards(source); + } + + private RemoteSegmentStoreDirectory getRemoteStoreDirectory(IndexShard shard) { + return ((RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) shard + .remoteStore() + .directory()).getDelegate()).getDelegate()); + } + public void testRefreshLevelRestoreShardFromRemoteStore() throws IOException { testRestoreShardFromRemoteStore(false); }