diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java index 926bce8fe391f..c020a30be294f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java @@ -97,6 +97,9 @@ public abstract class AbstractWalRecordsIterator /** Factory to provide I/O interfaces for read primitives with files. */ private final SegmentFileInputFactory segmentFileInputFactory; + /** Optional inclusive high bound. */ + protected final @Nullable WALPointer highBound; + /** Position of last read valid record. */ private WALPointer lastRead; @@ -106,6 +109,7 @@ public abstract class AbstractWalRecordsIterator * @param serializerFactory Serializer of current version to read headers. * @param ioFactory ioFactory for file IO access. * @param initialReadBufferSize buffer for reading records size. + * @param highBound Optional inclusive high bound. * @param segmentFileInputFactory Factory to provide I/O interfaces for read primitives with files. */ protected AbstractWalRecordsIterator( @@ -114,12 +118,14 @@ protected AbstractWalRecordsIterator( @NotNull final RecordSerializerFactory serializerFactory, @NotNull final FileIOFactory ioFactory, final int initialReadBufferSize, + @Nullable WALPointer highBound, SegmentFileInputFactory segmentFileInputFactory) { this.log = log; this.sharedCtx = sharedCtx; this.serializerFactory = serializerFactory; this.ioFactory = ioFactory; this.segmentFileInputFactory = segmentFileInputFactory; + this.highBound = highBound; buf = new ByteBufferExpander(initialReadBufferSize, ByteOrder.nativeOrder()); } @@ -269,6 +275,10 @@ protected IgniteBiTuple advanceRecord( WALPointer actualFilePtr = new WALPointer(hnd.idx(), (int)hnd.in().position(), 0); + // Fast stop condition, after high bound reached. + if (highBound != null && actualFilePtr.compareTo(highBound) > 0) + return null; + try { WALRecord rec = hnd.ser().readRecord(hnd.in(), actualFilePtr); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 2a0eb000909a7..cc78a3c17a4e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -2972,9 +2972,6 @@ private static class RecordsIterator extends AbstractWalRecordsIterator { /** Optional start pointer. */ @Nullable private final WALPointer start; - /** Optional end pointer. */ - @Nullable private final WALPointer end; - /** Manager of segment location. */ private final SegmentRouter segmentRouter; @@ -3018,6 +3015,7 @@ private RecordsIterator( serializerFactory, ioFactory, dsCfg.getWalRecordIteratorBufferSize(), + end, segmentFileInputFactory ); @@ -3025,7 +3023,6 @@ private RecordsIterator( this.walWorkDir = walWorkDir; this.archiver = archiver; this.start = start; - this.end = end; this.dsCfg = dsCfg; this.decompressor = decompressor; @@ -3112,7 +3109,7 @@ private void init() throws IgniteCheckedException { curWalSegmIdx--; if (log.isDebugEnabled()) - log.debug("Initialized WAL cursor [start=" + start + ", end=" + end + ", curWalSegmIdx=" + curWalSegmIdx + ']'); + log.debug("Initialized WAL cursor [start=" + start + ", end=" + highBound + ", curWalSegmIdx=" + curWalSegmIdx + ']'); advance(); } @@ -3125,7 +3122,7 @@ private void init() throws IgniteCheckedException { curWalSegment.close(); // We are past the end marker. - if (end != null && curWalSegmIdx + 1 > end.index()) + if (highBound != null && curWalSegmIdx + 1 > highBound.index()) return null; //stop iteration curWalSegmIdx++; @@ -3164,7 +3161,7 @@ private void init() throws IgniteCheckedException { "Next segment file is not found [" + "curWalSegmIdx=" + curWalSegmIdx + ", start=" + start - + ", end=" + end + + ", end=" + highBound + ", filePath=" + (fd == null ? "" : fd.file.getAbsolutePath()) + ", walWorkDir=" + walWorkDir + ", walWorkDirContent=" + listFileNames(walWorkDir) @@ -3210,7 +3207,7 @@ private static List listFileNames(File dir) { @Override protected IgniteCheckedException handleRecordException(Exception e, @Nullable WALPointer ptr) { if (e instanceof IgniteCheckedException && X.hasCause(e, IgniteDataIntegrityViolationException.class)) { // This means that there is no explicit last segment, so we iterate until the very end. - if (end == null) { + if (highBound == null) { long nextWalSegmentIdx = curWalSegmIdx + 1; if (archiver == null) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java index fcb2d4341156e..d3df32369693d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java @@ -70,7 +70,14 @@ public class SingleSegmentLogicalRecordsIterator extends AbstractWalRecordsItera File archiveDir, CIX1 advanceC ) throws IgniteCheckedException { - super(log, sharedCtx, initLogicalRecordsSerializerFactory(sharedCtx), ioFactory, bufSize, new SimpleSegmentFileInputFactory()); + super( + log, + sharedCtx, + initLogicalRecordsSerializerFactory(sharedCtx), + ioFactory, + bufSize, + null, + new SimpleSegmentFileInputFactory()); curWalSegmIdx = archivedSegIdx; this.archiveDir = archiveDir; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java index df6ca772291a2..c7582b7cf146b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java @@ -96,9 +96,6 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { /** Replay from bound include. */ private final WALPointer lowBound; - /** Replay to bound include */ - private final WALPointer highBound; - /** * Creates iterator in file-by-file iteration mode. Directory * @@ -128,6 +125,7 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { new RecordSerializerFactoryImpl(sharedCtx, readTypeFilter), ioFactory, initialReadBufferSize, + highBound, FILE_INPUT_FACTORY ); @@ -135,7 +133,6 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { strictCheck(walFiles, lowBound, highBound); this.lowBound = lowBound; - this.highBound = highBound; this.keepBinary = keepBinary; @@ -288,17 +285,8 @@ private void init(List walFiles) { if (tup == null) return tup; - if (!checkBounds(tup.get1())) { - if (curRec != null) { - WALPointer prevRecPtr = curRec.get1(); - - // Fast stop condition, after high bound reached. - if (prevRecPtr != null && prevRecPtr.compareTo(highBound) > 0) - return null; - } - + if (!checkBounds(tup.get1())) return new T2<>(tup.get1(), FilteredRecord.INSTANCE); // FilteredRecord for mark as filtered. - } return tup; }