Skip to content

Commit

Permalink
review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Vladsz83 committed Oct 4, 2024
1 parent 375420b commit b54ab0d
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 79 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;

import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
import org.apache.ignite.internal.management.cache.PartitionKeyV2;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
import org.apache.ignite.internal.processors.cache.verify.TransactionsHashRecord;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;

/** */
class IncrementalSnapshotCheckResult implements Serializable {
/** */
private static final long serialVersionUID = 0L;

/** Transaction hashes collection. */
private Map<Object, TransactionsHashRecord> txHashRes;

/**
* Partition hashes collection. Value is a hash of data entries {@link DataEntry} from WAL segments included
* into the incremental snapshot.
*/
private Map<PartitionKeyV2, PartitionHashRecordV2> partHashRes;

/** Partially committed transactions' collection. */
private Collection<GridCacheVersion> partiallyCommittedTxs;

/** Occurred exceptions. */
private Collection<Exception> exceptions;

/** */
public IncrementalSnapshotCheckResult() {
// No-op.
}

/** */
IncrementalSnapshotCheckResult(
Map<Object, TransactionsHashRecord> txHashRes,
Map<PartitionKeyV2, PartitionHashRecordV2> partHashRes,
Collection<GridCacheVersion> partiallyCommittedTxs,
Collection<Exception> exceptions
) {
this.txHashRes = txHashRes;
this.partHashRes = partHashRes;
this.partiallyCommittedTxs = partiallyCommittedTxs;
this.exceptions = exceptions;
}

/** */
public Map<PartitionKeyV2, PartitionHashRecordV2> partHashRes() {
return partHashRes;
}

/** */
public Map<Object, TransactionsHashRecord> txHashRes() {
return txHashRes;
}

/** */
public Collection<GridCacheVersion> partiallyCommittedTxs() {
return partiallyCommittedTxs;
}

/** */
public Collection<Exception> exceptions() {
return exceptions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class SnapshotCheckProcess {
/** */
private final GridKernalContext kctx;

/** Operation contexts by unique operation name/id. */
/** Operation contexts by name. */
private final Map<String, SnapshotCheckContext> contexts = new ConcurrentHashMap<>();

/** Cluster-wide operation futures per snapshot called from current node. */
Expand Down Expand Up @@ -229,7 +229,7 @@ private IgniteInternalFuture<SnapshotCheckResponse> validateParts(SnapshotCheckP
phaseFut.onDone(err);
else {
if (req.incrementalIndex() > 0)
phaseFut.onDone(new SnapshotCheckResponse((SnapshotChecker.IncrementalSnapshotResult)res));
phaseFut.onDone(new SnapshotCheckResponse((IncrementalSnapshotCheckResult)res));
else
phaseFut.onDone(new SnapshotCheckResponse((Map<?, ?>)res));
}
Expand Down Expand Up @@ -485,7 +485,7 @@ private static final class SnapshotCheckResponse implements Serializable {
@Nullable private final Map<?, ?> partsResults;

/** @see #incrementalResult() */
@Nullable private final SnapshotChecker.IncrementalSnapshotResult incRes;
@Nullable private final IncrementalSnapshotCheckResult incRes;

/** Ctor for the phase 1. */
private SnapshotCheckResponse(@Nullable List<SnapshotMetadata> metas) {
Expand All @@ -502,7 +502,7 @@ private SnapshotCheckResponse(Map<?, ?> partsResults) {
}

/** Ctor for the phase 2 for incremental snapshot. */
private SnapshotCheckResponse(SnapshotChecker.IncrementalSnapshotResult incRes) {
private SnapshotCheckResponse(IncrementalSnapshotCheckResult incRes) {
this.metas = null;
this.partsResults = null;
this.incRes = incRes;
Expand Down Expand Up @@ -532,7 +532,7 @@ private SnapshotCheckResponse(SnapshotChecker.IncrementalSnapshotResult incRes)
}

/** Incremental snapshot result for the phase 2. Is always {@code null} for the phase 1 or in case of normal snapshot. */
private @Nullable SnapshotChecker.IncrementalSnapshotResult incrementalResult() {
private @Nullable IncrementalSnapshotCheckResult incrementalResult() {
return incRes;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.file.DirectoryStream;
Expand Down Expand Up @@ -58,7 +57,6 @@
import org.apache.ignite.internal.managers.encryption.GroupKey;
import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted;
import org.apache.ignite.internal.pagemem.store.PageStore;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridLocalConfigManager;
Expand Down Expand Up @@ -142,7 +140,7 @@ public class SnapshotChecker {
public SnapshotChecker(
GridKernalContext kctx,
Marshaller marshaller,
ExecutorService executor,
ExecutorService executorSrvc,
@Nullable ClassLoader marshallerClsLdr
) {
this.kctx = kctx;
Expand All @@ -152,7 +150,7 @@ public SnapshotChecker(

this.encryptionSpi = kctx.config().getEncryptionSpi() == null ? new NoopEncryptionSpi() : kctx.config().getEncryptionSpi();

this.executor = executor;
this.executor = executorSrvc;

this.log = kctx.log(getClass());
}
Expand Down Expand Up @@ -313,11 +311,7 @@ public CompletableFuture<List<SnapshotMetadata>> checkLocalMetas(
}

/** Checks that all incremental snapshots are present, contain correct metafile and WAL segments. */
private void checkIncrementalSnapshotsExist(
SnapshotMetadata fullMeta,
File snpDir,
int incIdx
) {
private void checkIncrementalSnapshotsExist(SnapshotMetadata fullMeta, File snpDir, int incIdx) {
try {
// Incremental snapshot must contain ClusterSnapshotRecord.
long startSeg = fullMeta.snapshotRecordPointer().index();
Expand Down Expand Up @@ -393,7 +387,7 @@ private void checkWalSegments(IncrementalSnapshotMetadata meta, long startWalSeg
}

/** */
public CompletableFuture<IncrementalSnapshotResult> checkIncrementalSnapshot(
public CompletableFuture<IncrementalSnapshotCheckResult> checkIncrementalSnapshot(
String snpName,
@Nullable String snpPath,
int incIdx
Expand Down Expand Up @@ -592,7 +586,7 @@ else if (txRec.state() == TransactionState.ROLLED_BACK) {
", walSegments=" + procSegCnt.get() + ']');
}

return new IncrementalSnapshotResult(
return new IncrementalSnapshotCheckResult(
txHashRes,
partHashRes,
partiallyCommittedTxs,
Expand Down Expand Up @@ -622,7 +616,7 @@ private Map<Integer, StoredCacheData> readTxCachesData(File snpDir) throws Ignit

/** */
public IdleVerifyResultV2 reduceIncrementalResults(
Map<ClusterNode, IncrementalSnapshotResult> results,
Map<ClusterNode, IncrementalSnapshotCheckResult> results,
Map<ClusterNode, Exception> operationErrors
) {
if (!operationErrors.isEmpty())
Expand All @@ -635,7 +629,7 @@ public IdleVerifyResultV2 reduceIncrementalResults(
Map<ClusterNode, Exception> errors = new HashMap<>();

results.forEach((node, res) -> {
if (res.exceptions.isEmpty() && errors.isEmpty()) {
if (res.exceptions().isEmpty() && errors.isEmpty()) {
if (!F.isEmpty(res.partiallyCommittedTxs()))
partiallyCommittedTxs.put(node, res.partiallyCommittedTxs());

Expand Down Expand Up @@ -665,8 +659,8 @@ public IdleVerifyResultV2 reduceIncrementalResults(
}
}
}
else if (!res.exceptions.isEmpty())
errors.put(node, F.first(res.exceptions));
else if (!res.exceptions().isEmpty())
errors.put(node, F.first(res.exceptions()));
});

// Add all missed pairs to conflicts.
Expand Down Expand Up @@ -1283,65 +1277,6 @@ private SnapshotEncryptionKeyProvider(GridKernalContext ctx, Map<Integer, File>
}
}

/** */
public static class IncrementalSnapshotResult implements Serializable {
/** */
private static final long serialVersionUID = 0L;

/** Transaction hashes collection. */
private Map<Object, TransactionsHashRecord> txHashRes;

/**
* Partition hashes collection. Value is a hash of data entries {@link DataEntry} from WAL segments included
* into the incremental snapshot.
*/
private Map<PartitionKeyV2, PartitionHashRecordV2> partHashRes;

/** Partially committed transactions' collection. */
private Collection<GridCacheVersion> partiallyCommittedTxs;

/** Occurred exceptions. */
private Collection<Exception> exceptions;

/** */
public IncrementalSnapshotResult() {
// No-op.
}

/** */
private IncrementalSnapshotResult(
Map<Object, TransactionsHashRecord> txHashRes,
Map<PartitionKeyV2, PartitionHashRecordV2> partHashRes,
Collection<GridCacheVersion> partiallyCommittedTxs,
Collection<Exception> exceptions
) {
this.txHashRes = txHashRes;
this.partHashRes = partHashRes;
this.partiallyCommittedTxs = partiallyCommittedTxs;
this.exceptions = exceptions;
}

/** */
public Map<PartitionKeyV2, PartitionHashRecordV2> partHashRes() {
return partHashRes;
}

/** */
public Map<Object, TransactionsHashRecord> txHashRes() {
return txHashRes;
}

/** */
public Collection<GridCacheVersion> partiallyCommittedTxs() {
return partiallyCommittedTxs;
}

/** */
public Collection<Exception> exceptions() {
return exceptions;
}
}

/** Holder for calculated hashes. */
private static class HashHolder {
/** */
Expand Down

0 comments on commit b54ab0d

Please sign in to comment.