Skip to content

Commit

Permalink
IGNITE-20097 Fixed WAL logging to an archived segment after node rest…
Browse files Browse the repository at this point in the history
…art (#10887)
  • Loading branch information
NSAmelchev authored Aug 31, 2023
1 parent d43deea commit dbf1c78
Show file tree
Hide file tree
Showing 4 changed files with 292 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1455,16 +1455,25 @@ private FileWriteHandle rollOver(FileWriteHandle cur, @Nullable WALRecord rec) t
* @throws StorageException If failed to initialize WAL write handle.
*/
private FileWriteHandle restoreWriteHandle(@Nullable WALPointer lastReadPtr) throws StorageException {
long absIdx = lastReadPtr == null ? 0 : lastReadPtr.index();

@Nullable FileArchiver archiver0 = archiver;

long segNo = archiver0 == null ? absIdx : absIdx % dsCfg.getWalSegments();
long absIdx;
int off;

File curFile = new File(walWorkDir, fileName(segNo));
if (lastReadPtr == null) {
absIdx = 0;
off = 0;
}
else if (nextSegmentInited(lastReadPtr)) {
absIdx = lastReadPtr.index() + 1;
off = HEADER_RECORD_SIZE;
}
else {
absIdx = lastReadPtr.index();
off = lastReadPtr.fileOffset() + lastReadPtr.length();
}

int off = lastReadPtr == null ? 0 : lastReadPtr.fileOffset();
int len = lastReadPtr == null ? 0 : lastReadPtr.length();
File curFile = segmentFile(absIdx);

try {
SegmentIO fileIO = new SegmentIO(absIdx, ioFactory.create(curFile));
Expand Down Expand Up @@ -1494,7 +1503,7 @@ private FileWriteHandle restoreWriteHandle(@Nullable WALPointer lastReadPtr) thr
", offset=" + off + ", ver=" + serVer + ']');
}

FileWriteHandle hnd = fileHandleManager.initHandle(fileIO, off + len, ser);
FileWriteHandle hnd = fileHandleManager.initHandle(fileIO, off, ser);

segmentAware.curAbsWalIdx(absIdx);

Expand Down Expand Up @@ -1545,6 +1554,36 @@ private FileWriteHandle restoreWriteHandle(@Nullable WALPointer lastReadPtr) thr
}
}

/**
 * Resolves the file in the WAL work directory that corresponds to the given absolute segment index.
 * When no archiver is configured the absolute index is used as the segment number as is; otherwise
 * the segment number is the absolute index modulo the configured number of WAL segments.
 *
 * @param absIdx Absolute segment index.
 * @return Segment file located in {@code walWorkDir}.
 */
private File segmentFile(long absIdx) {
    long segNo;

    if (archiver == null)
        segNo = absIdx;
    else
        segNo = absIdx % dsCfg.getWalSegments();

    String segFileName = fileName(segNo);

    return new File(walWorkDir, segFileName);
}

/**
 * Checks whether the given pointer is the last record of its segment and the next segment already exists
 * with a readable header.
 * <p>
 * The check replays the WAL starting right after the given record: if anything can still be read, the pointer
 * is not the last one. Otherwise the file of the next segment is opened for reading and its header is read.
 * Any exception raised along the way is treated as "next segment is not initialized".
 *
 * @param ptr Pointer to check.
 * @return {@code True} if the given pointer is the last in a segment and a next segment has been initialized.
 */
private boolean nextSegmentInited(WALPointer ptr) {
    try {
        WALPointer afterPtr = new WALPointer(ptr.index(), ptr.fileOffset() + ptr.length(), 0);

        boolean hasMoreRecs;

        try (WALIterator it = replay(afterPtr)) {
            hasMoreRecs = it.hasNext();
        }

        // Something follows the pointer, so it is not the tail of the segment.
        if (hasMoreRecs)
            return false;

        long nextSegIdx = ptr.index() + 1;

        File nextSegFile = segmentFile(nextSegIdx);

        // Reading the header fails with an exception if the next segment is missing or not initialized.
        try (SegmentIO nextSegIo = new SegmentIO(nextSegIdx, ioFactory.create(nextSegFile, READ))) {
            readSegmentHeader(nextSegIo, segmentFileInputFactory);
        }

        return true;
    }
    catch (Exception ignored) {
        // Best-effort check: any failure means the next segment can not be used.
        return false;
    }
}

/**
* Fills the file header for a new segment. Calling this method signals we are done with the segment and it can be
* archived. If we don't have a prepared file yet and the archiver is busy, this method blocks.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public class RestartWithWalForceArchiveTimeoutTest extends GridCommonAbstractTes
@Parameterized.Parameter
public WALMode walMode;

/** */
private long walForceArchiveTimeout;

