Skip to content

Commit

Permalink
IGNITE-24425 WAL files move to NodeFileTree (#11865)
Browse files Browse the repository at this point in the history
  • Loading branch information
nizhikov authored Feb 12, 2025
1 parent 8346282 commit 29f24fc
Show file tree
Hide file tree
Showing 31 changed files with 250 additions and 269 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.junit.Test;

import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_COMPACTED_FILTER;

/**
* Saves data using previous version of ignite and then load this data using actual version
*/
Expand Down Expand Up @@ -124,11 +126,7 @@ private void doTestStartupWithOldVersion(String ver) throws Exception {

NodeFileTree ft = ignite.context().pdsFolderResolver().fileTree();

File[] compressedSegments = ft.walArchive().listFiles(new FilenameFilter() {
@Override public boolean accept(File dir, String name) {
return name.endsWith(".wal.zip");
}
});
File[] compressedSegments = ft.walArchive().listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER);

final int actualCompressedWalSegments = compressedSegments == null ? 0 : compressedSegments.length;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.ignite.internal.pagemem.wal.record.CdcDataRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty;
Expand Down Expand Up @@ -250,10 +251,12 @@ private void checkDeleteLostSegmentLinks(List<Long> expBefore, List<Long> expAft

/** */
private void checkLinks(IgniteEx srv, List<Long> expLinks) {
File[] links = srv.context().pdsFolderResolver().fileTree().walCdc().listFiles(WAL_SEGMENT_FILE_FILTER);
NodeFileTree ft = srv.context().pdsFolderResolver().fileTree();

File[] links = ft.walCdc().listFiles(WAL_SEGMENT_FILE_FILTER);

assertEquals(expLinks.size(), links.length);
Arrays.stream(links).map(File::toPath).map(FileWriteAheadLogManager::segmentIndex)
Arrays.stream(links).map(File::toPath).map(ft::walSegmentIndex)
.allMatch(expLinks::contains);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree;
import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderResolver;
import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext;
Expand Down Expand Up @@ -88,7 +87,6 @@
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_MANAGER_STOP_RECORD;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER;
import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.segmentIndex;
import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.closeAllComponents;
import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.startAllComponents;
import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
Expand Down Expand Up @@ -489,9 +487,9 @@ public void consumeWalSegmentsUntilStopped() {
// Need unseen WAL segments only.
.filter(p -> WAL_SEGMENT_FILE_FILTER.accept(p.toFile()) && !seen.contains(p))
.peek(seen::add) // Adds to seen.
.sorted(Comparator.comparingLong(FileWriteAheadLogManager::segmentIndex)) // Sort by segment index.
.sorted(Comparator.comparingLong(ft::walSegmentIndex)) // Sort by segment index.
.peek(p -> {
long nextSgmnt = segmentIndex(p);
long nextSgmnt = ft.walSegmentIndex(p);

if (lastSgmnt.get() != -1 && nextSgmnt - lastSgmnt.get() != 1) {
throw new IgniteException("Found missed segments. Some events are missed. Exiting! " +
Expand Down Expand Up @@ -561,7 +559,7 @@ private boolean consumeSegment(Path segment) {
if (walState != null)
builder.from(walState.get1());

long segmentIdx = segmentIndex(segment);
long segmentIdx = ft.walSegmentIndex(segment);

lastSegmentConsumptionTs.value(System.currentTimeMillis());

Expand Down Expand Up @@ -807,7 +805,7 @@ private void updateCaches() {
* @return {@code True} if segment file was deleted, {@code false} otherwise.
*/
private boolean removeProcessedOnFailover(Path segment) {
long segmentIdx = segmentIndex(segment);
long segmentIdx = ft.walSegmentIndex(segment);

if (segmentIdx > walState.get1().index()) {
throw new IgniteException("Found segment greater then saved state. Some events are missed. Exiting! " +
Expand Down Expand Up @@ -854,7 +852,7 @@ private void saveStateAndRemoveProcessed(T2<WALPointer, Integer> curState) throw
Path processedSegment = rmvIter.next();

// Can't delete current segment, because state points to it.
if (segmentIndex(processedSegment) >= curState.get1().index())
if (ft.walSegmentIndex(processedSegment) >= curState.get1().index())
continue;

// WAL segment is a hard link to a segment file in a specifal Change Data Capture folder.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.internal.cdc.CdcFileLockHolder;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
import org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
Expand Down Expand Up @@ -94,24 +94,24 @@ protected CdcDeleteLostSegmentsJob(CdcDeleteLostSegmentLinksCommandArg arg, bool
if (!CU.isCdcEnabled(ignite.configuration()))
throw new IgniteException("CDC is not configured.");

File walCdcDir = ignite.context().pdsFolderResolver().fileTree().walCdc();
NodeFileTree ft = ignite.context().pdsFolderResolver().fileTree();

CdcFileLockHolder lock = new CdcFileLockHolder(walCdcDir.getAbsolutePath(), "Delete lost segments job", log);
CdcFileLockHolder lock = new CdcFileLockHolder(ft.walCdc().getAbsolutePath(), "Delete lost segments job", log);

try {
lock.tryLock(1);

try (Stream<Path> cdcFiles = Files.list(walCdcDir.toPath())) {
try (Stream<Path> cdcFiles = Files.list(ft.walCdc().toPath())) {
Set<File> delete = new HashSet<>();

AtomicLong lastSgmnt = new AtomicLong(-1);

cdcFiles
.filter(p -> WAL_SEGMENT_FILE_FILTER.accept(p.toFile()))
.sorted(Comparator.comparingLong(FileWriteAheadLogManager::segmentIndex)
.sorted(Comparator.comparingLong(ft::walSegmentIndex)
.reversed()) // Sort by segment index.
.forEach(path -> {
long idx = FileWriteAheadLogManager.segmentIndex(path);
long idx = ft.walSegmentIndex(path);

if (lastSgmnt.get() == -1 || lastSgmnt.get() - idx == 1) {
lastSgmnt.set(idx);
Expand Down Expand Up @@ -139,7 +139,7 @@ protected CdcDeleteLostSegmentsJob(CdcDeleteLostSegmentLinksCommandArg arg, bool
log.info("Segment CDC link deleted [file=" + file.getAbsolutePath() + ']');
});

Path stateDir = walCdcDir.toPath().resolve(STATE_DIR);
Path stateDir = ft.walCdc().toPath().resolve(STATE_DIR);

if (stateDir.toFile().exists()) {
File walState = stateDir.resolve(WAL_STATE_FILE_NAME).toFile();
Expand All @@ -157,7 +157,7 @@ protected CdcDeleteLostSegmentsJob(CdcDeleteLostSegmentLinksCommandArg arg, bool
catch (IgniteCheckedException e) {
throw new RuntimeException("Failed to delete lost segment CDC links. " +
"Unable to acquire lock to lock CDC folder. Make sure a CDC app is shut down " +
"[dir=" + walCdcDir.getAbsolutePath() + ", reason=" + e.getMessage() + ']');
"[dir=" + ft.walCdc().getAbsolutePath() + ", reason=" + e.getMessage() + ']');
}
finally {
U.closeQuiet(lock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Pattern;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
Expand All @@ -48,6 +47,9 @@
import org.apache.ignite.resources.LoggerResource;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_NAME_PATTERN;
import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_COMPACTED_PATTERN;

/**
* Performs WAL cleanup clusterwide.
*/
Expand All @@ -56,12 +58,6 @@ public class WalTask extends VisorMultiNodeTask<WalDeleteCommandArg, WalTaskResu
/** */
private static final long serialVersionUID = 0L;

/** Pattern for segment file names. */
private static final Pattern WAL_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal");

/** Pattern for compacted segment file names. */
private static final Pattern WAL_SEGMENT_FILE_COMPACTED_PATTERN = Pattern.compile("\\d{16}\\.wal\\.zip");

/** WAL archive file filter. */
private static final FileFilter WAL_ARCHIVE_FILE_FILTER = new FileFilter() {
@Override public boolean accept(File file) {
Expand Down Expand Up @@ -137,6 +133,9 @@ private static class WalJob extends VisorJob<WalDeleteCommandArg, Collection<Str
@LoggerResource
private transient IgniteLogger log;

/** Node file tree. */
private transient NodeFileTree ft;

/**
* @param arg WAL task argument.
* @param debug Debug flag.
Expand All @@ -149,6 +148,7 @@ public WalJob(WalDeleteCommandArg arg, boolean debug) {
@Nullable @Override protected Collection<String> run(@Nullable WalDeleteCommandArg arg) throws IgniteException {
try {
GridKernalContext cctx = ignite.context();
ft = ignite.context().pdsFolderResolver().fileTree();

GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)cctx.cache().context().database();
FileWriteAheadLogManager wal = (FileWriteAheadLogManager)cctx.cache().context().wal();
Expand Down Expand Up @@ -194,10 +194,10 @@ Collection<String> getUnusedWalSegments(
sortWalFiles(walFiles);

// Obtain index of last archived WAL segment, it will not be deleted.
long lastArchIdx = getIndex(walFiles[walFiles.length - 1]);
long lastArchIdx = ft.walSegmentIndex(walFiles[walFiles.length - 1].toPath());

for (File f : walFiles) {
long fileIdx = getIndex(f);
long fileIdx = ft.walSegmentIndex(f.toPath());

if (fileIdx < maxIdx && fileIdx < lastArchIdx)
res.add(f.getAbsolutePath());
Expand Down Expand Up @@ -239,7 +239,7 @@ Collection<String> deleteUnusedWalSegments(
Collection<String> res = new ArrayList<>(num);

for (File walFile: walFiles) {
if (getIndex(walFile) < maxIdx && num > 0)
if (ft.walSegmentIndex(walFile.toPath()) < maxIdx && num > 0)
res.add(walFile.getAbsolutePath());
else
break;
Expand Down Expand Up @@ -272,8 +272,6 @@ private int resolveMaxReservedIndex(FileWriteAheadLogManager wal, WALPointer low
* @throws IgniteCheckedException if failed.
*/
private File getWalArchiveDir() throws IgniteCheckedException {
NodeFileTree ft = ignite.context().pdsFolderResolver().fileTree();

if (!ft.walArchive().exists())
throw new IgniteCheckedException("WAL archive directory does not exists" + ft.walArchive().getAbsolutePath());

Expand All @@ -288,19 +286,9 @@ private File getWalArchiveDir() throws IgniteCheckedException {
private void sortWalFiles(File[] files) {
Arrays.sort(files, new Comparator<File>() {
@Override public int compare(File o1, File o2) {
return Long.compare(getIndex(o1), getIndex(o2));
return Long.compare(ft.walSegmentIndex(o1.toPath()), ft.walSegmentIndex(o2.toPath()));
}
});
}
}

/**
* Get index from WAL segment file.
*
* @param file WAL segment file.
* @return Index of WAL segment file.
*/
private static long getIndex(File file) {
return Long.parseLong(file.getName().substring(0, 16));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.ignite.internal.pagemem.wal;

import java.io.File;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.configuration.DataStorageConfiguration;
Expand Down Expand Up @@ -235,12 +234,6 @@ public WALIterator replay(
*/
void startAutoReleaseSegments();

/**
* @param idx Segment index.
* @return Compressed archive segment.
*/
@Nullable File compactedSegment(long idx);

/**
* Blocks current thread while segment with the {@code idx} not compressed.
* If segment compressed, already, returns immediately.
Expand Down
Loading

0 comments on commit 29f24fc

Please sign in to comment.