diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index f68b366fbb6ff..c1f04bd3baab7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -1455,16 +1455,25 @@ private FileWriteHandle rollOver(FileWriteHandle cur, @Nullable WALRecord rec) t * @throws StorageException If failed to initialize WAL write handle. */ private FileWriteHandle restoreWriteHandle(@Nullable WALPointer lastReadPtr) throws StorageException { - long absIdx = lastReadPtr == null ? 0 : lastReadPtr.index(); - @Nullable FileArchiver archiver0 = archiver; - long segNo = archiver0 == null ? absIdx : absIdx % dsCfg.getWalSegments(); + long absIdx; + int off; - File curFile = new File(walWorkDir, fileName(segNo)); + if (lastReadPtr == null) { + absIdx = 0; + off = 0; + } + else if (nextSegmentInited(lastReadPtr)) { + absIdx = lastReadPtr.index() + 1; + off = HEADER_RECORD_SIZE; + } + else { + absIdx = lastReadPtr.index(); + off = lastReadPtr.fileOffset() + lastReadPtr.length(); + } - int off = lastReadPtr == null ? 0 : lastReadPtr.fileOffset(); - int len = lastReadPtr == null ? 0 : lastReadPtr.length(); + File curFile = segmentFile(absIdx); try { SegmentIO fileIO = new SegmentIO(absIdx, ioFactory.create(curFile)); @@ -1494,7 +1503,7 @@ private FileWriteHandle restoreWriteHandle(@Nullable WALPointer lastReadPtr) thr ", offset=" + off + ", ver=" + serVer + ']'); } - FileWriteHandle hnd = fileHandleManager.initHandle(fileIO, off + len, ser); + FileWriteHandle hnd = fileHandleManager.initHandle(fileIO, off, ser); segmentAware.curAbsWalIdx(absIdx); @@ -1545,6 +1554,36 @@ private FileWriteHandle restoreWriteHandle(@Nullable WALPointer lastReadPtr) thr } } + /** */ + private File segmentFile(long absIdx) { + long segNo = archiver == null ? absIdx : absIdx % dsCfg.getWalSegments(); + + return new File(walWorkDir, fileName(segNo)); + } + + /** @return {@code True} if the given pointer is the last in a segment and a next segment has been initialized. */ + private boolean nextSegmentInited(WALPointer ptr) { + try { + try (WALIterator iter = replay(new WALPointer(ptr.index(), ptr.fileOffset() + ptr.length(), 0))) { + if (iter.hasNext()) + return false; + } + + long nextIdx = ptr.index() + 1; + + try (SegmentIO fileIO = new SegmentIO(nextIdx, ioFactory.create(segmentFile(nextIdx), READ))) { + readSegmentHeader(fileIO, segmentFileInputFactory); + } + + return true; + } + catch (Exception ignored) { + // No-op. + } + + return false; + } + /** * Fills the file header for a new segment. Calling this method signals we are done with the segment and it can be * archived. If we don't have prepared file yet and achiever is busy this method blocks. diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/RestartWithWalForceArchiveTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/RestartWithWalForceArchiveTimeoutTest.java index d28abb3e5b1f1..57cdee034dbe7 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/RestartWithWalForceArchiveTimeoutTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/RestartWithWalForceArchiveTimeoutTest.java @@ -41,6 +41,9 @@ public class RestartWithWalForceArchiveTimeoutTest extends GridCommonAbstractTes @Parameterized.Parameter public WALMode walMode; + /** */ + private long walForceArchiveTimeout; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); @@ -49,7 +52,7 @@ public class RestartWithWalForceArchiveTimeoutTest extends GridCommonAbstractTes cfg.setDataStorageConfiguration(new DataStorageConfiguration() .setWalMode(walMode) - .setWalForceArchiveTimeout(60 * 60 * 1000) // 1 hour to make sure auto archive will not work. + .setWalForceArchiveTimeout(walForceArchiveTimeout) .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true))); return cfg; @@ -61,12 +64,17 @@ public static Collection parameters() { return EnumSet.of(WALMode.FSYNC, WALMode.LOG_ONLY, WALMode.BACKGROUND); } - /** */ - @Test - public void testRestart() throws Exception { + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { stopAllGrids(true); cleanPersistenceDir(); + } + + /** */ + @Test + public void testRestart() throws Exception { + walForceArchiveTimeout = 60 * 60 * 1000; // 1 hour to make sure auto archive will not work. Supplier restart = () -> { stopAllGrids(true); @@ -92,4 +100,34 @@ public void testRestart() throws Exception { for (int i = 0; i < 5; i++) restart.get(); } + + /** */ + @Test + public void testRestartAfterArchive() throws Exception { + walForceArchiveTimeout = 1000; + + IgniteEx srv = startGrid(0); + + srv.cluster().state(ACTIVE); + + IgniteCache cache = srv.getOrCreateCache(DEFAULT_CACHE_NAME); + + cache.put(1, 1); + + forceCheckpoint(); + + Thread.sleep(2 * walForceArchiveTimeout); + + stopGrid(0); + srv = startGrid(0); + cache = srv.cache(DEFAULT_CACHE_NAME); + + cache.put(2, 2); + + stopGrid(0); + srv = startGrid(0); + cache = srv.cache(DEFAULT_CACHE_NAME); + + assertEquals(2, cache.size()); + } } diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/WalRolloverOnStopTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/WalRolloverOnStopTest.java new file mode 100644 index 0000000000000..2e21eb86f0aea --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cdc/WalRolloverOnStopTest.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cdc; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.pagemem.wal.WALIterator; +import org.apache.ignite.internal.pagemem.wal.record.DataEntry; +import org.apache.ignite.internal.pagemem.wal.record.DataRecord; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener; +import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; +import org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAware; +import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_WAL_ARCHIVE_PATH; +import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2; +import static org.apache.ignite.testframework.GridTestUtils.runAsync; + +/** + * This tests check that the following scenario will works correctly. + */ +@RunWith(Parameterized.class) +public class WalRolloverOnStopTest extends GridCommonAbstractTest { + /** WAL mode. */ + @Parameterized.Parameter + public WALMode walMode; + + /** @return Test parameters. */ + @Parameterized.Parameters(name = "walMode={0}") + public static Collection parameters() { + return Arrays.asList(new Object[][] {{WALMode.BACKGROUND}, {WALMode.LOG_ONLY}, {WALMode.FSYNC}}); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName) + .setDataStorageConfiguration(new DataStorageConfiguration() + .setWalAutoArchiveAfterInactivity(1500L) + .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setPersistenceEnabled(true))); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** + * Test scenario: + * + * 0. {@link DataStorageConfiguration#getWalAutoArchiveAfterInactivity()} > 0. + * 1. Node is gracefully stopping using {@link G#stop(String, boolean)}. + * 2. T0: {@code Checkpointer#doCheckpoint()} execute last checkpoint on stop and freeze. + * 3. T1: Rollover segment after inactivity timeout. + * 4. T2: Archive segment. + * + * After restart WAL should log in the next segment. + * */ + @Test + public void testWallRollover() throws Exception { + AtomicLong curIdx = new AtomicLong(); + + for (int i = 0; i < 2; i++) { + IgniteEx ign = startGrid(0); + + GridCacheDatabaseSharedManager db = + (GridCacheDatabaseSharedManager)ign.context().cache().context().database(); + + SegmentAware aware = GridTestUtils.getFieldValue(ign.context().cache().context().wal(), "segmentAware"); + + ign.cluster().state(ClusterState.ACTIVE); + + IgniteCache cache = ign.getOrCreateCache("my-cache"); + + CountDownLatch waitAfterCp = new CountDownLatch(1); + AtomicLong cntr = new AtomicLong(0); + + db.addCheckpointListener(new CheckpointListener() { + @Override public void afterCheckpointEnd(Context ctx) { + if (!ign.context().isStopping()) + return; + + try { + waitAfterCp.await(getTestTimeout(), TimeUnit.MILLISECONDS); + + cntr.incrementAndGet(); + } + catch (InterruptedException e) { + throw new IgniteException(e); + } + } + + @Override public void onMarkCheckpointBegin(Context ctx) { + // No-op. + } + + @Override public void onCheckpointBegin(Context ctx) { + // No-op. + } + + @Override public void beforeCheckpointBegin(Context ctx) { + // No-op. + } + }); + + int maxKey = (i + 1) * 3; + + for (int j = i * 3; j < maxKey; j++) + cache.put(j, j); + + curIdx.set(aware.curAbsWalIdx()); + + IgniteInternalFuture fut = runAsync(() -> { + try { + aware.awaitSegmentArchived(curIdx.get()); + + cntr.incrementAndGet(); + } + catch (IgniteInterruptedCheckedException e) { + throw new IgniteException(e); + } + finally { + waitAfterCp.countDown(); + } + }); + + G.stop(ign.name(), false); + + fut.get(getTestTimeout()); + + // Checkpoint will happens two time because of segment archivation. + assertEquals("Should successfully wait for current segment archivation", 3, cntr.get()); + + IgniteWalIteratorFactory.IteratorParametersBuilder builder = + new IgniteWalIteratorFactory.IteratorParametersBuilder() + .log(ign.log()) + .filesOrDirs( + U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_WAL_ARCHIVE_PATH, false)) + .filter((type, ptr) -> type == DATA_RECORD_V2); + + Set keys = new HashSet<>(); + + try (WALIterator it = new IgniteWalIteratorFactory().iterator(builder)) { + while (it.hasNext()) { + IgniteBiTuple tup = it.next(); + + DataRecord rec = (DataRecord)tup.get2(); + + for (DataEntry entry : rec.writeEntries()) + keys.add(entry.key().value(null, false)); + } + } + + for (int j = 0; j < maxKey; j++) + assertTrue(keys.contains(j)); + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java index 8b39f3e54ed90..2d3e06fffd475 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java @@ -26,6 +26,7 @@ import org.apache.ignite.cdc.CdcSelfTest; import org.apache.ignite.cdc.RestartWithWalForceArchiveTimeoutTest; import org.apache.ignite.cdc.WalForCdcTest; +import org.apache.ignite.cdc.WalRolloverOnStopTest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.HistoricalRebalanceCheckpointTest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.HistoricalRebalanceHeuristicsTest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.HistoricalRebalanceTwoPartsInDifferentCheckpointsTest; @@ -155,6 +156,7 @@ public static void addRealPageStoreTests(List> suite, Collection GridTestUtils.addTestIfNeeded(suite, CdcSelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, CdcCacheVersionTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, RestartWithWalForceArchiveTimeoutTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, WalRolloverOnStopTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, WalForCdcTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, CdcCacheConfigOnRestartTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, CdcNonDefaultWorkDirTest.class, ignoredTests);