/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
Expand All @@ -49,7 +52,7 @@ public class RestartWithWalForceArchiveTimeoutTest extends GridCommonAbstractTes

cfg.setDataStorageConfiguration(new DataStorageConfiguration()
.setWalMode(walMode)
.setWalForceArchiveTimeout(60 * 60 * 1000) // 1 hour to make sure auto archive will not work.
.setWalForceArchiveTimeout(walForceArchiveTimeout)
.setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true)));

return cfg;
Expand All @@ -61,12 +64,17 @@ public static Collection<?> parameters() {
return EnumSet.of(WALMode.FSYNC, WALMode.LOG_ONLY, WALMode.BACKGROUND);
}

/** */
@Test
public void testRestart() throws Exception {
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
// Stop every node started by the test before touching its files.
// NOTE(review): the 'true' flag presumably requests cancellation of running jobs - confirm against stopAllGrids().
stopAllGrids(true);

// Remove persistence files so that the next test (or parameterized run) starts from a clean state.
cleanPersistenceDir();
}

/** */
@Test
public void testRestart() throws Exception {
walForceArchiveTimeout = 60 * 60 * 1000; // 1 hour to make sure auto archive will not work.

Supplier<IgniteEx> restart = () -> {
stopAllGrids(true);
Expand All @@ -92,4 +100,34 @@ public void testRestart() throws Exception {
for (int i = 0; i < 5; i++)
restart.get();
}

/**
 * Starts a node with a short WAL force archive timeout, writes an entry, waits longer than the timeout,
 * then restarts the node twice writing one more entry in between. Both entries must be present in the cache
 * after the last restart.
 *
 * @throws Exception If failed.
 */
@Test
public void testRestartAfterArchive() throws Exception {
    walForceArchiveTimeout = 1000;

    IgniteEx node = startGrid(0);

    node.cluster().state(ACTIVE);

    IgniteCache<Integer, Integer> initCache = node.getOrCreateCache(DEFAULT_CACHE_NAME);

    initCache.put(1, 1);

    forceCheckpoint();

    // Wait for twice the configured force archive timeout before stopping the node.
    Thread.sleep(2 * walForceArchiveTimeout);

    // First restart: write one more entry.
    stopGrid(0);

    IgniteCache<Integer, Integer> cacheAfterFirstRestart = startGrid(0).cache(DEFAULT_CACHE_NAME);

    cacheAfterFirstRestart.put(2, 2);

    // Second restart: both entries must be present.
    stopGrid(0);

    IgniteCache<Integer, Integer> cacheAfterSecondRestart = startGrid(0).cache(DEFAULT_CACHE_NAME);

    assertEquals(2, cacheAfterSecondRestart.size());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.cdc;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAware;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_WAL_ARCHIVE_PATH;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;

/**
 * Checks that WAL logging continues correctly when the current WAL segment is rolled over and archived
 * while the node is being stopped. See {@link #testWallRollover()} for the detailed scenario.
 */
@RunWith(Parameterized.class)
public class WalRolloverOnStopTest extends GridCommonAbstractTest {
    /** WAL mode. */
    @Parameterized.Parameter
    public WALMode walMode;

    /** @return Test parameters. */
    @Parameterized.Parameters(name = "walMode={0}")
    public static Collection<?> parameters() {
        return Arrays.asList(new Object[][] {{WALMode.BACKGROUND}, {WALMode.LOG_ONLY}, {WALMode.FSYNC}});
    }

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        return super.getConfiguration(igniteInstanceName)
            .setDataStorageConfiguration(new DataStorageConfiguration()
                // Apply the parameterized WAL mode, otherwise every parameterized run uses the default mode.
                .setWalMode(walMode)
                .setWalAutoArchiveAfterInactivity(1500L)
                .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
                    .setPersistenceEnabled(true)));
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        super.afterTest();

        stopAllGrids();

        cleanPersistenceDir();
    }

    /**
     * Test scenario:
     *
     * 0. {@link DataStorageConfiguration#getWalAutoArchiveAfterInactivity()} > 0.
     * 1. Node is gracefully stopping using {@link G#stop(String, boolean)}.
     * 2. T0: {@code Checkpointer#doCheckpoint()} executes the last checkpoint on stop and freezes.
     * 3. T1: Segment is rolled over after the inactivity timeout.
     * 4. T2: Segment is archived.
     *
     * After restart WAL should log in the next segment.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testWallRollover() throws Exception {
        for (int i = 0; i < 2; i++) {
            IgniteEx ign = startGrid(0);

            GridCacheDatabaseSharedManager db =
                (GridCacheDatabaseSharedManager)ign.context().cache().context().database();

            SegmentAware aware = GridTestUtils.getFieldValue(ign.context().cache().context().wal(), "segmentAware");

            ign.cluster().state(ClusterState.ACTIVE);

            IgniteCache<Integer, Integer> cache = ign.getOrCreateCache("my-cache");

            // Released by the async waiter below once the current segment is archived (or the wait fails).
            CountDownLatch waitAfterCp = new CountDownLatch(1);

            // Incremented by the checkpoint listener on node stop and by the async archive waiter.
            AtomicLong cntr = new AtomicLong(0);

            db.addCheckpointListener(new CheckpointListener() {
                @Override public void afterCheckpointEnd(Context ctx) {
                    // Only checkpoints performed while the node is stopping are of interest.
                    if (!ign.context().isStopping())
                        return;

                    try {
                        // Freeze the checkpointer until the current segment gets archived.
                        waitAfterCp.await(getTestTimeout(), TimeUnit.MILLISECONDS);

                        cntr.incrementAndGet();
                    }
                    catch (InterruptedException e) {
                        // Preserve the interruption status for the code up the stack.
                        Thread.currentThread().interrupt();

                        throw new IgniteException(e);
                    }
                }

                @Override public void onMarkCheckpointBegin(Context ctx) {
                    // No-op.
                }

                @Override public void onCheckpointBegin(Context ctx) {
                    // No-op.
                }

                @Override public void beforeCheckpointBegin(Context ctx) {
                    // No-op.
                }
            });

            int maxKey = (i + 1) * 3;

            for (int j = i * 3; j < maxKey; j++)
                cache.put(j, j);

            // Index of the segment the entries above were logged to.
            long curIdx = aware.curAbsWalIdx();

            IgniteInternalFuture<?> fut = runAsync(() -> {
                try {
                    aware.awaitSegmentArchived(curIdx);

                    cntr.incrementAndGet();
                }
                catch (IgniteInterruptedCheckedException e) {
                    throw new IgniteException(e);
                }
                finally {
                    // Always unfreeze the checkpointer, even if waiting for the archiving failed.
                    waitAfterCp.countDown();
                }
            });

            G.stop(ign.name(), false);

            fut.get(getTestTimeout());

            // Expected value is 3: the checkpoint listener is triggered twice on stop because of the segment
            // archiving, plus one increment from the async archive waiter.
            assertEquals("Should successfully wait for current segment archivation", 3, cntr.get());

            IgniteWalIteratorFactory.IteratorParametersBuilder builder =
                new IgniteWalIteratorFactory.IteratorParametersBuilder()
                    .log(ign.log())
                    .filesOrDirs(
                        U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_WAL_ARCHIVE_PATH, false))
                    .filter((type, ptr) -> type == DATA_RECORD_V2);

            Set<Integer> keys = new HashSet<>();

            // Collect keys of all data records found in the WAL archive.
            try (WALIterator it = new IgniteWalIteratorFactory().iterator(builder)) {
                while (it.hasNext()) {
                    IgniteBiTuple<WALPointer, WALRecord> tup = it.next();

                    DataRecord rec = (DataRecord)tup.get2();

                    for (DataEntry entry : rec.writeEntries())
                        keys.add(entry.key().value(null, false));
                }
            }

            // Every key written so far (including previous iterations) must be present in the archive.
            for (int j = 0; j < maxKey; j++)
                assertTrue("Key is missing in the WAL archive: " + j, keys.contains(j));
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.ignite.cdc.CdcSelfTest;
import org.apache.ignite.cdc.RestartWithWalForceArchiveTimeoutTest;
import org.apache.ignite.cdc.WalForCdcTest;
import org.apache.ignite.cdc.WalRolloverOnStopTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.HistoricalRebalanceCheckpointTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.HistoricalRebalanceHeuristicsTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.HistoricalRebalanceTwoPartsInDifferentCheckpointsTest;
Expand Down Expand Up @@ -155,6 +156,7 @@ public static void addRealPageStoreTests(List<Class<?>> suite, Collection<Class>
GridTestUtils.addTestIfNeeded(suite, CdcSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, CdcCacheVersionTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, RestartWithWalForceArchiveTimeoutTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, WalRolloverOnStopTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, WalForCdcTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, CdcCacheConfigOnRestartTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, CdcNonDefaultWorkDirTest.class, ignoredTests);
Expand Down

0 comments on commit dbf1c78

Please sign in to comment.