Skip to content

Commit

Permalink
IGNITE-20822 Filter by higherBound in AbstractWalRecordsIterator (#11034
Browse files Browse the repository at this point in the history
)
  • Loading branch information
timoninmaxim authored Nov 13, 2023
1 parent aba0aaa commit fd8757e
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ public abstract class AbstractWalRecordsIterator
/** Factory to provide I/O interfaces for read primitives with files. */
private final SegmentFileInputFactory segmentFileInputFactory;

/** Optional inclusive high bound. */
protected final @Nullable WALPointer highBound;

/** Position of last read valid record. */
private WALPointer lastRead;

Expand All @@ -106,6 +109,7 @@ public abstract class AbstractWalRecordsIterator
* @param serializerFactory Serializer of current version to read headers.
* @param ioFactory ioFactory for file IO access.
* @param initialReadBufferSize buffer for reading records size.
* @param highBound Optional inclusive high bound.
* @param segmentFileInputFactory Factory to provide I/O interfaces for read primitives with files.
*/
protected AbstractWalRecordsIterator(
Expand All @@ -114,12 +118,14 @@ protected AbstractWalRecordsIterator(
@NotNull final RecordSerializerFactory serializerFactory,
@NotNull final FileIOFactory ioFactory,
final int initialReadBufferSize,
@Nullable WALPointer highBound,
SegmentFileInputFactory segmentFileInputFactory) {
this.log = log;
this.sharedCtx = sharedCtx;
this.serializerFactory = serializerFactory;
this.ioFactory = ioFactory;
this.segmentFileInputFactory = segmentFileInputFactory;
this.highBound = highBound;

buf = new ByteBufferExpander(initialReadBufferSize, ByteOrder.nativeOrder());
}
Expand Down Expand Up @@ -269,6 +275,10 @@ protected IgniteBiTuple<WALPointer, WALRecord> advanceRecord(

WALPointer actualFilePtr = new WALPointer(hnd.idx(), (int)hnd.in().position(), 0);

// Fast stop condition, after high bound reached.
if (highBound != null && actualFilePtr.compareTo(highBound) > 0)
return null;

try {
WALRecord rec = hnd.ser().readRecord(hnd.in(), actualFilePtr);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2972,9 +2972,6 @@ private static class RecordsIterator extends AbstractWalRecordsIterator {
/** Optional start pointer. */
@Nullable private final WALPointer start;

/** Optional end pointer. */
@Nullable private final WALPointer end;

/** Manager of segment location. */
private final SegmentRouter segmentRouter;

Expand Down Expand Up @@ -3018,14 +3015,14 @@ private RecordsIterator(
serializerFactory,
ioFactory,
dsCfg.getWalRecordIteratorBufferSize(),
end,
segmentFileInputFactory
);

this.walArchiveDir = walArchiveDir;
this.walWorkDir = walWorkDir;
this.archiver = archiver;
this.start = start;
this.end = end;
this.dsCfg = dsCfg;

this.decompressor = decompressor;
Expand Down Expand Up @@ -3112,7 +3109,7 @@ private void init() throws IgniteCheckedException {
curWalSegmIdx--;

if (log.isDebugEnabled())
log.debug("Initialized WAL cursor [start=" + start + ", end=" + end + ", curWalSegmIdx=" + curWalSegmIdx + ']');
log.debug("Initialized WAL cursor [start=" + start + ", end=" + highBound + ", curWalSegmIdx=" + curWalSegmIdx + ']');

advance();
}
Expand All @@ -3125,7 +3122,7 @@ private void init() throws IgniteCheckedException {
curWalSegment.close();

// We are past the end marker.
if (end != null && curWalSegmIdx + 1 > end.index())
if (highBound != null && curWalSegmIdx + 1 > highBound.index())
return null; //stop iteration

curWalSegmIdx++;
Expand Down Expand Up @@ -3164,7 +3161,7 @@ private void init() throws IgniteCheckedException {
"Next segment file is not found [" +
"curWalSegmIdx=" + curWalSegmIdx
+ ", start=" + start
+ ", end=" + end
+ ", end=" + highBound
+ ", filePath=" + (fd == null ? "<empty>" : fd.file.getAbsolutePath())
+ ", walWorkDir=" + walWorkDir
+ ", walWorkDirContent=" + listFileNames(walWorkDir)
Expand Down Expand Up @@ -3210,7 +3207,7 @@ private static List<String> listFileNames(File dir) {
@Override protected IgniteCheckedException handleRecordException(Exception e, @Nullable WALPointer ptr) {
if (e instanceof IgniteCheckedException && X.hasCause(e, IgniteDataIntegrityViolationException.class)) {
// This means that there is no explicit last segment, so we iterate until the very end.
if (end == null) {
if (highBound == null) {
long nextWalSegmentIdx = curWalSegmIdx + 1;

if (archiver == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,14 @@ public class SingleSegmentLogicalRecordsIterator extends AbstractWalRecordsItera
File archiveDir,
CIX1<WALRecord> advanceC
) throws IgniteCheckedException {
super(log, sharedCtx, initLogicalRecordsSerializerFactory(sharedCtx), ioFactory, bufSize, new SimpleSegmentFileInputFactory());
super(
log,
sharedCtx,
initLogicalRecordsSerializerFactory(sharedCtx),
ioFactory,
bufSize,
null,
new SimpleSegmentFileInputFactory());

curWalSegmIdx = archivedSegIdx;
this.archiveDir = archiveDir;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,6 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
/** Replay from bound include. */
private final WALPointer lowBound;

/** Replay to bound include */
private final WALPointer highBound;

/**
* Creates iterator in file-by-file iteration mode. Directory
*
Expand Down Expand Up @@ -128,14 +125,14 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
new RecordSerializerFactoryImpl(sharedCtx, readTypeFilter),
ioFactory,
initialReadBufferSize,
highBound,
FILE_INPUT_FACTORY
);

if (strictBoundsCheck)
strictCheck(walFiles, lowBound, highBound);

this.lowBound = lowBound;
this.highBound = highBound;

this.keepBinary = keepBinary;

Expand Down Expand Up @@ -288,17 +285,8 @@ private void init(List<FileDescriptor> walFiles) {
if (tup == null)
return tup;

if (!checkBounds(tup.get1())) {
if (curRec != null) {
WALPointer prevRecPtr = curRec.get1();

// Fast stop condition, after high bound reached.
if (prevRecPtr != null && prevRecPtr.compareTo(highBound) > 0)
return null;
}

if (!checkBounds(tup.get1()))
return new T2<>(tup.get1(), FilteredRecord.INSTANCE); // FilteredRecord for mark as filtered.
}

return tup;
}
Expand Down

0 comments on commit fd8757e

Please sign in to comment.