diff --git a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/persistence/MigratingToWalV2SerializerWithCompactionTest.java b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/persistence/MigratingToWalV2SerializerWithCompactionTest.java index e505e58679e1a..6fb69a4982a77 100644 --- a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/persistence/MigratingToWalV2SerializerWithCompactionTest.java +++ b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/persistence/MigratingToWalV2SerializerWithCompactionTest.java @@ -37,6 +37,8 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.junit.Test; +import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_COMPACTED_FILTER; + /** * Saves data using previous version of ignite and then load this data using actual version */ @@ -124,11 +126,7 @@ private void doTestStartupWithOldVersion(String ver) throws Exception { NodeFileTree ft = ignite.context().pdsFolderResolver().fileTree(); - File[] compressedSegments = ft.walArchive().listFiles(new FilenameFilter() { - @Override public boolean accept(File dir, String name) { - return name.endsWith(".wal.zip"); - } - }); + File[] compressedSegments = ft.walArchive().listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER); final int actualCompressedWalSegments = compressedSegments == null ? 0 : compressedSegments.length; diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java index 2b3f8f5a0e6ba..00dd86dff5eca 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java @@ -43,6 +43,7 @@ import org.apache.ignite.internal.pagemem.wal.record.CdcDataRecord; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; +import org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; import org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty; @@ -250,10 +251,12 @@ private void checkDeleteLostSegmentLinks(List expBefore, List expAft /** */ private void checkLinks(IgniteEx srv, List expLinks) { - File[] links = srv.context().pdsFolderResolver().fileTree().walCdc().listFiles(WAL_SEGMENT_FILE_FILTER); + NodeFileTree ft = srv.context().pdsFolderResolver().fileTree(); + + File[] links = ft.walCdc().listFiles(WAL_SEGMENT_FILE_FILTER); assertEquals(expLinks.size(), links.length); - Arrays.stream(links).map(File::toPath).map(FileWriteAheadLogManager::segmentIndex) + Arrays.stream(links).map(File::toPath).map(ft::walSegmentIndex) .allMatch(expLinks::contains); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java index d6c31b425380a..c2d572ca7c874 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java @@ -56,7 +56,6 @@ import org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree; import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderResolver; import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings; -import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory; import org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext; @@ -88,7 +87,6 @@ import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_MANAGER_STOP_RECORD; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2; import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER; -import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.segmentIndex; import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.closeAllComponents; import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAllComponents; import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName; @@ -489,9 +487,9 @@ public void consumeWalSegmentsUntilStopped() { // Need unseen WAL segments only. .filter(p -> WAL_SEGMENT_FILE_FILTER.accept(p.toFile()) && !seen.contains(p)) .peek(seen::add) // Adds to seen. - .sorted(Comparator.comparingLong(FileWriteAheadLogManager::segmentIndex)) // Sort by segment index. + .sorted(Comparator.comparingLong(ft::walSegmentIndex)) // Sort by segment index. .peek(p -> { - long nextSgmnt = segmentIndex(p); + long nextSgmnt = ft.walSegmentIndex(p); if (lastSgmnt.get() != -1 && nextSgmnt - lastSgmnt.get() != 1) { throw new IgniteException("Found missed segments. Some events are missed. Exiting! " + @@ -561,7 +559,7 @@ private boolean consumeSegment(Path segment) { if (walState != null) builder.from(walState.get1()); - long segmentIdx = segmentIndex(segment); + long segmentIdx = ft.walSegmentIndex(segment); lastSegmentConsumptionTs.value(System.currentTimeMillis()); @@ -807,7 +805,7 @@ private void updateCaches() { * @return {@code True} if segment file was deleted, {@code false} otherwise. */ private boolean removeProcessedOnFailover(Path segment) { - long segmentIdx = segmentIndex(segment); + long segmentIdx = ft.walSegmentIndex(segment); if (segmentIdx > walState.get1().index()) { throw new IgniteException("Found segment greater then saved state. Some events are missed. Exiting! " + @@ -854,7 +852,7 @@ private void saveStateAndRemoveProcessed(T2 curState) throw Path processedSegment = rmvIter.next(); // Can't delete current segment, because state points to it. - if (segmentIndex(processedSegment) >= curState.get1().index()) + if (ft.walSegmentIndex(processedSegment) >= curState.get1().index()) continue; // WAL segment is a hard link to a segment file in a specifal Change Data Capture folder. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentsTask.java index d5d9a631ffa9f..b30302885e379 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentsTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentsTask.java @@ -32,7 +32,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.compute.ComputeJobResult; import org.apache.ignite.internal.cdc.CdcFileLockHolder; -import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; +import org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree; import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; @@ -94,24 +94,24 @@ protected CdcDeleteLostSegmentsJob(CdcDeleteLostSegmentLinksCommandArg arg, bool if (!CU.isCdcEnabled(ignite.configuration())) throw new IgniteException("CDC is not configured."); - File walCdcDir = ignite.context().pdsFolderResolver().fileTree().walCdc(); + NodeFileTree ft = ignite.context().pdsFolderResolver().fileTree(); - CdcFileLockHolder lock = new CdcFileLockHolder(walCdcDir.getAbsolutePath(), "Delete lost segments job", log); + CdcFileLockHolder lock = new CdcFileLockHolder(ft.walCdc().getAbsolutePath(), "Delete lost segments job", log); try { lock.tryLock(1); - try (Stream cdcFiles = Files.list(walCdcDir.toPath())) { + try (Stream cdcFiles = Files.list(ft.walCdc().toPath())) { Set delete = new HashSet<>(); AtomicLong lastSgmnt = new AtomicLong(-1); cdcFiles .filter(p -> WAL_SEGMENT_FILE_FILTER.accept(p.toFile())) - .sorted(Comparator.comparingLong(FileWriteAheadLogManager::segmentIndex) + .sorted(Comparator.comparingLong(ft::walSegmentIndex) .reversed()) // Sort by segment index. .forEach(path -> { - long idx = FileWriteAheadLogManager.segmentIndex(path); + long idx = ft.walSegmentIndex(path); if (lastSgmnt.get() == -1 || lastSgmnt.get() - idx == 1) { lastSgmnt.set(idx); @@ -139,7 +139,7 @@ protected CdcDeleteLostSegmentsJob(CdcDeleteLostSegmentLinksCommandArg arg, bool log.info("Segment CDC link deleted [file=" + file.getAbsolutePath() + ']'); }); - Path stateDir = walCdcDir.toPath().resolve(STATE_DIR); + Path stateDir = ft.walCdc().toPath().resolve(STATE_DIR); if (stateDir.toFile().exists()) { File walState = stateDir.resolve(WAL_STATE_FILE_NAME).toFile(); @@ -157,7 +157,7 @@ protected CdcDeleteLostSegmentsJob(CdcDeleteLostSegmentLinksCommandArg arg, bool catch (IgniteCheckedException e) { throw new RuntimeException("Failed to delete lost segment CDC links. " + "Unable to acquire lock to lock CDC folder. Make sure a CDC app is shut down " + - "[dir=" + walCdcDir.getAbsolutePath() + ", reason=" + e.getMessage() + ']'); + "[dir=" + ft.walCdc().getAbsolutePath() + ", reason=" + e.getMessage() + ']'); } finally { U.closeQuiet(lock); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/wal/WalTask.java b/modules/core/src/main/java/org/apache/ignite/internal/management/wal/WalTask.java index 6f9e4297cc01f..933dc9c3a731e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/management/wal/WalTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/wal/WalTask.java @@ -29,7 +29,6 @@ import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.regex.Pattern; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; @@ -48,6 +47,9 @@ import org.apache.ignite.resources.LoggerResource; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_NAME_PATTERN; +import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_COMPACTED_PATTERN; + /** * Performs WAL cleanup clusterwide. */ @@ -56,12 +58,6 @@ public class WalTask extends VisorMultiNodeTask run(@Nullable WalDeleteCommandArg arg) throws IgniteException { try { GridKernalContext cctx = ignite.context(); + ft = ignite.context().pdsFolderResolver().fileTree(); GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)cctx.cache().context().database(); FileWriteAheadLogManager wal = (FileWriteAheadLogManager)cctx.cache().context().wal(); @@ -194,10 +194,10 @@ Collection getUnusedWalSegments( sortWalFiles(walFiles); // Obtain index of last archived WAL segment, it will not be deleted. - long lastArchIdx = getIndex(walFiles[walFiles.length - 1]); + long lastArchIdx = ft.walSegmentIndex(walFiles[walFiles.length - 1].toPath()); for (File f : walFiles) { - long fileIdx = getIndex(f); + long fileIdx = ft.walSegmentIndex(f.toPath()); if (fileIdx < maxIdx && fileIdx < lastArchIdx) res.add(f.getAbsolutePath()); @@ -239,7 +239,7 @@ Collection deleteUnusedWalSegments( Collection res = new ArrayList<>(num); for (File walFile: walFiles) { - if (getIndex(walFile) < maxIdx && num > 0) + if (ft.walSegmentIndex(walFile.toPath()) < maxIdx && num > 0) res.add(walFile.getAbsolutePath()); else break; @@ -272,8 +272,6 @@ private int resolveMaxReservedIndex(FileWriteAheadLogManager wal, WALPointer low * @throws IgniteCheckedException if failed. */ private File getWalArchiveDir() throws IgniteCheckedException { - NodeFileTree ft = ignite.context().pdsFolderResolver().fileTree(); - if (!ft.walArchive().exists()) throw new IgniteCheckedException("WAL archive directory does not exists" + ft.walArchive().getAbsolutePath()); @@ -288,19 +286,9 @@ private File getWalArchiveDir() throws IgniteCheckedException { private void sortWalFiles(File[] files) { Arrays.sort(files, new Comparator() { @Override public int compare(File o1, File o2) { - return Long.compare(getIndex(o1), getIndex(o2)); + return Long.compare(ft.walSegmentIndex(o1.toPath()), ft.walSegmentIndex(o2.toPath())); } }); } } - - /** - * Get index from WAL segment file. - * - * @param file WAL segment file. - * @return Index of WAL segment file. - */ - private static long getIndex(File file) { - return Long.parseLong(file.getName().substring(0, 16)); - } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java index bb1cbd80fe420..df32fbcfe8cc3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.pagemem.wal; -import java.io.File; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.configuration.DataStorageConfiguration; @@ -235,12 +234,6 @@ public WALIterator replay( */ void startAutoReleaseSegments(); - /** - * @param idx Segment index. - * @return Compressed archive segment. - */ - @Nullable File compactedSegment(long idx); - /** * Blocks current thread while segment with the {@code idx} not compressed. * If segment compressed, already, returns immediately. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/NodeFileTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/NodeFileTree.java index 46d3c9189a13c..2a1bb7546e19b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/NodeFileTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/NodeFileTree.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.persistence.filename; import java.io.File; +import java.nio.file.Path; import java.nio.file.Paths; import java.util.function.Predicate; import org.apache.ignite.IgniteException; @@ -35,6 +36,8 @@ import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_WAL_ARCHIVE_PATH; import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_WAL_CDC_PATH; import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_WAL_PATH; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.TMP_SUFFIX; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.ZIP_SUFFIX; import static org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderResolver.DB_DEFAULT_FOLDER; /** @@ -131,10 +134,10 @@ * ... * │ │ └── node01-e57e62a9-2ccf-4e1b-a11e-d35d32c0fe5d ← walCdc (node 1) * │ └── node00-e57e62a9-2ccf-4e1b-a11e-c24c21b9ed4c ← wal (node 0) - * │ ├── 0000000000000000.wal - * │ ├── 0000000000000001.wal + * │ ├── 0000000000000000.wal ← wal segment (index = 0) + * │ ├── 0000000000000001.wal ← wal segment (index = 1) * ... - * │ └── 0000000000000009.wal + * │ └── 0000000000000009.wal ← wal segment (index = 9) * │ └── node01-e57e62a9-2ccf-4e1b-a11e-d35d32c0fe5d ← wal (node 1) * ... * ├── diagnostic @@ -157,15 +160,27 @@ public class NodeFileTree extends SharedFileTree { /** Checkpoint directory name. */ public static final String CHECKPOINT_DIR = "cp"; + /** File extension of WAL segment. */ + public static final String WAL_SEGMENT_FILE_EXT = ".wal"; + + /** File extension of temp WAL segment. */ + public static final String TMP_WAL_SEG_FILE_EXT = WAL_SEGMENT_FILE_EXT + TMP_SUFFIX; + + /** File extension of zipped WAL segment. */ + public static final String ZIP_WAL_SEG_FILE_EXT = WAL_SEGMENT_FILE_EXT + ZIP_SUFFIX; + + /** File extension of temp zipped WAL segment. */ + public static final String TMP_ZIP_WAL_SEG_FILE_EXT = ZIP_WAL_SEG_FILE_EXT + TMP_SUFFIX; + + /** Filter out all cache directories. */ + public static final Predicate CACHE_DIR_FILTER = dir -> cacheDir(dir) || cacheGroupDir(dir); + /** Prefix for {@link #cacheStorage(String)} directory in case of single cache. */ private static final String CACHE_DIR_PREFIX = "cache-"; /** Prefix for {@link #cacheStorage(String)} directory in case of cache group. */ private static final String CACHE_GRP_DIR_PREFIX = "cacheGroup-"; - /** Filter out all cache directories. */ - public static final Predicate CACHE_DIR_FILTER = dir -> cacheDir(dir) || cacheGroupDir(dir); - /** Filter out all cache directories including {@link MetaStorage}. */ public static final Predicate CACHE_DIR_WITH_META_FILTER = dir -> CACHE_DIR_FILTER.test(dir) || @@ -308,6 +323,54 @@ public File binaryMeta() { return wal; } + /** + * @param idx Segment number. + * @return Segment file. + */ + public File walSegment(long idx) { + return new File(wal, U.fixedLengthNumberName(idx, WAL_SEGMENT_FILE_EXT)); + } + + /** + * @param idx Segment number. + * @return Archive Segment file. + */ + public File walArchiveSegment(long idx) { + return new File(walArchive, U.fixedLengthNumberName(idx, WAL_SEGMENT_FILE_EXT)); + } + + /** + * @param idx Segment number. + * @return Temp segment file. + */ + public File tempWalSegment(long idx) { + return new File(wal, U.fixedLengthNumberName(idx, TMP_WAL_SEG_FILE_EXT)); + } + + /** + * @param idx Segment number. + * @return Temp archive Segment file. + */ + public File tempWalArchiveSegment(long idx) { + return new File(walArchive, U.fixedLengthNumberName(idx, TMP_WAL_SEG_FILE_EXT)); + } + + /** + * @param idx Segment number. + * @return Zipped archive Segment file. + */ + public File zipWalArchiveSegment(long idx) { + return new File(walArchive, U.fixedLengthNumberName(idx, ZIP_WAL_SEG_FILE_EXT)); + } + + /** + * @param idx Segment number. + * @return Zipped archive Segment file. + */ + public File zipTempWalArchiveSegment(long idx) { + return new File(walArchive, U.fixedLengthNumberName(idx, TMP_ZIP_WAL_SEG_FILE_EXT)); + } + /** @return Path to the directory containing archive WAL segments. */ public @Nullable File walArchive() { return walArchive; @@ -448,6 +511,16 @@ else if (name.equals(MetaStorage.METASTORAGE_DIR_NAME)) throw new IgniteException("Directory doesn't match the cache or cache group prefix: " + dir); } + /** + * @param segment WAL segment file. + * @return Segment index. + */ + public long walSegmentIndex(Path segment) { + String fn = segment.getFileName().toString(); + + return Long.parseLong(fn.substring(0, fn.indexOf('.'))); + } + /** * Resolves directory specified by the given arguments. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java index 0c7824af0eb57..d368957432b62 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java @@ -196,7 +196,7 @@ private void copyWal(File incSnpWalDir, WALPointer highPtr) throws IgniteInterru throw new IgniteException("Failed to create snapshot WAL directory [idx=" + incSnpWalDir + ']'); for (; lowIdx <= highIdx; lowIdx++) { - File seg = cctx.wal().compactedSegment(lowIdx); + File seg = cctx.kernalContext().pdsFolderResolver().fileTree().zipWalArchiveSegment(lowIdx); if (!seg.exists()) throw new IgniteException("WAL segment not found in archive [idx=" + lowIdx + ']'); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotProcessor.java index 998d5130a7600..0cb0f3da1d45f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotProcessor.java @@ -38,7 +38,7 @@ import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.ClusterSnapshotRecord; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; +import org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree; import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -105,9 +105,11 @@ void process( UUID incSnpId = meta.requestId(); + NodeFileTree ft = cctx.kernalContext().pdsFolderResolver().fileTree(); + File lastSeg = Arrays.stream(segments) .map(File::toPath) - .max(Comparator.comparingLong(FileWriteAheadLogManager::segmentIndex)) + .max(Comparator.comparingLong(ft::walSegmentIndex)) .orElseThrow(() -> new IgniteCheckedException("Last WAL segment wasn't found [snpName=" + snpName + ']')) .toFile(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java index 2f34b575bbfe6..22cc7b0098f3c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java @@ -28,14 +28,12 @@ import org.jetbrains.annotations.Nullable; import static java.nio.file.StandardOpenOption.READ; +import static org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree.WAL_SEGMENT_FILE_EXT; /** * WAL file descriptor. */ public class FileDescriptor implements Comparable, AbstractWalRecordsIterator.AbstractFileDescriptor { - /** file extension of WAL segment. */ - private static final String WAL_SEGMENT_FILE_EXT = ".wal"; - /** File represented by this class. */ protected final File file; @@ -67,16 +65,6 @@ public FileDescriptor(File file, @Nullable Long idx) { this.idx = idx == null ? U.fixedLengthFileNumber(fileName) : idx; } - /** - * Getting segment file name. - * - * @param idx Segment index. - * @return Segment file name. - */ - public static String fileName(long idx) { - return U.fixedLengthNumberName(idx, WAL_SEGMENT_FILE_EXT); - } - /** {@inheritDoc} */ @Override public int compareTo(FileDescriptor o) { return Long.compare(idx, o.idx); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index dec6ee92bd34e..b2799c0c2ab97 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -32,7 +32,6 @@ import java.nio.channels.ClosedByInterruptException; import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; -import java.nio.file.Path; import java.sql.Time; import java.util.ArrayList; import java.util.Arrays; @@ -155,8 +154,10 @@ import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DATA_RECORD; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.TMP_SUFFIX; -import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.ZIP_SUFFIX; -import static org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor.fileName; +import static org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree.TMP_WAL_SEG_FILE_EXT; +import static org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree.TMP_ZIP_WAL_SEG_FILE_EXT; +import static org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree.WAL_SEGMENT_FILE_EXT; +import static org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree.ZIP_WAL_SEG_FILE_EXT; import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory.LATEST_SERIALIZER_VERSION; import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE; import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readPosition; @@ -177,10 +178,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private static final byte[] FILL_BUF = new byte[1024 * 1024]; /** Pattern for segment file names. */ - public static final Pattern WAL_NAME_PATTERN = U.fixedLengthNumberNamePattern(".wal"); + public static final Pattern WAL_NAME_PATTERN = U.fixedLengthNumberNamePattern(WAL_SEGMENT_FILE_EXT); /** Pattern for WAL temp files - these files will be cleared at startup. */ - public static final Pattern WAL_TEMP_NAME_PATTERN = U.fixedLengthNumberNamePattern(".wal.tmp"); + public static final Pattern WAL_TEMP_NAME_PATTERN = U.fixedLengthNumberNamePattern(TMP_WAL_SEG_FILE_EXT); /** WAL segment file filter, see {@link #WAL_NAME_PATTERN} */ public static final FileFilter WAL_SEGMENT_FILE_FILTER = file -> !file.isDirectory() && @@ -191,7 +192,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl WAL_TEMP_NAME_PATTERN.matcher(file.getName()).matches(); /** */ - public static final Pattern WAL_SEGMENT_FILE_COMPACTED_PATTERN = U.fixedLengthNumberNamePattern(".wal.zip"); + public static final Pattern WAL_SEGMENT_FILE_COMPACTED_PATTERN = U.fixedLengthNumberNamePattern(ZIP_WAL_SEG_FILE_EXT); /** WAL segment file filter, see {@link #WAL_NAME_PATTERN} */ public static final FileFilter WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER = file -> !file.isDirectory() && @@ -199,14 +200,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches()); /** */ - private static final Pattern WAL_SEGMENT_TEMP_FILE_COMPACTED_PATTERN = U.fixedLengthNumberNamePattern(".wal.zip.tmp"); + private static final Pattern WAL_SEGMENT_TEMP_FILE_COMPACTED_PATTERN = U.fixedLengthNumberNamePattern(TMP_ZIP_WAL_SEG_FILE_EXT); /** */ - private static final FileFilter WAL_SEGMENT_FILE_COMPACTED_FILTER = file -> !file.isDirectory() && + public static final FileFilter WAL_SEGMENT_FILE_COMPACTED_FILTER = file -> !file.isDirectory() && WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches(); /** */ - private static final FileFilter WAL_SEGMENT_TEMP_FILE_COMPACTED_FILTER = file -> !file.isDirectory() && + public static final FileFilter WAL_SEGMENT_TEMP_FILE_COMPACTED_FILTER = file -> !file.isDirectory() && WAL_SEGMENT_TEMP_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches(); /** Buffer size. */ @@ -666,8 +667,8 @@ public Collection getWalFilesFromArchive( List res = new ArrayList<>(); for (long i = low.index(); i < high.index(); i++) { - File file = archiveSegment(i, null); - File fileZip = compactedSegment(i); + File file = ft.walArchiveSegment(i); + File fileZip = ft.zipWalArchiveSegment(i); if (file.exists()) res.add(file); @@ -1107,8 +1108,8 @@ private FileWriteHandle closeBufAndRollover( * @return {@code True} exists. */ private boolean hasIndex(long absIdx) { - boolean inArchive = archiveSegment(absIdx, null).exists() || - compactedSegment(absIdx).exists(); + boolean inArchive = ft.walArchiveSegment(absIdx).exists() || + ft.zipWalArchiveSegment(absIdx).exists(); if (inArchive) return true; @@ -1253,7 +1254,7 @@ private long lastArchivedIndex() { for (File file : ft.walArchive().listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)) { try { - long idx = Long.parseLong(file.getName().substring(0, 16)); + long idx = ft.walSegmentIndex(file.toPath()); lastIdx = Math.max(lastIdx, idx); } @@ -1265,16 +1266,6 @@ private long lastArchivedIndex() { return lastIdx; } - /** - * @param segment WAL segment file. - * @return Segment index. - */ - public static long segmentIndex(Path segment) { - String fn = segment.getFileName().toString(); - - return Long.parseLong(fn.substring(0, fn.indexOf('.'))); - } - /** * @param file File to read. * @param ioFactory IO factory. @@ -1510,7 +1501,7 @@ else if (nextSegmentInited(lastReadPtr)) { private File segmentFile(long absIdx) { long segNo = archiver == null ? absIdx : absIdx % dsCfg.getWalSegments(); - return new File(ft.wal(), fileName(segNo)); + return ft.walSegment(segNo); } /** @return {@code True} if the given pointer is the last in a segment and a next segment has been initialized. */ @@ -1632,7 +1623,7 @@ private void prepareAndCheckWalFiles() throws StorageException { } if (F.isEmpty(ft.wal().listFiles(WAL_SEGMENT_FILE_FILTER))) - createFile(new File(ft.wal(), fileName(0))); + createFile(ft.walSegment(0)); if (isArchiverEnabled()) { moveSegmentsToArchive(); @@ -1728,7 +1719,7 @@ private File pollNextFile(long curIdx) throws StorageException, IgniteInterrupte segmentAware.curAbsWalIdx(curIdx + 1); segmentAware.setLastArchivedAbsoluteIndex(curIdx); - return new File(ft.wal(), fileName(curIdx + 1)); + return ft.walSegment(curIdx + 1); } long absNextIdxStartTime = System.nanoTime(); @@ -1753,7 +1744,7 @@ private File pollNextFile(long curIdx) throws StorageException, IgniteInterrupte long segmentIdx = absNextIdx % dsCfg.getWalSegments(); - return new File(ft.wal(), fileName(segmentIdx)); + return ft.walSegment(segmentIdx); } /** @@ -2112,10 +2103,10 @@ private long nextAbsoluteSegmentIndex() throws StorageException, IgniteInterrupt public SegmentArchiveResult archiveSegment(long absIdx) throws StorageException { long segIdx = absIdx % dsCfg.getWalSegments(); - File origFile = new File(ft.wal(), fileName(segIdx)); + File origFile = ft.walSegment(segIdx); - File dstTmpFile = FileWriteAheadLogManager.this.archiveSegment(absIdx, TMP_SUFFIX); - File dstFile = FileWriteAheadLogManager.this.archiveSegment(absIdx, null); + File dstTmpFile = ft.tempWalArchiveSegment(absIdx); + File dstFile = ft.walArchiveSegment(absIdx); if (log.isInfoEnabled()) { log.info("Starting to copy WAL segment [absIdx=" + absIdx + ", segIdx=" + segIdx + @@ -2395,9 +2386,9 @@ private void body0() { if ((segIdx = tryReserveNextSegmentOrWait()) == -1) continue; - File tmpZip = archiveSegment(segIdx, ZIP_SUFFIX + TMP_SUFFIX); - File zip = compactedSegment(segIdx); - File raw = archiveSegment(segIdx, null); + File tmpZip = ft.zipTempWalArchiveSegment(segIdx); + File zip = ft.zipWalArchiveSegment(segIdx); + File raw = ft.walArchiveSegment(segIdx); long currSize = 0; long reservedSize = raw.length(); @@ -2474,7 +2465,7 @@ private void compressSegmentToFile(long idx, File raw, File zip) throws IOExcept try (ZipOutputStream zos = new ZipOutputStream(new BufferedOutputStream(new FileOutputStream(zip)))) { zos.setLevel(dsCfg.getWalCompactionLevel()); - zos.putNextEntry(new ZipEntry(idx + ".wal")); + zos.putNextEntry(new ZipEntry(idx + WAL_SEGMENT_FILE_EXT)); ByteBuffer buf = ByteBuffer.allocate(HEADER_RECORD_SIZE); buf.order(ByteOrder.nativeOrder()); @@ -2495,7 +2486,7 @@ private void compressSegmentToFile(long idx, File raw, File zip) throws IOExcept }; try (SingleSegmentLogicalRecordsIterator iter = new SingleSegmentLogicalRecordsIterator( - log, cctx, ioFactory, BUF_SIZE, idx, ft.walArchive(), appendToZipC)) { + log, cctx, ioFactory, BUF_SIZE, idx, ft, appendToZipC)) { while (iter.hasNextX()) iter.nextX(); @@ -2561,36 +2552,11 @@ private void deleteObsoleteRawSegments() { } } - /** {@inheritDoc} */ - @Override public File compactedSegment(long idx) { - return archiveSegment(idx, ZIP_SUFFIX); - } - /** {@inheritDoc} */ @Override public void awaitCompacted(long idx) throws IgniteInterruptedCheckedException { segmentAware.awaitSegmentCompressed(idx); } - /** */ - private File archiveSegment(long idx, @Nullable String ext) { - return archiveSegment(ft.walArchive(), idx, ext); - } - - /** - * @param walArchiveDir WAL archive directory. - * @param idx Segment index. - * @param ext Optional extension - * @return Path to archive segment. - */ - public static File archiveSegment(File walArchiveDir, long idx, String ext) { - String fileName = fileName(idx); - - if (ext != null) - fileName += ext; - - return new File(walArchiveDir, fileName); - } - /** * Responsible for decompressing previously compressed segments of WAL archive if they are needed for replay. */ @@ -2635,9 +2601,9 @@ private class FileDecompressor extends GridWorker { if (segmentToDecompress == -1) continue; - File zip = compactedSegment(segmentToDecompress); - File unzipTmp = archiveSegment(segmentToDecompress, TMP_SUFFIX); - File unzip = archiveSegment(segmentToDecompress, null); + File zip = ft.zipWalArchiveSegment(segmentToDecompress); + File unzipTmp = ft.tempWalArchiveSegment(segmentToDecompress); + File unzip = ft.walArchiveSegment(segmentToDecompress); long currSize = 0; long reservedSize = U.uncompressedSize(zip); @@ -2714,7 +2680,7 @@ synchronized IgniteInternalFuture decompressFile(long idx) { if (decompressionFutures.containsKey(idx)) return decompressionFutures.get(idx); - File f = archiveSegment(idx, null); + File f = ft.walArchiveSegment(idx); if (f.exists()) return new GridFinishedFuture<>(); @@ -2767,7 +2733,7 @@ private void checkFiles( @Nullable IgniteInClosure completionCb ) throws StorageException { for (int i = startWith; i < dsCfg.getWalSegments() && (p == null || p.apply(i)); i++) { - File checkFile = new File(ft.wal(), fileName(i)); + File checkFile = ft.walSegment(i); if (checkFile.exists()) { if (checkFile.isDirectory()) { @@ -2981,7 +2947,7 @@ private RecordsIterator( AbstractFileDescriptor currDesc = desc; if (!desc.file().exists()) { - FileDescriptor zipFile = new FileDescriptor(archiveSegment(ft.walArchive(), desc.idx(), ZIP_SUFFIX)); + FileDescriptor zipFile = new FileDescriptor(ft.zipWalArchiveSegment(desc.idx())); if (!zipFile.file.exists()) { throw new FileNotFoundException("Both compressed and raw segment files are missing in archive " + @@ -3096,8 +3062,8 @@ private void init() throws IgniteCheckedException { else { // Log only when no segments were read. This will help us avoiding logging on the end of the WAL. if (curRec == null && curWalSegment == null) { - File workDirFile = new File(ft.wal(), fileName(curWalSegmIdx % dsCfg.getWalSegments())); - File archiveDirFile = new File(ft.walArchive(), fileName(curWalSegmIdx)); + File workDirFile = ft.walSegment(curWalSegmIdx % dsCfg.getWalSegments()); + File archiveDirFile = ft.walArchiveSegment(curWalSegmIdx); U.warn( log, @@ -3200,7 +3166,7 @@ private boolean canIgnoreCrcError( Exception e, @Nullable WALPointer ptr ) { - FileDescriptor fd = new FileDescriptor(new File(ft.wal(), fileName(workIdx)), walSegmentIdx); + FileDescriptor fd = new FileDescriptor(ft.walSegment(workIdx), walSegmentIdx); try { if (!fd.file().exists()) @@ -3532,16 +3498,16 @@ private void renameLastSegment() throws StorageException { if (workSegments.length == 1 && workSegments[0].idx() != workSegments[0].idx() % dsCfg.getWalSegments()) { FileDescriptor toRen = workSegments[0]; + long idx = toRen.idx() % dsCfg.getWalSegments(); + + File tmpDst = ft.tempWalSegment(idx); + File dst = ft.walSegment(idx); + if (log.isInfoEnabled()) { log.info("Last WAL segment file has to be renamed from " + toRen.file().getName() + " to " + - fileName(toRen.idx() % dsCfg.getWalSegments()) + '.'); + dst.getName() + '.'); } - String toRenFileName = fileName(toRen.idx() % dsCfg.getWalSegments()); - - File tmpDst = new File(ft.wal(), toRenFileName + TMP_SUFFIX); - File dst = new File(ft.wal(), toRenFileName); - try { Files.copy(toRen.file().toPath(), tmpDst.toPath()); @@ -3613,7 +3579,7 @@ private void formatWorkSegments() throws StorageException { // Batch output. if (log.isInfoEnabled() && (i == toFormat.size() - 1 || (i != 0 && i % 9 == 0))) { log.info("WAL segments formatted: " + toFormat.get(j).file().getName() + - (i == j ? "" : " - " + fileName(i))); + (i == j ? "" : " - " + ft.walSegment(i).getName())); j = i + 1; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentRouter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentRouter.java index eab0b5dea4bcf..f41903345e283 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentRouter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentRouter.java @@ -17,14 +17,11 @@ package org.apache.ignite.internal.processors.cache.persistence.wal; -import java.io.File; import java.io.FileNotFoundException; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree; import org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAware; -import static org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor.fileName; - /** * Class for manage of segment file location. */ @@ -66,12 +63,12 @@ public FileDescriptor findSegment(long segmentId) throws FileNotFoundException { FileDescriptor fd; if (segmentAware.lastArchivedAbsoluteIndex() >= segmentId || !ft.walArchiveEnabled()) - fd = new FileDescriptor(new File(ft.walArchive(), fileName(segmentId))); + fd = new FileDescriptor(ft.walArchiveSegment(segmentId)); else - fd = new FileDescriptor(new File(ft.wal(), fileName(segmentId % dsCfg.getWalSegments())), segmentId); + fd = new FileDescriptor(ft.walSegment(segmentId % dsCfg.getWalSegments()), segmentId); if (!fd.file().exists()) { - FileDescriptor zipFile = new FileDescriptor(new File(ft.walArchive(), fileName(fd.idx()) + ZIP_SUFFIX)); + FileDescriptor zipFile = new FileDescriptor(ft.zipWalArchiveSegment(fd.idx())); if (!zipFile.file().exists()) { throw new FileNotFoundException("Both compressed and raw segment files are missing in archive " + diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java index d3df32369693d..c4b2c69a2d1db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java @@ -16,7 +16,6 @@ */ package org.apache.ignite.internal.processors.cache.persistence.wal; -import java.io.File; import java.io.FileNotFoundException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; @@ -24,6 +23,7 @@ import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree; import org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput; import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO; import org.apache.ignite.internal.processors.cache.persistence.wal.io.SimpleSegmentFileInputFactory; @@ -47,10 +47,10 @@ public class SingleSegmentLogicalRecordsIterator extends AbstractWalRecordsItera private boolean segmentInitialized; /** Archive directory. */ - private File archiveDir; + private final NodeFileTree ft; /** Closure which is executed right after advance. */ - private CIX1 advanceC; + private final CIX1 advanceC; /** * @param log Logger. @@ -58,7 +58,7 @@ public class SingleSegmentLogicalRecordsIterator extends AbstractWalRecordsItera * @param ioFactory Io factory. * @param bufSize Buffer size. * @param archivedSegIdx Archived seg index. - * @param archiveDir Directory with segment. + * @param ft Node file tree. * @param advanceC Closure which is executed right after advance. */ SingleSegmentLogicalRecordsIterator( @@ -67,7 +67,7 @@ public class SingleSegmentLogicalRecordsIterator extends AbstractWalRecordsItera @NotNull FileIOFactory ioFactory, int bufSize, long archivedSegIdx, - File archiveDir, + NodeFileTree ft, CIX1 advanceC ) throws IgniteCheckedException { super( @@ -80,7 +80,7 @@ public class SingleSegmentLogicalRecordsIterator extends AbstractWalRecordsItera new SimpleSegmentFileInputFactory()); curWalSegmIdx = archivedSegIdx; - this.archiveDir = archiveDir; + this.ft = ft; this.advanceC = advanceC; advance(); @@ -106,8 +106,7 @@ private static RecordSerializerFactory initLogicalRecordsSerializerFactory(GridC else { segmentInitialized = true; - FileDescriptor fd = new FileDescriptor( - new File(archiveDir, FileDescriptor.fileName(curWalSegmIdx))); + FileDescriptor fd = new FileDescriptor(ft.walArchiveSegment(curWalSegmIdx)); try { return initReadHandle(fd, null); diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcIgniteNodeActiveModeTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcIgniteNodeActiveModeTest.java index c5d3250ad0712..33dc7ae5bf79e 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcIgniteNodeActiveModeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcIgniteNodeActiveModeTest.java @@ -43,7 +43,7 @@ import org.apache.ignite.internal.pagemem.wal.record.CdcManagerStopRecord; import org.apache.ignite.internal.pagemem.wal.record.RolloverType; import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; -import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; +import org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree; import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; import org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext; import org.apache.ignite.internal.util.lang.RunnableX; @@ -349,9 +349,11 @@ private void checkCdcSegmentsExists(long from, long to) throws IgniteInterrupted assertTrue(waitForCondition(() -> { try { - List actual = Files.list(ign.context().pdsFolderResolver().fileTree().walCdc().toPath()) + NodeFileTree ft = ign.context().pdsFolderResolver().fileTree(); + + List actual = Files.list(ft.walCdc().toPath()) .filter(p -> WAL_SEGMENT_FILE_FILTER.accept(p.toFile())) - .map(FileWriteAheadLogManager::segmentIndex) + .map(ft::walSegmentIndex) .sorted() .collect(Collectors.toList()); diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java index 324afc77258a2..e7fb3c283f367 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java @@ -42,7 +42,7 @@ import org.apache.ignite.internal.pagemem.wal.record.DataRecord; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; -import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor; +import org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree; import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory; import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder; @@ -250,7 +250,7 @@ public void testArchiveCleared() throws Exception { long finishSgmnt = wal.currentSegment(); - String archive = archive(ignite); + NodeFileTree ft = ignite.context().pdsFolderResolver().fileTree(); assertTrue(finishSgmnt > startSgmnt); assertTrue( @@ -258,7 +258,7 @@ public void testArchiveCleared() throws Exception { waitForCondition(() -> startSgmnt <= wal.lastArchivedSegment(), getTestTimeout()) ); - File startSgmntArchived = new File(archive, FileDescriptor.fileName(startSgmnt)); + File startSgmntArchived = ft.walArchiveSegment(startSgmnt); assertTrue("Check archived segment file exists", startSgmntArchived.exists()); @@ -346,7 +346,7 @@ private void doTestWal( private int checkDataRecords(IgniteEx ignite) throws IgniteCheckedException { WALIterator iter = new IgniteWalIteratorFactory(log).iterator(new IteratorParametersBuilder() .ioFactory(new RandomAccessFileIOFactory()) - .filesOrDirs(archive(ignite))); + .filesOrDirs(ignite.context().pdsFolderResolver().fileTree().walArchive().getAbsolutePath())); int walRecCnt = 0; @@ -372,13 +372,4 @@ private int checkDataRecords(IgniteEx ignite) throws IgniteCheckedException { return walRecCnt; } - - /** - * @param ignite Ignite. - * @return WAL archive patch - * @throws IgniteCheckedException If failed - */ - private static String archive(IgniteEx ignite) { - return ignite.context().pdsFolderResolver().fileTree().walArchive().getAbsolutePath(); - } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/encryption/CacheGroupKeyChangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/encryption/CacheGroupKeyChangeTest.java index 8949a69a86d4f..0e08ddd281fe6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/encryption/CacheGroupKeyChangeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/encryption/CacheGroupKeyChangeTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.encryption; -import java.io.File; import java.io.Serializable; import java.util.Arrays; import java.util.Collections; @@ -45,6 +44,7 @@ import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.encryption.GridEncryptionManager; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; +import org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree; import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; import org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType; import org.apache.ignite.internal.util.distributed.InitMessage; @@ -761,14 +761,12 @@ public void testWalArchiveCleanup() throws Exception { assertEquals(2, node.context().encryption().groupKeyIds(grpId).size()); + NodeFileTree ft = node.context().pdsFolderResolver().fileTree(); + stopAllGrids(); // Cleanup WAL arcive folder. - File dbDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false); - - boolean rmvd = U.delete(new File(dbDir, "wal/archive")); - - assertTrue(rmvd); + assertTrue(U.delete(ft.walArchive().getParentFile())); node = startGrid(GRID_0); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/pagemem/wal/record/WALRecordSerializationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/pagemem/wal/record/WALRecordSerializationTest.java index b236135c6fc54..f2312fdbf7efc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/pagemem/wal/record/WALRecordSerializationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/pagemem/wal/record/WALRecordSerializationTest.java @@ -29,7 +29,7 @@ import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.pagemem.wal.WALIterator; -import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor; +import org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree; import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -38,7 +38,6 @@ import org.mockito.internal.matchers.apachecommons.ReflectionEquals; import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_PAGE_SIZE; -import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.ZIP_SUFFIX; /** * Tests of serialization and deserialization of all WAL record types @@ -180,9 +179,9 @@ public void testAllWalRecordsSerializedCompressedAndThenDeserializedSuccessfully ignite.context().cache().context().database().checkpointReadUnlock(); } - File nodeArchiveDir = ignite.context().pdsFolderResolver().fileTree().walArchive(); - File walSegment = new File(nodeArchiveDir, FileDescriptor.fileName(lastPointer.index())); - File walZipSegment = new File(nodeArchiveDir, FileDescriptor.fileName(lastPointer.index()) + ZIP_SUFFIX); + NodeFileTree ft = ignite.context().pdsFolderResolver().fileTree(); + File walSegment = ft.walArchiveSegment(lastPointer.index()); + File walZipSegment = ft.zipWalArchiveSegment(lastPointer.index()); // Spam WAL to move all data records to compressible WAL zone. for (int i = 0; i < WAL_SEGMENT_SIZE / DFLT_PAGE_SIZE * 2; i++) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java index 8b92363822d04..9cff28d988252 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java @@ -539,10 +539,10 @@ private void checkWalArchiveAndTotalSize(IgniteEx igniteEx, boolean hasWalArchiv assertTrue(waitForCondition(() -> walMgr.lastArchivedSegment() == walMgr.currentSegment() - 1, 3000l)); } - long totalSize = walMgr.totalSize(walFiles(ft.wal())); + long totalSize = FileWriteAheadLogManager.totalSize(walFiles(ft.wal())); if (ft.walArchiveEnabled()) - totalSize += walMgr.totalSize(walFiles(ft.walArchive())); + totalSize += FileWriteAheadLogManager.totalSize(walFiles(ft.walArchive())); assertEquals(totalSize, dsMetricRegistry(igniteEx).findMetric("WalTotalSize").value()); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java index fdc555595203e..32aea80bf61b0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java @@ -66,6 +66,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC; import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_ID; +import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_NAME_PATTERN; /** * @@ -414,7 +415,7 @@ public void testWalFsyncWriteHeaderFailure() throws Exception { failingFileIOFactory.createClosure((file, options) -> { FileIO delegate = failingFileIOFactory.delegateFactory().create(file, options); - if (file.getName().endsWith(".wal")) { + if (WAL_NAME_PATTERN.matcher(file.getName()).matches()) { return new FileIODecorator(delegate) { @Override public int write(ByteBuffer srcBuf) throws IOException { throw new IOException("No space left on device"); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/WALPreloadingWithCompactionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/WALPreloadingWithCompactionTest.java index 9bcfaa6f6a65e..c043f6ba9295c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/WALPreloadingWithCompactionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/WALPreloadingWithCompactionTest.java @@ -29,7 +29,6 @@ import org.apache.ignite.configuration.WALMode; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree; -import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; @@ -120,8 +119,8 @@ public void test() throws Exception { private void checkThatOnlyZipSegmentExists(IgniteEx ignite, int segment) { NodeFileTree ft = ignite.context().pdsFolderResolver().fileTree(); - File walZipSegment = new File(ft.walArchive(), FileDescriptor.fileName(segment) + ".zip"); - File walRawSegment = new File(ft.walArchive(), FileDescriptor.fileName(segment)); + File walZipSegment = ft.zipWalArchiveSegment(segment); + File walRawSegment = ft.walArchiveSegment(segment); assertTrue(walZipSegment.exists()); assertFalse(walRawSegment.exists()); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java index 101c8e44dc893..9083455d831ae 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java @@ -55,6 +55,7 @@ import org.junit.Test; import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_MMAP; +import static org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree.WAL_SEGMENT_FILE_EXT; /** * Tests node recovering after disk errors during interaction with persistent storage. @@ -276,7 +277,10 @@ public void testRecoveringOnCheckpointWriteFail() throws Exception { @Test public void testRecoveringOnWALWritingFail1() throws Exception { // Allow to allocate only 1 wal segment, fail on write to second. - ioFactory = new FilteringFileIOFactory(".wal", new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), WAL_SEGMENT_SIZE)); + ioFactory = new FilteringFileIOFactory( + WAL_SEGMENT_FILE_EXT, + new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), WAL_SEGMENT_SIZE) + ); System.setProperty(IGNITE_WAL_MMAP, "true"); @@ -290,7 +294,7 @@ public void testRecoveringOnWALWritingFail1() throws Exception { public void testRecoveringOnWALWritingFail2() throws Exception { // Fail somewhere on the second wal segment. ioFactory = new FilteringFileIOFactory( - ".wal", + WAL_SEGMENT_FILE_EXT, new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), (long)(1.5 * WAL_SEGMENT_SIZE)) ); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteLocalWalSizeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteLocalWalSizeTest.java index 15d1292e75ce5..0dc086eb31e8a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteLocalWalSizeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteLocalWalSizeTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.cache.persistence.db.wal; -import java.io.File; import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -41,7 +40,7 @@ import static org.apache.ignite.configuration.DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.ZIP_SUFFIX; -import static org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor.fileName; +import static org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree.WAL_SEGMENT_FILE_EXT; import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER; import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.isSegmentFileName; import static org.apache.ignite.testframework.GridTestUtils.getFieldValue; @@ -133,11 +132,19 @@ public void testLocalSegmentSizesWithoutArchiveWithCompression() throws Exceptio */ @Test public void testSegmentFileName() throws Exception { - Arrays.asList(null, "", "1", "wal", fileName(0) + "1", fileName(1).replace(".wal", ".wa")) - .forEach(s -> assertFalse(s, isSegmentFileName(s))); + NodeFileTree ft = nodeFileTree("unknown"); + + Arrays.asList( + null, + "", + "1", + "wal", + ft.walSegment(0).getName() + "1", + ft.walSegment(1).getName().replace(WAL_SEGMENT_FILE_EXT, ".wa") + ).forEach(s -> assertFalse(s, isSegmentFileName(s))); IntStream.range(0, 10) - .mapToObj(FileDescriptor::fileName) + .mapToObj(idx -> ft.walSegment(idx).getName()) .forEach(fn -> assertTrue(fn, isSegmentFileName(fn) && isSegmentFileName(fn + ZIP_SUFFIX))); } @@ -224,7 +231,7 @@ private void checkLocalSegmentSizes(IgniteEx n) throws Exception { int segments = n.configuration().getDataStorageConfiguration().getWalSegments(); for (long i = absIdx - (absIdx % segments); i <= absIdx; i++) - expSegmentSize.putIfAbsent(i, new File(ft.wal(), fileName(i % segments)).length()); + expSegmentSize.putIfAbsent(i, ft.walSegment(i % segments).length()); } assertEquals(currHnd.getSegmentId() + 1, expSegmentSize.size()); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java index 0e5c6de84e80b..bd5389518b497 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java @@ -51,6 +51,7 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; +import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_NAME_PATTERN; /** * Tests error recovery while node flushing @@ -274,7 +275,7 @@ private static class FailingFileIOFactory implements FileIOFactory { @Override public int write(ByteBuffer srcBuf) throws IOException { System.out.println(">>>!!!! W " + file.getName()); - if (fail != null && file.getName().endsWith(".wal") && fail.get()) + if (fail != null && WAL_NAME_PATTERN.matcher(file.getName()).matches() && fail.get()) throw new IOException("No space left on device"); return super.write(srcBuf); @@ -284,7 +285,7 @@ private static class FailingFileIOFactory implements FileIOFactory { @Override public MappedByteBuffer map(int sizeBytes) throws IOException { System.out.println(">>>!!!! M " + file.getName()); - if (fail != null && file.getName().endsWith(".wal") && fail.get()) + if (fail != null && WAL_NAME_PATTERN.matcher(file.getName()).matches() && fail.get()) throw new IOException("No space left on deive"); return delegate.map(sizeBytes); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java index b34919d697ec0..bc8941a496662 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java @@ -44,6 +44,7 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; import org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAware; @@ -64,6 +65,7 @@ import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.METASTORE_DATA_RECORD; import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE; +import static org.apache.ignite.testframework.GridTestUtils.getFieldValue; import static org.apache.ignite.testframework.GridTestUtils.getFieldValueHierarchy; import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; @@ -258,7 +260,7 @@ private void checkInvariantSwitchSegment(int serVer) throws Exception { // If switchSegmentRecordSize more that 1, it mean that invariant is broke. // Filling tail some garbage. Simulate tail garbage on rotate segment in WAL work directory. if (switchSegmentRecordSize > 1) { - File seg = new File(workDir + ARCHIVE_SUB_DIR + "/0000000000000000.wal"); + File seg = GridTestUtils.getFieldValue(walMgr, "ft").walArchiveSegment(0); FileIOFactory ioFactory = new RandomAccessFileIOFactory(); @@ -383,9 +385,11 @@ private void checkSwitchReadingSegmentDuringIteration(int serVer) throws Excepti fut.get(); + NodeFileTree ft = getFieldValue(walMgr, "ft"); + //should started iteration from work directory but finish from archive directory. - assertEquals(workDir + WORK_SUB_DIR + File.separator + "0000000000000000.wal", startedSegmentPath.get()); - assertEquals(workDir + ARCHIVE_SUB_DIR + File.separator + "0000000000000000.wal", finishedSegmentPath.get()); + assertEquals(ft.walSegment(0).getAbsolutePath(), startedSegmentPath.get()); + assertEquals(ft.walArchiveSegment(0).getAbsolutePath(), finishedSegmentPath.get()); Assert.assertEquals("Not all records read during iteration.", recordsToWrite, actualRecords.get()); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java index 1e2c075f2fa11..de15047a705ed 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java @@ -105,6 +105,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD; import static org.apache.ignite.cluster.ClusterState.ACTIVE; import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED; +import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_NAME_PATTERN; /** * Historical WAL rebalance base test. @@ -1509,7 +1510,7 @@ static class FailingIOFactory implements FileIOFactory { @Override public FileIO create(File file, OpenOption... modes) throws IOException { FileIO delegateIO = delegate.create(file, modes); - if (file.getName().endsWith(".wal") && failRead) + if (WAL_NAME_PATTERN.matcher(file.getName()).matches() && failRead) return new FileIODecorator(delegateIO) { @Override public int read(ByteBuffer destBuf) throws IOException { throw new IOException("Test exception."); // IO exception is required for correct cleanup. diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionSwitchOnTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionSwitchOnTest.java index f24d75bc6ed5f..48f42360d4b2b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionSwitchOnTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionSwitchOnTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache.persistence.db.wal; import java.io.File; -import java.io.FileFilter; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cluster.ClusterState; @@ -27,13 +26,15 @@ import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; import org.apache.ignite.internal.util.lang.GridAbsPredicate; -import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; +import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_COMPACTED_FILTER; +import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER; +import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_TEMP_FILE_COMPACTED_FILTER; + /** * Load without compaction -> Stop -> Enable WAL Compaction -> Start. */ @@ -83,21 +84,13 @@ public void testWalCompactionSwitch() throws Exception { for (int i = 0; i < 500; i++) cache.put(i, i); - File walDir = U.resolveWorkDirectory( - ex.configuration().getWorkDirectory(), - "db/wal/node00-" + ex.localNode().consistentId(), - false - ); + File walDir = ex.context().pdsFolderResolver().fileTree().wal(); forceCheckpoint(); GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { - File[] archivedFiles = walDir.listFiles(new FileFilter() { - @Override public boolean accept(File pathname) { - return pathname.getName().endsWith(".wal"); - } - }); + File[] archivedFiles = walDir.listFiles(WAL_SEGMENT_FILE_FILTER); return archivedFiles.length == 39; } @@ -111,29 +104,17 @@ public void testWalCompactionSwitch() throws Exception { ex.cluster().state(ClusterState.ACTIVE); - File archiveDir = U.resolveWorkDirectory( - ex.configuration().getWorkDirectory(), - "db/wal/archive/node00-" + ex.localNode().consistentId(), - false - ); + File archiveDir = ex.context().pdsFolderResolver().fileTree().walArchive(); GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { - File[] archivedFiles = archiveDir.listFiles(new FileFilter() { - @Override public boolean accept(File pathname) { - return pathname.getName().endsWith(FilePageStoreManager.ZIP_SUFFIX); - } - }); + File[] archivedFiles = archiveDir.listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER); return archivedFiles.length == 20; } }, 5000); - File[] tmpFiles = archiveDir.listFiles(new FileFilter() { - @Override public boolean accept(File pathname) { - return pathname.getName().endsWith(FilePageStoreManager.TMP_SUFFIX); - } - }); + File[] tmpFiles = archiveDir.listFiles(WAL_SEGMENT_TEMP_FILE_COMPACTED_FILTER); assertEquals(0, tmpFiles.length); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java index e9da8b75e4f5c..e52cd9de4a9dc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.cache.persistence.db.wal; import java.io.File; -import java.io.FilenameFilter; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.util.Arrays; @@ -44,7 +43,6 @@ import org.apache.ignite.internal.processors.cache.persistence.DummyPageIO; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree; -import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.GridTestUtils; @@ -52,7 +50,7 @@ import org.junit.Test; import static java.util.stream.Collectors.toSet; -import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.ZIP_SUFFIX; +import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_COMPACTED_FILTER; /** * @@ -192,7 +190,7 @@ private void testApplyingUpdatesFromCompactedWal(boolean switchOffCompressor) th NodeFileTree ft = ig.context().pdsFolderResolver().fileTree(); - File walSegment = new File(ft.walArchive(), FileDescriptor.fileName(0) + ZIP_SUFFIX); + File walSegment = ft.zipWalArchiveSegment(0); // Allow compressor to compress WAL segments. assertTrue(GridTestUtils.waitForCondition(walSegment::exists, 15_000)); @@ -337,7 +335,7 @@ public void testOptimizedWalSegments() throws Exception { stopAllGrids(); - File walSegment = new File(ft.walArchive(), FileDescriptor.fileName(0)); + File walSegment = ft.walArchiveSegment(0); assertTrue("" + walSegment.length(), walSegment.length() < 200_000_000); } @@ -372,8 +370,8 @@ private void testCompressorToleratesEmptyWalSegments(WALMode walMode) throws Exc int emptyIdx = 5; - File walSegment = new File(ft.walArchive(), FileDescriptor.fileName(emptyIdx)); - File zippedWalSegment = new File(ft.walArchive(), FileDescriptor.fileName(emptyIdx + 1) + ZIP_SUFFIX); + File walSegment = ft.walArchiveSegment(emptyIdx); + File zippedWalSegment = ft.zipWalArchiveSegment(emptyIdx + 1); long start = U.currentTimeMillis(); do { @@ -399,18 +397,11 @@ private void testCompressorToleratesEmptyWalSegments(WALMode walMode) throws Exc // Allow compressor to compress WAL segments. assertTrue(GridTestUtils.waitForCondition(zippedWalSegment::exists, 15_000)); - File[] compressedSegments = ft.walArchive().listFiles(new FilenameFilter() { - @Override public boolean accept(File dir, String name) { - return name.endsWith(".wal.zip"); - } - }); + File[] compressedSegments = ft.walArchive().listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER); long maxIdx = -1; - for (File f : compressedSegments) { - String idxPart = f.getName().substring(0, f.getName().length() - ".wal.zip".length()); - - maxIdx = Math.max(maxIdx, Long.parseLong(idxPart)); - } + for (File f : compressedSegments) + maxIdx = Math.max(maxIdx, ft.walSegmentIndex(f.toPath())); System.out.println("Max compressed index: " + maxIdx); assertTrue(maxIdx > emptyIdx); @@ -484,8 +475,8 @@ public void testSeekingStartInCompactedSegment() throws Exception { ig.context().cache().context().database().wakeupForCheckpoint("Forced checkpoint").get(); ig.context().cache().context().database().wakeupForCheckpoint("Forced checkpoint").get(); - File unzippedWalSegment = new File(ft.walArchive(), FileDescriptor.fileName(0)); - File walSegment = new File(ft.walArchive(), FileDescriptor.fileName(0) + ZIP_SUFFIX); + File unzippedWalSegment = ft.walArchiveSegment(0); + File walSegment = ft.zipWalArchiveSegment(0); // Allow compressor to compress WAL segments. assertTrue(GridTestUtils.waitForCondition(() -> !unzippedWalSegment.exists(), 15_000)); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java index 104ad12800028..89ce9010518f5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.cache.persistence.pagemem; -import java.io.File; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; @@ -203,11 +202,6 @@ public class NoOpWALManager implements IgniteWriteAheadLogManager { // No-op. } - /** {@inheritDoc} */ - @Override public @Nullable File compactedSegment(long idx) { - return null; - } - /** {@inheritDoc} */ @Override public void awaitCompacted(long idx) { // No-op. diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotTest.java index 415fc90589020..bed27b3b12410 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotTest.java @@ -240,7 +240,7 @@ public void testFailIfSegmentNotFound() throws Exception { long segIdx = wal.lastCompactedSegment(); - U.delete(wal.compactedSegment(segIdx)); + U.delete(srv.context().pdsFolderResolver().fileTree().zipWalArchiveSegment(segIdx)); assertThrowsWithCause( () -> srv.snapshot().createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT), diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotRestoreTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotRestoreTest.java index 9ba95490aebf8..47618fa7eed35 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotRestoreTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotRestoreTest.java @@ -64,6 +64,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; import org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.WalTestUtils; +import org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotVerifyException; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IncrementalSnapshotMetadata; @@ -367,7 +368,9 @@ public void testRecoveryOnClusterSnapshotIfNoWalsOnSingleNode() throws Exception restartWithCleanPersistence(); - File rm = new File(incrementalSnapshotWalDir(grid(1), SNP, 1), "0000000000000000.wal.zip"); + NodeFileTree ft = grid(1).context().pdsFolderResolver().fileTree(); + + File rm = new File(incrementalSnapshotWalDir(grid(1), SNP, 1), ft.zipWalArchiveSegment(0).getName()); assertTrue(U.delete(rm)); diff --git a/modules/dev-utils/src/test/java/org/apache/ignite/development/utils/IgniteWalConverterTest.java b/modules/dev-utils/src/test/java/org/apache/ignite/development/utils/IgniteWalConverterTest.java index ab13835ef9888..444e49ec7abc5 100644 --- a/modules/dev-utils/src/test/java/org/apache/ignite/development/utils/IgniteWalConverterTest.java +++ b/modules/dev-utils/src/test/java/org/apache/ignite/development/utils/IgniteWalConverterTest.java @@ -295,7 +295,7 @@ public void testIgniteWalConverterWithBrokenWal() throws Exception { final NodeFileTree ft = createWal(list, null); - final File wal = new File(ft.wal(), "0000000000000000.wal"); + final File wal = ft.walSegment(0); try (RandomAccessFile raf = new RandomAccessFile(wal, "rw")) { raf.seek(RecordV1Serializer.HEADER_RECORD_SIZE); // HeaderRecord @@ -412,7 +412,7 @@ public void testIgniteWalConverterWithUnreadableWal() throws Exception { final NodeFileTree ft = createWal(list, null); - final File wal = new File(ft.wal(), "0000000000000000.wal"); + final File wal = ft.walSegment(0); try (RandomAccessFile raf = new RandomAccessFile(wal, "rw")) { raf.seek(RecordV1Serializer.HEADER_RECORD_SIZE); // HeaderRecord