Skip to content

Commit

Permalink
Check the latest state of uploaded data & add unit tests. Signed-off-…
Browse files Browse the repository at this point in the history
…by: Prayas Kumar <[email protected]>
  • Loading branch information
prayascoriolis committed Nov 14, 2024
1 parent a55ff72 commit 0bdc6af
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 3 deletions.
13 changes: 12 additions & 1 deletion server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -5171,7 +5171,18 @@ public void syncSegmentsFromGivenRemoteSegmentStore(
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = sourceRemoteDirectory
.getSegmentsUploadedToRemoteStore();
// Checking for existing remote segments in remote
boolean hasPreexistingRemoteData = remoteDirectory != null && !remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty();
boolean hasPreexistingRemoteData = false;
if (remoteDirectory != null && !remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty()) {
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> existingSegments =
remoteDirectory.getSegmentsUploadedToRemoteStore();
// Compare metadata to ensure we have latest segments
hasPreexistingRemoteData = uploadedSegments.entrySet().stream().allMatch(entry -> {
RemoteSegmentStoreDirectory.UploadedSegmentMetadata existing = existingSegments.get(entry.getKey());
return existing != null &&
existing.getChecksum().equals(entry.getValue().getChecksum()) &&
existing.getLength() >= entry.getValue().getLength();
});
}
store.incRef();
try {
final Directory storeDirectory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2886,8 +2886,8 @@ public void testSyncSegmentsFromGivenRemoteSegmentStore() throws IOException {
Settings.builder()
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, remoteStorePath + "__test1")
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, remoteStorePath + "__test1")
.put(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, remoteStorePath + "__source")
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, remoteStorePath + "__source")
.build(),
new InternalEngineFactory()
);
Expand All @@ -2914,6 +2914,49 @@ public void testSyncSegmentsFromGivenRemoteSegmentStore() throws IOException {
closeShards(target);
}

public void testSyncSegmentsBetweenSnapshotAndRemoteStore() throws IOException {
String remoteStorePath = createTempDir().toString();
// Create source shard and take snapshot
IndexShard source = newStartedShard(
true,
Settings.builder()
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, remoteStorePath + "__snapshot")
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, remoteStorePath + "__snapshot")
.build(),
new InternalEngineFactory()
);
// Index documents and create snapshot
indexDoc(source, "_doc", "1");
indexDoc(source, "_doc", "2");
source.refresh("test");
assertTrue("At lease one remote sync should have been completed", source.isRemoteSegmentStoreInSync());
assertDocs(source, "1", "2");
Collection<String> snapshotSegments = SegmentInfos.readLatestCommit(source.store().directory()).files(false);
// Create remote store directory from snapshot
RemoteSegmentStoreDirectory snapshotRemoteDir = createRemoteSegmentStoreDirectory(
source.shardId(),
PathUtils.get(remoteStorePath)
);
// Get source remote store directory with uploaded segment
source.syncSegmentsFromGivenRemoteSegmentStore(false, snapshotRemoteDir, null, false);
// Verify first restore uploads segments
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> firstRestoreFiles = snapshotRemoteDir.getSegmentsUploadedToRemoteStore();
assertTrue("All segments should be uploaded", firstRestoreFiles.keySet().containsAll(snapshotSegments));
// Verify second restore finds existing segments
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> secondRestoreFiles =
snapshotRemoteDir.getSegmentsUploadedToRemoteStore();
assertTrue("Second restore should not upload segments", secondRestoreFiles.isEmpty());
closeShards(source);
}

private RemoteSegmentStoreDirectory getRemoteStoreDirectory(IndexShard shard) {
return ((RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) shard
.remoteStore()
.directory()).getDelegate()).getDelegate());
}

public void testRefreshLevelRestoreShardFromRemoteStore() throws IOException {
testRestoreShardFromRemoteStore(false);
}
Expand Down

0 comments on commit 0bdc6af

Please sign in to comment.