From b54ab0dcda3b35a3738012be3549897270613e74 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Fri, 4 Oct 2024 20:44:13 +0300 Subject: [PATCH] review fixes --- .../IncrementalSnapshotCheckResult.java | 69 +++++++++++++++ .../snapshot/SnapshotCheckProcess.java | 10 +-- .../persistence/snapshot/SnapshotChecker.java | 83 ++----------------- 3 files changed, 83 insertions(+), 79 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotCheckResult.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotCheckResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotCheckResult.java new file mode 100644 index 0000000000000..661c5f0492888 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotCheckResult.java @@ -0,0 +1,69 @@ +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Map; +import org.apache.ignite.internal.management.cache.PartitionKeyV2; +import org.apache.ignite.internal.pagemem.wal.record.DataEntry; +import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2; +import org.apache.ignite.internal.processors.cache.verify.TransactionsHashRecord; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; + +/** */ +class IncrementalSnapshotCheckResult implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** Transaction hashes collection. */ + private Map txHashRes; + + /** + * Partition hashes collection. Value is a hash of data entries {@link DataEntry} from WAL segments included + * into the incremental snapshot. + */ + private Map partHashRes; + + /** Partially committed transactions' collection. */ + private Collection partiallyCommittedTxs; + + /** Occurred exceptions. */ + private Collection exceptions; + + /** */ + public IncrementalSnapshotCheckResult() { + // No-op. + } + + /** */ + IncrementalSnapshotCheckResult( + Map txHashRes, + Map partHashRes, + Collection partiallyCommittedTxs, + Collection exceptions + ) { + this.txHashRes = txHashRes; + this.partHashRes = partHashRes; + this.partiallyCommittedTxs = partiallyCommittedTxs; + this.exceptions = exceptions; + } + + /** */ + public Map partHashRes() { + return partHashRes; + } + + /** */ + public Map txHashRes() { + return txHashRes; + } + + /** */ + public Collection partiallyCommittedTxs() { + return partiallyCommittedTxs; + } + + /** */ + public Collection exceptions() { + return exceptions; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java index 4eac625657a7b..e9f64247e82d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java @@ -61,7 +61,7 @@ public class SnapshotCheckProcess { /** */ private final GridKernalContext kctx; - /** Operation contexts by unique operation name/id. */ + /** Operation contexts by name. */ private final Map contexts = new ConcurrentHashMap<>(); /** Cluster-wide operation futures per snapshot called from current node. */ @@ -229,7 +229,7 @@ private IgniteInternalFuture validateParts(SnapshotCheckP phaseFut.onDone(err); else { if (req.incrementalIndex() > 0) - phaseFut.onDone(new SnapshotCheckResponse((SnapshotChecker.IncrementalSnapshotResult)res)); + phaseFut.onDone(new SnapshotCheckResponse((IncrementalSnapshotCheckResult)res)); else phaseFut.onDone(new SnapshotCheckResponse((Map)res)); } @@ -485,7 +485,7 @@ private static final class SnapshotCheckResponse implements Serializable { @Nullable private final Map partsResults; /** @see #incrementalResult() */ - @Nullable private final SnapshotChecker.IncrementalSnapshotResult incRes; + @Nullable private final IncrementalSnapshotCheckResult incRes; /** Ctor for the phase 1. */ private SnapshotCheckResponse(@Nullable List metas) { @@ -502,7 +502,7 @@ private SnapshotCheckResponse(Map partsResults) { } /** Ctor for the phase 2 for incremental snapshot. */ - private SnapshotCheckResponse(SnapshotChecker.IncrementalSnapshotResult incRes) { + private SnapshotCheckResponse(IncrementalSnapshotCheckResult incRes) { this.metas = null; this.partsResults = null; this.incRes = incRes; @@ -532,7 +532,7 @@ private SnapshotCheckResponse(SnapshotChecker.IncrementalSnapshotResult incRes) } /** Incremental snapshot result for the phase 2. Is always {@code null} for the phase 1 or in case of normal snapshot. */ - private @Nullable SnapshotChecker.IncrementalSnapshotResult incrementalResult() { + private @Nullable IncrementalSnapshotCheckResult incrementalResult() { return incRes; } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java index bc0041ab76c09..9c1d02c98c075 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java @@ -20,7 +20,6 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; -import java.io.Serializable; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.file.DirectoryStream; @@ -58,7 +57,6 @@ import org.apache.ignite.internal.managers.encryption.GroupKey; import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted; import org.apache.ignite.internal.pagemem.store.PageStore; -import org.apache.ignite.internal.pagemem.wal.record.DataEntry; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridLocalConfigManager; @@ -142,7 +140,7 @@ public class SnapshotChecker { public SnapshotChecker( GridKernalContext kctx, Marshaller marshaller, - ExecutorService executor, + ExecutorService executorSrvc, @Nullable ClassLoader marshallerClsLdr ) { this.kctx = kctx; @@ -152,7 +150,7 @@ public SnapshotChecker( this.encryptionSpi = kctx.config().getEncryptionSpi() == null ? new NoopEncryptionSpi() : kctx.config().getEncryptionSpi(); - this.executor = executor; + this.executor = executorSrvc; this.log = kctx.log(getClass()); } @@ -313,11 +311,7 @@ public CompletableFuture> checkLocalMetas( } /** Checks that all incremental snapshots are present, contain correct metafile and WAL segments. */ - private void checkIncrementalSnapshotsExist( - SnapshotMetadata fullMeta, - File snpDir, - int incIdx - ) { + private void checkIncrementalSnapshotsExist(SnapshotMetadata fullMeta, File snpDir, int incIdx) { try { // Incremental snapshot must contain ClusterSnapshotRecord. long startSeg = fullMeta.snapshotRecordPointer().index(); @@ -393,7 +387,7 @@ private void checkWalSegments(IncrementalSnapshotMetadata meta, long startWalSeg } /** */ - public CompletableFuture checkIncrementalSnapshot( + public CompletableFuture checkIncrementalSnapshot( String snpName, @Nullable String snpPath, int incIdx @@ -592,7 +586,7 @@ else if (txRec.state() == TransactionState.ROLLED_BACK) { ", walSegments=" + procSegCnt.get() + ']'); } - return new IncrementalSnapshotResult( + return new IncrementalSnapshotCheckResult( txHashRes, partHashRes, partiallyCommittedTxs, @@ -622,7 +616,7 @@ private Map readTxCachesData(File snpDir) throws Ignit /** */ public IdleVerifyResultV2 reduceIncrementalResults( - Map results, + Map results, Map operationErrors ) { if (!operationErrors.isEmpty()) @@ -635,7 +629,7 @@ public IdleVerifyResultV2 reduceIncrementalResults( Map errors = new HashMap<>(); results.forEach((node, res) -> { - if (res.exceptions.isEmpty() && errors.isEmpty()) { + if (res.exceptions().isEmpty() && errors.isEmpty()) { if (!F.isEmpty(res.partiallyCommittedTxs())) partiallyCommittedTxs.put(node, res.partiallyCommittedTxs()); @@ -665,8 +659,8 @@ public IdleVerifyResultV2 reduceIncrementalResults( } } } - else if (!res.exceptions.isEmpty()) - errors.put(node, F.first(res.exceptions)); + else if (!res.exceptions().isEmpty()) + errors.put(node, F.first(res.exceptions())); }); // Add all missed pairs to conflicts. @@ -1283,65 +1277,6 @@ private SnapshotEncryptionKeyProvider(GridKernalContext ctx, Map } } - /** */ - public static class IncrementalSnapshotResult implements Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** Transaction hashes collection. */ - private Map txHashRes; - - /** - * Partition hashes collection. Value is a hash of data entries {@link DataEntry} from WAL segments included - * into the incremental snapshot. - */ - private Map partHashRes; - - /** Partially committed transactions' collection. */ - private Collection partiallyCommittedTxs; - - /** Occurred exceptions. */ - private Collection exceptions; - - /** */ - public IncrementalSnapshotResult() { - // No-op. - } - - /** */ - private IncrementalSnapshotResult( - Map txHashRes, - Map partHashRes, - Collection partiallyCommittedTxs, - Collection exceptions - ) { - this.txHashRes = txHashRes; - this.partHashRes = partHashRes; - this.partiallyCommittedTxs = partiallyCommittedTxs; - this.exceptions = exceptions; - } - - /** */ - public Map partHashRes() { - return partHashRes; - } - - /** */ - public Map txHashRes() { - return txHashRes; - } - - /** */ - public Collection partiallyCommittedTxs() { - return partiallyCommittedTxs; - } - - /** */ - public Collection exceptions() { - return exceptions; - } - } - /** Holder for calculated hashes. */ private static class HashHolder { /** */