From 45fb14a118889110298e49139fbb96e6ccd6ab74 Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Tue, 6 Jun 2023 12:45:17 +0300 Subject: [PATCH 01/22] Fixed CDC data records logging to WAL for in-memory caches when disabled. --- .../cache/persistence/wal/FileWriteAheadLogManager.java | 8 ++++++++ .../src/test/java/org/apache/ignite/cdc/CdcSelfTest.java | 9 ++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index f68b366fbb6ff..c7cd75b46d77b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -122,6 +122,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteBiPredicate; @@ -924,6 +925,13 @@ private boolean checkTimeout(AtomicLong lastEvt, long timeout) { !(rec instanceof PageDeltaRecord || rec instanceof PageSnapshot || rec instanceof MemoryRecoveryRecord)) return null; + if (inMemoryCdc && cdcDisabled.getOrDefault(false)) { + LT.warn(log, "Logging CDC data records to WAL skipped. '" + CDC_DISABLED + + "' distributed property is 'true'."); + + return null; + } + FileWriteHandle currWrHandle = currentHandle(); WALDisableContext isDisable = walDisableContext; diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java index b836e6facd6ea..04c584806f603 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java @@ -802,9 +802,13 @@ public void testDisable() throws Exception { addData(cache, 0, 1); - File walCdcDir = U.field(ign.context().cache().context().wal(true), "walCdcDir"); + FileWriteAheadLogManager wal = (FileWriteAheadLogManager)ign.context().cache().context().wal(true); + + File walCdcDir = wal.walCdcDirectory(); + File archiveDir = wal.archiveDir(); assertTrue(waitForCondition(() -> 1 == walCdcDir.list().length, 2 * WAL_ARCHIVE_TIMEOUT)); + assertEquals(1, archiveDir.listFiles().length); DistributedChangeableProperty disabled = ign.context().distributedConfiguration() .property(FileWriteAheadLogManager.CDC_DISABLED); @@ -816,12 +820,15 @@ public void testDisable() throws Exception { Thread.sleep(2 * WAL_ARCHIVE_TIMEOUT); assertEquals(1, walCdcDir.list().length); + // In-memory CDC should not produce WAL records when disabled. + assertEquals(persistenceEnabled ? 2 : 1, archiveDir.list().length); disabled.propagate(false); addData(cache, 0, 1); assertTrue(waitForCondition(() -> 2 == walCdcDir.list().length, 2 * WAL_ARCHIVE_TIMEOUT)); + assertEquals(persistenceEnabled ? 3 : 2, archiveDir.listFiles().length); } /** */ From 68cbebed28ad944ca563484bb399d683109b6730 Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Thu, 15 Jun 2023 12:00:44 +0300 Subject: [PATCH 02/22] Fixed CDC data records logging to WAL for in-memory caches when disabled. --- .../wal/FileWriteAheadLogManager.java | 95 +++++++++++++------ .../org/apache/ignite/cdc/CdcSelfTest.java | 22 ++++- 2 files changed, 87 insertions(+), 30 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index c7cd75b46d77b..90717b7ddb46f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -112,6 +112,7 @@ import org.apache.ignite.internal.processors.failure.FailureProcessor; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; +import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -421,6 +422,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** CDC disabled flag. */ private final DistributedBooleanProperty cdcDisabled = detachedBooleanProperty(CDC_DISABLED); + /** Segment IDs to skip the CDC hard link creation. */ + private final Set cdcForceSkipSgmnts = new GridConcurrentHashSet<>(1); + /** * Constructor. * @@ -518,6 +522,18 @@ public void setFileIOFactory(FileIOFactory ioFactory) { if (newVal != null && newVal) log.warning("CDC was disabled."); + + if (oldVal != null && oldVal && !newVal) { + if (log.isInfoEnabled()) + log.info("CDC was enabled."); + + if (inMemoryCdc) { + FileWriteHandle handle = currentHandle(); + + if (handle != null) + closeBufAndRollover(handle); + } + } }); dispatcher.registerProperty(cdcDisabled); @@ -887,14 +903,7 @@ else if (!checkTimeout(lastRecordLoggedMs, walAutoArchiveAfterInactivity)) final FileWriteHandle handle = currentHandle(); - try { - closeBufAndRollover(handle, null, RolloverType.NONE); - } - catch (IgniteCheckedException e) { - U.error(log, "Unable to perform segment rollover: " + e.getMessage(), e); - - cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, e)); - } + closeBufAndRollover(handle); } /** */ @@ -925,13 +934,6 @@ private boolean checkTimeout(AtomicLong lastEvt, long timeout) { !(rec instanceof PageDeltaRecord || rec instanceof PageSnapshot || rec instanceof MemoryRecoveryRecord)) return null; - if (inMemoryCdc && cdcDisabled.getOrDefault(false)) { - LT.warn(log, "Logging CDC data records to WAL skipped. '" + CDC_DISABLED + - "' distributed property is 'true'."); - - return null; - } - FileWriteHandle currWrHandle = currentHandle(); WALDisableContext isDisable = walDisableContext; @@ -940,6 +942,15 @@ private boolean checkTimeout(AtomicLong lastEvt, long timeout) { if (currWrHandle == null || (isDisable != null && isDisable.check())) return null; + if (inMemoryCdc && cdcDisabled.getOrDefault(false)) { + LT.warn(log, "Logging CDC data records to WAL skipped. '" + CDC_DISABLED + + "' distributed property is 'true'."); + + cdcForceSkipSgmnts.add(currWrHandle.getSegmentId()); + + return null; + } + // Do page snapshots compression if configured. if (pageCompression != DiskPageCompression.DISABLED && rec instanceof PageSnapshot) { PageSnapshot pageSnapshot = (PageSnapshot)rec; @@ -1015,6 +1026,18 @@ else if (rolloverType == RolloverType.CURRENT_SEGMENT) { } } + /** */ + private void closeBufAndRollover(FileWriteHandle currWriteHandle) { + try { + closeBufAndRollover(currWriteHandle, null, RolloverType.NONE); + } + catch (IgniteCheckedException e) { + U.error(log, "Unable to perform segment rollover: " + e.getMessage(), e); + + cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, e)); + } + } + /** */ private FileWriteHandle closeBufAndRollover( FileWriteHandle currWriteHandle, @@ -2157,20 +2180,8 @@ public SegmentArchiveResult archiveSegment(long absIdx) throws StorageException Files.move(dstTmpFile.toPath(), dstFile.toPath()); - if (walCdcDir != null) { - if (!cdcDisabled.getOrDefault(false)) { - if (checkCdcWalDirectorySize(dstFile.length())) - Files.createLink(walCdcDir.toPath().resolve(dstFile.getName()), dstFile.toPath()); - else { - log.error("Creation of segment CDC link skipped. Configured CDC directory " + - "maximum size exceeded."); - } - } - else { - log.warning("Creation of segment CDC link skipped. " + - "'" + CDC_DISABLED + "' distributed property is 'true'."); - } - } + if (walCdcDir != null) + createCdcLink(dstFile, absIdx); if (mode != WALMode.NONE) { try (FileIO f0 = ioFactory.create(dstFile, CREATE, READ, WRITE)) { @@ -2237,6 +2248,32 @@ public void restart() { new IgniteThread(archiver).start(); } + /** Create CDC hard link to the archived WAL segment. */ + private void createCdcLink(File dstFile, long absIdx) throws IOException { + if (cdcForceSkipSgmnts.remove(absIdx)) { + log.warning("Creation of segment CDC link skipped. The segment does not contain data records " + + "that was skipped while the CDC was disabled."); + + return; + } + + if (cdcDisabled.getOrDefault(false)) { + log.warning("Creation of segment CDC link skipped. " + + "'" + CDC_DISABLED + "' distributed property is 'true'."); + + return; + } + + if (!checkCdcWalDirectorySize(dstFile.length())) { + log.error("Creation of segment CDC link skipped. Configured CDC directory " + + "maximum size exceeded."); + + return; + } + + Files.createLink(walCdcDir.toPath().resolve(dstFile.getName()), dstFile.toPath()); + } + /** * @param len Length of file to check size. * @return {@code True} if the CDC directory size check successful, otherwise {@code false}. diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java index 04c584806f603..a13b75ec61022 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java @@ -38,6 +38,7 @@ import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.binary.BinaryType; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; @@ -82,6 +83,7 @@ import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_WAL_ARCHIVE_PATH; import static org.apache.ignite.internal.processors.cache.GridCacheUtils.cacheId; import static org.apache.ignite.testframework.GridTestUtils.assertThrows; +import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause; import static org.apache.ignite.testframework.GridTestUtils.runAsync; import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; import static org.junit.Assume.assumeTrue; @@ -828,7 +830,25 @@ public void testDisable() throws Exception { addData(cache, 0, 1); assertTrue(waitForCondition(() -> 2 == walCdcDir.list().length, 2 * WAL_ARCHIVE_TIMEOUT)); - assertEquals(persistenceEnabled ? 3 : 2, archiveDir.listFiles().length); + assertEquals(3, archiveDir.listFiles().length); + + UserCdcConsumer cnsmr = new UserCdcConsumer(); + + IgniteInternalFuture cdcFut = runAsync(createCdc(cnsmr, getConfiguration(ign.name()))); + + waitForSize(1, DEFAULT_CACHE_NAME, UPDATE, cnsmr); + + // Cdc application must fail due to skipped data. + assertThrowsAnyCause(log, cdcFut::get, IgniteException.class, "Found missed segments. Some events are missed."); + + ign.compute().execute(VisorCdcDeleteLostSegmentsTask.class, new VisorTaskArgument<>(ign.localNode().id(), false)); + + cdcFut = runAsync(createCdc(cnsmr, getConfiguration(ign.name()))); + cnsmr.data.clear(); + + waitForSize(1, DEFAULT_CACHE_NAME, UPDATE, cnsmr); + + cdcFut.cancel(); } /** */ From 244398e40533f6e6c742dba00a5b9c54745e9de9 Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Thu, 15 Jun 2023 17:11:43 +0300 Subject: [PATCH 03/22] Tests fix --- .../apache/ignite/util/CdcCommandTest.java | 50 ++++++++++++++++--- .../wal/FileWriteAheadLogManager.java | 8 +-- 2 files changed, 47 insertions(+), 11 deletions(-) diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java index fb7835d2abca9..d45638280d338 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java @@ -29,10 +29,12 @@ import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cdc.AbstractCdcTest.UserCdcConsumer; import org.apache.ignite.cdc.CdcConfiguration; +import org.apache.ignite.cluster.ClusterState; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.WalSegmentArchivedEvent; import org.apache.ignite.internal.GridJobExecuteRequest; import org.apache.ignite.internal.GridJobExecuteResponse; import org.apache.ignite.internal.IgniteEx; @@ -56,7 +58,11 @@ import org.apache.ignite.plugin.PluginContext; import org.apache.ignite.testframework.GridTestUtils; import org.jetbrains.annotations.Nullable; +import org.junit.Assume; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + import static org.apache.ignite.cdc.AbstractCdcTest.ChangeEventType.UPDATE; import static org.apache.ignite.cdc.AbstractCdcTest.KEYS_CNT; import static org.apache.ignite.cdc.CdcSelfTest.addData; @@ -74,6 +80,7 @@ /** * CDC command tests. */ +@RunWith(Parameterized.class) public class CdcCommandTest extends GridCommandHandlerAbstractTest { /** */ private static final String CDC_DISABLED_DATA_REGION = "cdc_disabled_data_region"; @@ -99,6 +106,16 @@ public class CdcCommandTest extends GridCommandHandlerAbstractTest { /** */ private volatile IgniteThrowableConsumer onLogLsnr; + /** */ + @Parameterized.Parameter + public boolean persistenceEnabled; + + /** */ + @Parameterized.Parameters(name = "persistence={0}") + public static Object[] parameters() { + return new Object[] {true, false}; + } + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); @@ -112,7 +129,8 @@ public class CdcCommandTest extends GridCommandHandlerAbstractTest { .setName(CDC_DISABLED_DATA_REGION) .setCdcEnabled(false)) .setDefaultDataRegionConfiguration(new DataRegionConfiguration() - .setCdcEnabled(true))); + .setCdcEnabled(true) + .setPersistenceEnabled(persistenceEnabled))); cfg.setIncludeEventTypes(EVT_WAL_SEGMENT_ARCHIVED); @@ -148,6 +166,9 @@ public class CdcCommandTest extends GridCommandHandlerAbstractTest { srv0 = startGrid(0); srv1 = startGrid(1); + if (persistenceEnabled) + srv0.cluster().state(ClusterState.ACTIVE); + awaitPartitionMapExchange(); cdcDisabled = srv0.context().distributedConfiguration().property(FileWriteAheadLogManager.CDC_DISABLED); @@ -225,6 +246,14 @@ public void testDeleteLostSegmentLinksOneNode() throws Exception { /** */ @Test public void testDeleteLostSegmentLinksMultipleGaps() throws Exception { + checkDeleteLostSegmentLinks(F.asList(0L, 2L, 4L), F.asList(4L), true); + } + + /** */ + @Test + public void testDeleteLostSegmentLinksMultipleSegmentGaps() throws Exception { + Assume.assumeTrue(persistenceEnabled); + checkDeleteLostSegmentLinks(F.asList(0L, 3L, 5L), F.asList(5L), true); } @@ -260,25 +289,27 @@ private void archiveSegmentLinks(List idxs) throws Exception { for (long idx = 0; idx <= idxs.stream().mapToLong(v -> v).max().getAsLong(); idx++) { cdcDisabled.propagate(!idxs.contains(idx)); - archiveSegment(); + archiveSegment(idx); } } /** */ - private void archiveSegment() throws Exception { + private void archiveSegment(long expIdx) throws Exception { CountDownLatch latch = new CountDownLatch(G.allGrids().size()); for (Ignite srv : G.allGrids()) { srv.events().localListen(evt -> { - latch.countDown(); + if (expIdx == ((WalSegmentArchivedEvent)evt).getAbsWalSegmentIdx()) + latch.countDown(); - return false; + return true; }, EVT_WAL_SEGMENT_ARCHIVED); } addData(srv1.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT); - latch.await(getTestTimeout(), TimeUnit.MILLISECONDS); + if (persistenceEnabled || !(Boolean)cdcDisabled.getOrDefault(false)) + latch.await(getTestTimeout(), TimeUnit.MILLISECONDS); } /** */ @@ -384,7 +415,12 @@ public void testResendCancelOnRebalanceInProgress() throws Exception { }); } - GridTestUtils.runAsync(() -> startGrid(3)); + GridTestUtils.runAsync(() -> { + startGrid(3); + + if (persistenceEnabled) + srv0.cluster().setBaselineTopology(srv0.cluster().forServers().nodes()); + }); rebalanceStarted.await(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 90717b7ddb46f..dcfd47b0ed8c2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -943,7 +943,7 @@ private boolean checkTimeout(AtomicLong lastEvt, long timeout) { return null; if (inMemoryCdc && cdcDisabled.getOrDefault(false)) { - LT.warn(log, "Logging CDC data records to WAL skipped. '" + CDC_DISABLED + + LT.warn(log, "Logging CDC data records to WAL skipped. The '" + CDC_DISABLED + "' distributed property is 'true'."); cdcForceSkipSgmnts.add(currWrHandle.getSegmentId()); @@ -2252,21 +2252,21 @@ public void restart() { private void createCdcLink(File dstFile, long absIdx) throws IOException { if (cdcForceSkipSgmnts.remove(absIdx)) { log.warning("Creation of segment CDC link skipped. The segment does not contain data records " + - "that was skipped while the CDC was disabled."); + "that was skipped while the CDC was disabled [idx=" + absIdx + ']'); return; } if (cdcDisabled.getOrDefault(false)) { log.warning("Creation of segment CDC link skipped. " + - "'" + CDC_DISABLED + "' distributed property is 'true'."); + "The '" + CDC_DISABLED + "' distributed property is 'true' [idx=" + absIdx + ']'); return; } if (!checkCdcWalDirectorySize(dstFile.length())) { log.error("Creation of segment CDC link skipped. Configured CDC directory " + - "maximum size exceeded."); + "maximum size exceeded [idx=" + absIdx + ']'); return; } From 89bd1517766361d0484b0c5552b6a0b26076c083 Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Wed, 28 Jun 2023 22:17:50 +0300 Subject: [PATCH 04/22] wip --- .../apache/ignite/internal/cdc/CdcMain.java | 3 +- .../internal/cdc/WalRecordsConsumer.java | 5 + .../pagemem/wal/record/CdcDisabledRecord.java | 28 +++ .../pagemem/wal/record/WALRecord.java | 5 +- .../wal/FileWriteAheadLogManager.java | 108 +++++------ .../serializer/RecordDataV1Serializer.java | 8 + .../cdc/VisorCdcDeleteLostSegmentsTask.java | 180 +++++++++++++----- .../org/apache/ignite/cdc/CdcSelfTest.java | 4 +- 8 files changed, 232 insertions(+), 109 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CdcDisabledRecord.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java index 1e73e75be2204..9e45f59f109ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java @@ -79,6 +79,7 @@ import static org.apache.ignite.internal.IgnitionEx.initializeDefaultMBeanServer; import static org.apache.ignite.internal.binary.BinaryUtils.METADATA_FILE_SUFFIX; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DATA_RECORD; +import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DISABLED; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2; import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER; import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.segmentIndex; @@ -486,7 +487,7 @@ private void consumeSegment(Path segment) { .marshallerMappingFileStoreDir(marshaller) .keepBinary(cdcCfg.isKeepBinary()) .filesOrDirs(segment.toFile()) - .addFilter((type, ptr) -> type == DATA_RECORD_V2 || type == CDC_DATA_RECORD); + .addFilter((type, ptr) -> type == DATA_RECORD_V2 || type == CDC_DATA_RECORD || type == CDC_DISABLED); if (igniteCfg.getDataStorageConfiguration().getPageSize() != 0) builder.pageSize(igniteCfg.getDataStorageConfiguration().getPageSize()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java index 8a00403164491..b52af66e4cfb5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java @@ -303,6 +303,11 @@ private void advance() { curRec = walIter.next(); + if (curRec.get2().type() == WALRecord.RecordType.CDC_DISABLED) { + throw new IgniteException("Found the CDC disabled record. Some events are missed. Exiting! " + + "[state=" + curRec.get1() + ']'); + } + next = ((DataRecord)curRec.get2()).get(entryIdx); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CdcDisabledRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CdcDisabledRecord.java new file mode 100644 index 0000000000000..34c9ef1ad0d01 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CdcDisabledRecord.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagemem.wal.record; + +/** + * The record to notify the CDC application about skipped data records while the CDC was disabled. + */ +public class CdcDisabledRecord extends WALRecord { + /** {@inheritDoc} */ + @Override public RecordType type() { + return RecordType.CDC_DISABLED; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java index 3457251e0f44b..6836c80c421d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java @@ -291,7 +291,10 @@ public enum RecordType { INCREMENTAL_SNAPSHOT_FINISH_RECORD(77, LOGICAL), /** CDC data record. */ - CDC_DATA_RECORD(78, CUSTOM); + CDC_DATA_RECORD(78, CUSTOM), + + /** CDC disabled record. */ + CDC_DISABLED(79, CUSTOM); /** Index for serialization. Should be consistent throughout all versions. */ private final int idx; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index dcfd47b0ed8c2..ede5e3003c374 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -47,6 +47,7 @@ import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongArray; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -72,6 +73,7 @@ import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.pagemem.wal.WALIterator; +import org.apache.ignite.internal.pagemem.wal.record.CdcDisabledRecord; import org.apache.ignite.internal.pagemem.wal.record.DataRecord; import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord; import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord; @@ -112,7 +114,6 @@ import org.apache.ignite.internal.processors.failure.FailureProcessor; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; -import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -422,8 +423,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** CDC disabled flag. */ private final DistributedBooleanProperty cdcDisabled = detachedBooleanProperty(CDC_DISABLED); - /** Segment IDs to skip the CDC hard link creation. */ - private final Set cdcForceSkipSgmnts = new GridConcurrentHashSet<>(1); + /** */ + private final AtomicBoolean cdcDisabledRecLogged = new AtomicBoolean(); /** * Constructor. @@ -527,12 +528,7 @@ public void setFileIOFactory(FileIOFactory ioFactory) { if (log.isInfoEnabled()) log.info("CDC was enabled."); - if (inMemoryCdc) { - FileWriteHandle handle = currentHandle(); - - if (handle != null) - closeBufAndRollover(handle); - } + cdcDisabledRecLogged.set(false); } }); @@ -903,7 +899,14 @@ else if (!checkTimeout(lastRecordLoggedMs, walAutoArchiveAfterInactivity)) final FileWriteHandle handle = currentHandle(); - closeBufAndRollover(handle); + try { + closeBufAndRollover(handle, null, RolloverType.NONE); + } + catch (IgniteCheckedException e) { + U.error(log, "Unable to perform segment rollover: " + e.getMessage(), e); + + cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, e)); + } } /** */ @@ -934,6 +937,9 @@ private boolean checkTimeout(AtomicLong lastEvt, long timeout) { !(rec instanceof PageDeltaRecord || rec instanceof PageSnapshot || rec instanceof MemoryRecoveryRecord)) return null; + if (skipIfCdcDisabled(rec)) + return null; + FileWriteHandle currWrHandle = currentHandle(); WALDisableContext isDisable = walDisableContext; @@ -942,15 +948,6 @@ private boolean checkTimeout(AtomicLong lastEvt, long timeout) { if (currWrHandle == null || (isDisable != null && isDisable.check())) return null; - if (inMemoryCdc && cdcDisabled.getOrDefault(false)) { - LT.warn(log, "Logging CDC data records to WAL skipped. The '" + CDC_DISABLED + - "' distributed property is 'true'."); - - cdcForceSkipSgmnts.add(currWrHandle.getSegmentId()); - - return null; - } - // Do page snapshots compression if configured. if (pageCompression != DiskPageCompression.DISABLED && rec instanceof PageSnapshot) { PageSnapshot pageSnapshot = (PageSnapshot)rec; @@ -1026,18 +1023,6 @@ else if (rolloverType == RolloverType.CURRENT_SEGMENT) { } } - /** */ - private void closeBufAndRollover(FileWriteHandle currWriteHandle) { - try { - closeBufAndRollover(currWriteHandle, null, RolloverType.NONE); - } - catch (IgniteCheckedException e) { - U.error(log, "Unable to perform segment rollover: " + e.getMessage(), e); - - cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, e)); - } - } - /** */ private FileWriteHandle closeBufAndRollover( FileWriteHandle currWriteHandle, @@ -1849,6 +1834,20 @@ public long maxWalSegmentSize() { return maxWalSegmentSize; } + /** @return {@code True} if log record should be skipped when CDC is disabled. */ + private boolean skipIfCdcDisabled(WALRecord rec) throws IgniteCheckedException { + if (!inMemoryCdc || rec instanceof CdcDisabledRecord || !cdcDisabled.getOrDefault(false)) + return false; + + LT.warn(log, "Logging CDC data records to WAL skipped. The '" + CDC_DISABLED + + "' distributed property is 'true'."); + + if (cdcDisabledRecLogged.compareAndSet(false, true)) + log(new CdcDisabledRecord()); + + return true; + } + /** * File archiver operates on absolute segment indexes. For any given absolute segment index N we can calculate the * work WAL segment: S(N) = N % dsCfg.walSegments. When a work segment is finished, it is given to the archiver. If @@ -2180,8 +2179,20 @@ public SegmentArchiveResult archiveSegment(long absIdx) throws StorageException Files.move(dstTmpFile.toPath(), dstFile.toPath()); - if (walCdcDir != null) - createCdcLink(dstFile, absIdx); + if (walCdcDir != null) { + if (!cdcDisabled.getOrDefault(false)) { + if (checkCdcWalDirectorySize(dstFile.length())) + Files.createLink(walCdcDir.toPath().resolve(dstFile.getName()), dstFile.toPath()); + else { + log.error("Creation of segment CDC link skipped. Configured CDC directory " + + "maximum size exceeded."); + } + } + else { + log.warning("Creation of segment CDC link skipped. " + + "'" + CDC_DISABLED + "' distributed property is 'true'."); + } + } if (mode != WALMode.NONE) { try (FileIO f0 = ioFactory.create(dstFile, CREATE, READ, WRITE)) { @@ -2248,32 +2259,6 @@ public void restart() { new IgniteThread(archiver).start(); } - /** Create CDC hard link to the archived WAL segment. */ - private void createCdcLink(File dstFile, long absIdx) throws IOException { - if (cdcForceSkipSgmnts.remove(absIdx)) { - log.warning("Creation of segment CDC link skipped. The segment does not contain data records " + - "that was skipped while the CDC was disabled [idx=" + absIdx + ']'); - - return; - } - - if (cdcDisabled.getOrDefault(false)) { - log.warning("Creation of segment CDC link skipped. " + - "The '" + CDC_DISABLED + "' distributed property is 'true' [idx=" + absIdx + ']'); - - return; - } - - if (!checkCdcWalDirectorySize(dstFile.length())) { - log.error("Creation of segment CDC link skipped. Configured CDC directory " + - "maximum size exceeded [idx=" + absIdx + ']'); - - return; - } - - Files.createLink(walCdcDir.toPath().resolve(dstFile.getName()), dstFile.toPath()); - } - /** * @param len Length of file to check size. * @return {@code True} if the CDC directory size check successful, otherwise {@code false}. @@ -3396,6 +3381,11 @@ private boolean walArchiveUnlimited() { return maxWalArchiveSize == UNLIMITED_WAL_ARCHIVE; } + /** @return {@code True} if WAL enabled only for CDC. */ + public boolean inMemoryCdc() { + return inMemoryCdc; + } + /** * Removing files from {@link #walArchiveDir}. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java index 3907bf2c98fd4..eabbdec823520 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java @@ -37,6 +37,7 @@ import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted; import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.pagemem.wal.record.CacheState; +import org.apache.ignite.internal.pagemem.wal.record.CdcDisabledRecord; import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord; import org.apache.ignite.internal.pagemem.wal.record.DataEntry; import org.apache.ignite.internal.pagemem.wal.record.DataRecord; @@ -541,6 +542,7 @@ assert record instanceof PageSnapshot; return 4 + 8 + 1; case SWITCH_SEGMENT_RECORD: + case CDC_DISABLED: return 0; case TX_RECORD: @@ -1323,6 +1325,11 @@ WALRecord readPlainRecord(RecordType type, ByteBufferBackedDataInput in, break; + case CDC_DISABLED: + res = new CdcDisabledRecord(); + + break; + default: throw new UnsupportedOperationException("Type: " + type); } @@ -1912,6 +1919,7 @@ void writePlainRecord(WALRecord rec, ByteBuffer buf) throws IgniteCheckedExcepti break; case SWITCH_SEGMENT_RECORD: + case CDC_DISABLED: break; case MASTER_KEY_CHANGE_RECORD_V2: diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcDeleteLostSegmentsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcDeleteLostSegmentsTask.java index c3a8f506dc0f9..752afefa47aab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcDeleteLostSegmentsTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcDeleteLostSegmentsTask.java @@ -26,22 +26,32 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.stream.Stream; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.compute.ComputeJobResult; +import org.apache.ignite.internal.cdc.CdcConsumerState; import org.apache.ignite.internal.cdc.CdcFileLockHolder; import org.apache.ignite.internal.management.cdc.CdcDeleteLostSegmentLinksCommandArg; +import org.apache.ignite.internal.pagemem.wal.WALIterator; +import org.apache.ignite.internal.pagemem.wal.record.CdcDisabledRecord; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; +import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; +import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory; import org.apache.ignite.internal.processors.task.GridInternal; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.visor.VisorJob; import org.apache.ignite.internal.visor.VisorMultiNodeTask; import org.apache.ignite.resources.LoggerResource; import org.jetbrains.annotations.Nullable; + import static org.apache.ignite.internal.cdc.CdcConsumerState.WAL_STATE_FILE_NAME; import static org.apache.ignite.internal.cdc.CdcMain.STATE_DIR; +import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DISABLED; import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER; /** @@ -78,6 +88,12 @@ private static class VisorCdcDeleteLostSegmentsJob extends VisorJob cdcFiles = Files.list(walCdcDir.toPath())) { - Set delete = new HashSet<>(); + boolean lostDeleted = deleteLostSegments(); - AtomicLong lastSgmnt = new AtomicLong(-1); + WALPointer lastCdcDisabledRec = findLastCdcDisabledRecord(); - cdcFiles - .filter(p -> WAL_SEGMENT_FILE_FILTER.accept(p.toFile())) - .sorted(Comparator.comparingLong(FileWriteAheadLogManager::segmentIndex) - .reversed()) // Sort by segment index. - .forEach(path -> { - long idx = FileWriteAheadLogManager.segmentIndex(path); + if (lastCdcDisabledRec != null) + setWalState(lastCdcDisabledRec.next()); // Reset WAL state to the next record. + else if (lostDeleted) + setWalState(null); // Delete WAL state. + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to delete lost segment CDC links. " + + "Unable to acquire lock to lock CDC folder. Make sure a CDC app is shut down " + + "[dir=" + walCdcDir.getAbsolutePath() + ", reason=" + e.getMessage() + ']'); + } + finally { + U.closeQuiet(lock); + } - if (lastSgmnt.get() == -1 || lastSgmnt.get() - idx == 1) { - lastSgmnt.set(idx); + return null; + } - return; - } + /** @return {@code True} if lost segments were found and successfully deleted. */ + private boolean deleteLostSegments() { + Set delete = new HashSet<>(); - delete.add(path.toFile()); - }); + AtomicLong lastSgmnt = new AtomicLong(-1); - if (delete.isEmpty()) { - log.info("Lost segment CDC links were not found."); + consumeCdcSegments(segment -> { + long idx = FileWriteAheadLogManager.segmentIndex(segment); - return null; - } + if (lastSgmnt.get() == -1 || lastSgmnt.get() - idx == 1) { + lastSgmnt.set(idx); - log.info("Found lost segment CDC links. The following links will be deleted: " + delete); + return; + } - delete.forEach(file -> { - if (!file.delete()) { - throw new IgniteException("Failed to delete lost segment CDC link [file=" + - file.getAbsolutePath() + ']'); - } + delete.add(segment.toFile()); + }); - log.info("Segment CDC link deleted [file=" + file.getAbsolutePath() + ']'); - }); + if (delete.isEmpty()) { + log.info("Lost segment CDC links were not found."); - Path stateDir = walCdcDir.toPath().resolve(STATE_DIR); + return false; + } - if (stateDir.toFile().exists()) { - File walState = stateDir.resolve(WAL_STATE_FILE_NAME).toFile(); + log.info("Found lost segment CDC links. The following links will be deleted: " + delete); - if (walState.exists() && !walState.delete()) { - throw new IgniteException("Failed to delete wal state file [file=" + - walState.getAbsolutePath() + ']'); - } - } + delete.forEach(file -> { + if (!file.delete()) { + throw new IgniteException("Failed to delete lost segment CDC link [file=" + + file.getAbsolutePath() + ']'); } - catch (IOException e) { - throw new RuntimeException("Failed to delete lost segment CDC links.", e); + + log.info("Segment CDC link deleted [file=" + file.getAbsolutePath() + ']'); + }); + + return true; + } + + /** @return WAL pointer to the last {@link CdcDisabledRecord}. */ + private WALPointer findLastCdcDisabledRecord() { + if (!wal.inMemoryCdc()) + return null; + + AtomicReference lastRec = new AtomicReference<>(); + + consumeCdcSegments(segment -> { + if (lastRec.get() != null) + return; + + if (log.isInfoEnabled()) + log.info("Processing CDC segment [segment=" + segment + ']'); + + IgniteWalIteratorFactory.IteratorParametersBuilder builder = + new IgniteWalIteratorFactory.IteratorParametersBuilder() + .log(log) + .filesOrDirs(segment.toFile()) + .addFilter((type, ptr) -> type == CDC_DISABLED); + + if (ignite.configuration().getDataStorageConfiguration().getPageSize() != 0) + builder.pageSize(ignite.configuration().getDataStorageConfiguration().getPageSize()); + + try (WALIterator it = new IgniteWalIteratorFactory(log).iterator(builder)) { + while (it.hasNext()) + lastRec.set(it.next().getKey()); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to read CDC segment [path=" + segment + ']', e); } + }); + + if (log.isInfoEnabled() && lastRec.get() != null) + log.info("Found CDC disabled record [ptr=" + lastRec.get() + ']'); + + return lastRec.get(); + } + + /** @param ptr WAL pointer to set state or {@code null} to delete state. */ + private void setWalState(WALPointer ptr) { + Path stateDir = walCdcDir.toPath().resolve(STATE_DIR); + + if (ptr == null) { + File state = stateDir.resolve(WAL_STATE_FILE_NAME).toFile(); + + if (state.exists() && !state.delete()) + throw new IgniteException("Failed to delete wal state file [file=" + state.getAbsolutePath() + ']'); + + return; } - catch (IgniteCheckedException e) { - throw new RuntimeException("Failed to delete lost segment CDC links. " + - "Unable to acquire lock to lock CDC folder. Make sure a CDC app is shut down " + - "[dir=" + walCdcDir.getAbsolutePath() + ", reason=" + e.getMessage() + ']'); + + CdcConsumerState state = new CdcConsumerState(log, stateDir); + + try { + state.saveWal(new T2<>(ptr, 0)); } - finally { - U.closeQuiet(lock); + catch (IOException e) { + throw new IgniteException("Failed to set WAL state file.", e); } + } - return null; + /** Consume CDC segments in reversed order. */ + private void consumeCdcSegments(Consumer cnsmr) { + try (Stream cdcFiles = Files.list(walCdcDir.toPath())) { + cdcFiles + .filter(p -> WAL_SEGMENT_FILE_FILTER.accept(p.toFile())) + .sorted(Comparator.comparingLong(FileWriteAheadLogManager::segmentIndex) + .reversed()) // Sort by segment index. + .forEach(cnsmr); + } + catch (IOException e) { + throw new RuntimeException("Failed to list CDC links.", e); + } } } } diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java index a13b75ec61022..3d7055c8e7105 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java @@ -830,7 +830,7 @@ public void testDisable() throws Exception { addData(cache, 0, 1); assertTrue(waitForCondition(() -> 2 == walCdcDir.list().length, 2 * WAL_ARCHIVE_TIMEOUT)); - assertEquals(3, archiveDir.listFiles().length); + assertEquals(persistenceEnabled ? 3 : 2, archiveDir.listFiles().length); UserCdcConsumer cnsmr = new UserCdcConsumer(); @@ -839,7 +839,7 @@ public void testDisable() throws Exception { waitForSize(1, DEFAULT_CACHE_NAME, UPDATE, cnsmr); // Cdc application must fail due to skipped data. - assertThrowsAnyCause(log, cdcFut::get, IgniteException.class, "Found missed segments. Some events are missed."); + assertThrowsAnyCause(log, cdcFut::get, IgniteException.class, "Found the CDC disabled record. Some events are missed."); ign.compute().execute(VisorCdcDeleteLostSegmentsTask.class, new VisorTaskArgument<>(ign.localNode().id(), false)); From e6672edf32d7fea3e43ac22ef0f6aa9967e7881c Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Thu, 29 Jun 2023 15:31:19 +0300 Subject: [PATCH 05/22] wip --- .../persistence/change-data-capture.adoc | 16 +++- .../apache/ignite/util/CdcCommandTest.java | 74 ++++++++++++++----- .../ignite/util/CdcResendCommandTest.java | 3 +- .../cdc/CdcDeleteLostSegmentLinksCommand.java | 11 +-- .../cdc/VisorCdcDeleteLostSegmentsTask.java | 8 +- .../org/apache/ignite/cdc/CdcSelfTest.java | 4 +- 6 files changed, 88 insertions(+), 28 deletions(-) diff --git a/docs/_docs/persistence/change-data-capture.adoc b/docs/_docs/persistence/change-data-capture.adoc index c491969b265bb..f7c93ce1c1734 100644 --- a/docs/_docs/persistence/change-data-capture.adoc +++ b/docs/_docs/persistence/change-data-capture.adoc @@ -165,12 +165,26 @@ control.sh|bat --cdc delete_lost_segment_links control.sh|bat --cdc delete_lost_segment_links --node-id node_id ---- -The command will remove all segment links before the last gap. +The command will remove all segment links before the last gap. Also for in-memory CDC, the state will be reset to the last CDC disabled record. For example, CDC was turned off several times: `000000000000002.wal`, `000000000000003.wal`, `000000000000008.wal`, `0000000000000010.wal`, `0000000000000011.wal` Then, after the command is executed, the following segment links will be deleted: `000000000000002.wal`, `000000000000003.wal`, `000000000000008.wal`. The application will start from the `0000000000000010.wal` segment after being enabled. +== Handling disabled in-memory CDC + +When in-memory CDC disabled, the special WAL record (CDC disabled record) will be written on data changes. + +WARNING: All changes are skipped when in-memory CDC is disabled. + +The CDC application will fail if meets this record while processing segments. Example of error: +`Found the CDC disabled record. Some events are missed. Exiting! [state=WALPointer [idx=1, fileOff=29, len=21]]` + +It means that some data changes are skipped and administrator attention is required. + +You can reset the CDC application state to the last CDC disabled record and fix possible gaps +using the `delete_lost_segment_links` link:#handling-skipped-segments[command]. + == Forcefully resend all cache data to CDC When the CDC has been forcefully disabled for a while, cache changes will be skipped. diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java index a270e3acb5fac..8df7068126ebc 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.Serializable; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -72,6 +73,7 @@ import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_UNEXPECTED_ERROR; import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER; import static org.apache.ignite.testframework.GridTestUtils.assertContains; +import static org.apache.ignite.testframework.GridTestUtils.cartesianProduct; import static org.apache.ignite.testframework.GridTestUtils.stopThreads; import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; import static org.apache.ignite.util.GridCommandHandlerClusterByClassTest.CACHES; @@ -91,6 +93,9 @@ public class CdcCommandTest extends GridCommandHandlerAbstractTest { /** */ public static final String RESEND = "resend"; + /** */ + public static final int WAL_ARCHIVE_TIMEOUT = 1_000; + /** */ private IgniteEx srv0; @@ -107,13 +112,13 @@ public class CdcCommandTest extends GridCommandHandlerAbstractTest { private volatile IgniteThrowableConsumer onLogLsnr; /** */ - @Parameterized.Parameter + @Parameterized.Parameter(1) public boolean persistenceEnabled; /** */ - @Parameterized.Parameters(name = "persistence={0}") - public static Object[] parameters() { - return new Object[] {true, false}; + @Parameterized.Parameters(name = "cmdHnd={0}, persistence={1}") + public static Collection parameters() { + return cartesianProduct(CMD_HNDS.keySet(), F.asList(true, false)); } /** {@inheritDoc} */ @@ -124,7 +129,7 @@ public static Object[] parameters() { .setBackups(1)); cfg.setDataStorageConfiguration(new DataStorageConfiguration() - .setWalForceArchiveTimeout(1000) + .setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT) .setDataRegionConfigurations(new DataRegionConfiguration() .setName(CDC_DISABLED_DATA_REGION) .setCdcEnabled(false)) @@ -248,15 +253,48 @@ public void testDeleteLostSegmentLinksOneNode() throws Exception { /** */ @Test public void testDeleteLostSegmentLinksMultipleGaps() throws Exception { - checkDeleteLostSegmentLinks(F.asList(0L, 2L, 4L), F.asList(4L), true); + checkDeleteLostSegmentLinks(F.asList(0L, 3L, 5L), F.asList(5L), true); } /** */ @Test - public void testDeleteLostSegmentLinksMultipleSegmentGaps() throws Exception { - Assume.assumeTrue(persistenceEnabled); + public void testDeleteLostSegmentLinksCdcDisabledRecord() throws Exception { + Assume.assumeFalse(persistenceEnabled); - checkDeleteLostSegmentLinks(F.asList(0L, 3L, 5L), F.asList(5L), true); + checkCdcDisabledRecord(); + } + + /** */ + @Test + public void testDeleteLostSegmentLinksCdcDisabledRecordAfterSkippedSegment() throws Exception { + Assume.assumeFalse(persistenceEnabled); + + archiveSegmentLinks(F.asList(0L, 2L)); + + checkCdcDisabledRecord(); + } + + /** */ + private void checkCdcDisabledRecord() throws Exception { + addData(srv1.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT); + + cdcDisabled.propagate(true); + + addData(srv1.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT); + + cdcDisabled.propagate(false); + + addData(srv1.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT); + + U.sleep(2 * WAL_ARCHIVE_TIMEOUT); + + executeCommand(EXIT_CODE_OK, CDC, DELETE_LOST_SEGMENT_LINKS); + + UserCdcConsumer cnsmr0 = runCdc(srv0); + UserCdcConsumer cnsmr1 = runCdc(srv1); + + waitForSize(cnsmr0, srv0.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.PRIMARY)); + waitForSize(cnsmr1, srv1.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.PRIMARY)); } /** */ @@ -288,15 +326,16 @@ private void checkLinks(IgniteEx srv, List expLinks) { /** Archive given segments links with possible gaps. */ private void archiveSegmentLinks(List idxs) throws Exception { - for (long idx = 0; idx <= idxs.stream().mapToLong(v -> v).max().getAsLong(); idx++) { - cdcDisabled.propagate(!idxs.contains(idx)); - - archiveSegment(idx); - } + for (long idx = 0; idx <= idxs.stream().mapToLong(v -> v).max().getAsLong(); idx++) + archiveSegment(idx, !idxs.contains(idx)); } /** */ - private void archiveSegment(long expIdx) throws Exception { + private void archiveSegment(long expIdx, boolean skipCdcSegment) throws Exception { + addData(srv1.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT); + + cdcDisabled.propagate(skipCdcSegment); + CountDownLatch latch = new CountDownLatch(G.allGrids().size()); for (Ignite srv : G.allGrids()) { @@ -308,10 +347,9 @@ private void archiveSegment(long expIdx) throws Exception { }, EVT_WAL_SEGMENT_ARCHIVED); } - addData(srv1.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT); + latch.await(2 * WAL_ARCHIVE_TIMEOUT, TimeUnit.MILLISECONDS); - if (persistenceEnabled || !(Boolean)cdcDisabled.getOrDefault(false)) - latch.await(getTestTimeout(), TimeUnit.MILLISECONDS); + cdcDisabled.propagate(false); } /** */ diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java index 81cb9864e6c41..271e42de4ae9d 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java @@ -32,6 +32,7 @@ import static org.apache.ignite.testframework.GridTestUtils.stopThreads; import static org.apache.ignite.util.CdcCommandTest.CDC; import static org.apache.ignite.util.CdcCommandTest.RESEND; +import static org.apache.ignite.util.CdcCommandTest.WAL_ARCHIVE_TIMEOUT; import static org.apache.ignite.util.CdcCommandTest.runCdc; import static org.apache.ignite.util.CdcCommandTest.waitForSize; import static org.apache.ignite.util.GridCommandHandlerClusterByClassTest.CACHES; @@ -45,7 +46,7 @@ public class CdcResendCommandTest extends GridCommandHandlerAbstractTest { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); cfg.setDataStorageConfiguration(new DataStorageConfiguration() - .setWalForceArchiveTimeout(1000) + .setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT) .setDefaultDataRegionConfiguration(new DataRegionConfiguration() .setCdcEnabled(true) .setPersistenceEnabled(true))); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentLinksCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentLinksCommand.java index cb4edf876dcf1..4df07a38938e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentLinksCommand.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentLinksCommand.java @@ -21,6 +21,7 @@ import java.util.function.Consumer; import org.apache.ignite.internal.client.GridClientNode; import org.apache.ignite.internal.management.api.ComputeCommand; +import org.apache.ignite.internal.pagemem.wal.record.CdcDisabledRecord; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.visor.cdc.VisorCdcDeleteLostSegmentsTask; import org.apache.ignite.lang.IgniteExperimental; @@ -29,13 +30,13 @@ import static org.apache.ignite.internal.management.api.CommandUtils.servers; /** - * Command to delete lost segment links. + * Command to delete lost segment links. For in-memory CDC, also resets the state to the last {@link CdcDisabledRecord}. */ @IgniteExperimental public class CdcDeleteLostSegmentLinksCommand implements ComputeCommand { /** {@inheritDoc} */ @Override public String description() { - return "Delete lost segment CDC links"; + return "Delete lost segment CDC links. For in-memory CDC, also reset the state to the last CDC disabled record."; } /** {@inheritDoc} */ @@ -64,9 +65,9 @@ public class CdcDeleteLostSegmentLinksCommand implements ComputeCommand { @@ -144,7 +145,7 @@ else if (lostDeleted) /** @return {@code True} if lost segments were found and successfully deleted. */ private boolean deleteLostSegments() { - Set delete = new HashSet<>(); + Set delete = new TreeSet<>(); AtomicLong lastSgmnt = new AtomicLong(-1); @@ -197,6 +198,7 @@ private WALPointer findLastCdcDisabledRecord() { IgniteWalIteratorFactory.IteratorParametersBuilder builder = new IgniteWalIteratorFactory.IteratorParametersBuilder() .log(log) + .sharedContext(ignite.context().cache().context()) .filesOrDirs(segment.toFile()) .addFilter((type, ptr) -> type == CDC_DISABLED); @@ -234,6 +236,8 @@ private void setWalState(WALPointer ptr) { CdcConsumerState state = new CdcConsumerState(log, stateDir); try { + Files.createDirectories(stateDir); + state.saveWal(new T2<>(ptr, 0)); } catch (IOException e) { diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java index 3d7055c8e7105..3f77d30ebc909 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java @@ -839,7 +839,9 @@ public void testDisable() throws Exception { waitForSize(1, DEFAULT_CACHE_NAME, UPDATE, cnsmr); // Cdc application must fail due to skipped data. - assertThrowsAnyCause(log, cdcFut::get, IgniteException.class, "Found the CDC disabled record. Some events are missed."); + assertThrowsAnyCause(log, cdcFut::get, IgniteException.class, + persistenceEnabled ? "Found missed segments. Some events are missed." + : "Found the CDC disabled record. Some events are missed."); ign.compute().execute(VisorCdcDeleteLostSegmentsTask.class, new VisorTaskArgument<>(ign.localNode().id(), false)); From 98ce845d7e48e5849ab860fb381eb54cc4388a6c Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Thu, 29 Jun 2023 15:38:12 +0300 Subject: [PATCH 06/22] wip --- .../apache/ignite/util/CdcCommandTest.java | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java index 8df7068126ebc..4abada15511d4 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java @@ -326,30 +326,30 @@ private void checkLinks(IgniteEx srv, List expLinks) { /** Archive given segments links with possible gaps. */ private void archiveSegmentLinks(List idxs) throws Exception { - for (long idx = 0; idx <= idxs.stream().mapToLong(v -> v).max().getAsLong(); idx++) - archiveSegment(idx, !idxs.contains(idx)); - } + for (long idx = 0; idx <= idxs.stream().mapToLong(v -> v).max().getAsLong(); idx++) { + boolean skipCdcSegment = !idxs.contains(idx); - /** */ - private void archiveSegment(long expIdx, boolean skipCdcSegment) throws Exception { - addData(srv1.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT); + addData(srv1.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT); - cdcDisabled.propagate(skipCdcSegment); + cdcDisabled.propagate(skipCdcSegment); - CountDownLatch latch = new CountDownLatch(G.allGrids().size()); + CountDownLatch latch = new CountDownLatch(G.allGrids().size()); - for (Ignite srv : G.allGrids()) { - srv.events().localListen(evt -> { - if (expIdx == ((WalSegmentArchivedEvent)evt).getAbsWalSegmentIdx()) - latch.countDown(); + for (Ignite srv : G.allGrids()) { + long idx0 = idx; - return true; - }, EVT_WAL_SEGMENT_ARCHIVED); - } + srv.events().localListen(evt -> { + if (idx0 == ((WalSegmentArchivedEvent)evt).getAbsWalSegmentIdx()) + latch.countDown(); - latch.await(2 * WAL_ARCHIVE_TIMEOUT, TimeUnit.MILLISECONDS); + return true; + }, EVT_WAL_SEGMENT_ARCHIVED); + } - cdcDisabled.propagate(false); + latch.await(2 * WAL_ARCHIVE_TIMEOUT, TimeUnit.MILLISECONDS); + + cdcDisabled.propagate(false); + } } /** */ From 3a825858f00b31d9f720e3219c686444302f3268 Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Thu, 29 Jun 2023 16:28:46 +0300 Subject: [PATCH 07/22] wip --- .../internal/visor/cdc/VisorCdcDeleteLostSegmentsTask.java | 1 - 1 file changed, 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcDeleteLostSegmentsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcDeleteLostSegmentsTask.java index 22c041dac36ed..0808a3dff6418 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcDeleteLostSegmentsTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcDeleteLostSegmentsTask.java @@ -22,7 +22,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Comparator; -import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.TreeSet; From fab756035accde0779c512fd681f9497801f0acf Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Thu, 29 Jun 2023 16:33:17 +0300 Subject: [PATCH 08/22] wip --- .../internal/visor/cdc/VisorCdcDeleteLostSegmentsTask.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcDeleteLostSegmentsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcDeleteLostSegmentsTask.java index 0808a3dff6418..a8981fdd423c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcDeleteLostSegmentsTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cdc/VisorCdcDeleteLostSegmentsTask.java @@ -89,10 +89,10 @@ private static class VisorCdcDeleteLostSegmentsJob extends VisorJob Date: Thu, 29 Jun 2023 18:08:02 +0300 Subject: [PATCH 09/22] Fix tests --- .../management/cdc/CdcDeleteLostSegmentLinksCommand.java | 2 +- .../GridCommandHandlerClusterByClassTest_help.output | 2 +- .../GridCommandHandlerClusterByClassWithSSLTest_help.output | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentLinksCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentLinksCommand.java index 4df07a38938e5..502185ca24622 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentLinksCommand.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentLinksCommand.java @@ -36,7 +36,7 @@ public class CdcDeleteLostSegmentLinksCommand implements ComputeCommand { /** {@inheritDoc} */ @Override public String description() { - return "Delete lost segment CDC links. For in-memory CDC, also reset the state to the last CDC disabled record."; + return "Delete lost segment CDC links. For in-memory CDC, also reset the state to the last CDC disabled record"; } /** {@inheritDoc} */ diff --git a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output index e4debe5b040f5..b99e84c07f214 100644 --- a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output +++ b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output @@ -383,7 +383,7 @@ If the file name isn't specified the output file name is: '.bin': control.(sh|bat) --consistency finalize [EXPERIMENTAL] - Delete lost segment CDC links: + Delete lost segment CDC links. For in-memory CDC, also reset the state to the last CDC disabled record: control.(sh|bat) --cdc delete_lost_segment_links [--node-id node_id] [--yes] Parameters: diff --git a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output index e4debe5b040f5..b99e84c07f214 100644 --- a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output +++ b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output @@ -383,7 +383,7 @@ If the file name isn't specified the output file name is: '.bin': control.(sh|bat) --consistency finalize [EXPERIMENTAL] - Delete lost segment CDC links: + Delete lost segment CDC links. For in-memory CDC, also reset the state to the last CDC disabled record: control.(sh|bat) --cdc delete_lost_segment_links [--node-id node_id] [--yes] Parameters: From 74611e93dbcbde143e855d91bfee35deb652f0f1 Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Fri, 30 Jun 2023 00:04:00 +0300 Subject: [PATCH 10/22] Fix test --- .../apache/ignite/testframework/wal/record/RecordUtils.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java index 60ffcee8686be..366c3c1f10103 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java @@ -24,6 +24,7 @@ import java.util.UUID; import java.util.function.Supplier; import org.apache.ignite.internal.pagemem.FullPageId; +import org.apache.ignite.internal.pagemem.wal.record.CdcDisabledRecord; import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord; import org.apache.ignite.internal.pagemem.wal.record.DataRecord; import org.apache.ignite.internal.pagemem.wal.record.ExchangeRecord; @@ -113,6 +114,7 @@ import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.BTREE_PAGE_REMOVE; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.BTREE_PAGE_REPLACE; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DATA_RECORD; +import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DISABLED; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CHECKPOINT_RECORD; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CLUSTER_SNAPSHOT; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CONSISTENT_CUT; @@ -193,6 +195,7 @@ public class RecordUtils { put(DATA_RECORD, RecordUtils::buildDataRecord); put(DATA_RECORD_V2, RecordUtils::buildDataRecord); put(CDC_DATA_RECORD, RecordUtils::buildDataRecord); + put(CDC_DISABLED, CdcDisabledRecord::new); put(CHECKPOINT_RECORD, RecordUtils::buildCheckpointRecord); put(HEADER_RECORD, buildUpsupportedWalRecord(HEADER_RECORD)); put(INIT_NEW_PAGE_RECORD, RecordUtils::buildInitNewPageRecord); From f862aa28fdd6364c47fcea49220c2811bec38520 Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Tue, 4 Jul 2023 13:40:06 +0300 Subject: [PATCH 11/22] fixed compilation --- .../internal/management/cdc/CdcDeleteLostSegmentsTask.java | 1 - .../core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentsTask.java index 720d0571cbf65..cb637121502f9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentsTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentsTask.java @@ -35,7 +35,6 @@ import org.apache.ignite.compute.ComputeJobResult; import org.apache.ignite.internal.cdc.CdcConsumerState; import org.apache.ignite.internal.cdc.CdcFileLockHolder; -import org.apache.ignite.internal.management.cdc.CdcDeleteLostSegmentLinksCommandArg; import org.apache.ignite.internal.pagemem.wal.WALIterator; import org.apache.ignite.internal.pagemem.wal.record.CdcDisabledRecord; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java index ebb3f1d19ca03..278044ca723b2 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java @@ -843,7 +843,7 @@ public void testDisable() throws Exception { persistenceEnabled ? "Found missed segments. Some events are missed." : "Found the CDC disabled record. Some events are missed."); - ign.compute().execute(VisorCdcDeleteLostSegmentsTask.class, new VisorTaskArgument<>(ign.localNode().id(), false)); + ign.compute().execute(CdcDeleteLostSegmentsTask.class, new VisorTaskArgument<>(ign.localNode().id(), false)); cdcFut = runAsync(createCdc(cnsmr, getConfiguration(ign.name()))); cnsmr.data.clear(); From 2cc9e378944838f3030adfcd1d5c1cd73d4cb5d4 Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Tue, 4 Jul 2023 15:32:15 +0300 Subject: [PATCH 12/22] review fixes --- docs/_docs/persistence/change-data-capture.adoc | 4 ++-- .../management/cdc/CdcDeleteLostSegmentLinksCommand.java | 9 +++++---- .../management/cdc/CdcDeleteLostSegmentsTask.java | 3 ++- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/docs/_docs/persistence/change-data-capture.adoc b/docs/_docs/persistence/change-data-capture.adoc index f7c93ce1c1734..91d644d35a41d 100644 --- a/docs/_docs/persistence/change-data-capture.adoc +++ b/docs/_docs/persistence/change-data-capture.adoc @@ -144,7 +144,7 @@ IMPORTANT: `ignite-cdc.sh` implements the fail-fast approach. It just fails in c == Handling skipped segments -The CDC can be disabled manually or by configured directory maximum size. In this case a hard link creation will be skipped. +The CDC can be disabled manually or by configured directory maximum size threshold. In this case a hard link creation will be skipped. WARNING: All changes in skipped segments will be lost! @@ -165,7 +165,7 @@ control.sh|bat --cdc delete_lost_segment_links control.sh|bat --cdc delete_lost_segment_links --node-id node_id ---- -The command will remove all segment links before the last gap. Also for in-memory CDC, the state will be reset to the last CDC disabled record. +The command will remove all segment links before the last gap. For in-memory mode state will be reset to the first record written after CDC enable again. For example, CDC was turned off several times: `000000000000002.wal`, `000000000000003.wal`, `000000000000008.wal`, `0000000000000010.wal`, `0000000000000011.wal` Then, after the command is executed, the following segment links will be deleted: `000000000000002.wal`, `000000000000003.wal`, `000000000000008.wal`. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentLinksCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentLinksCommand.java index a54a67152caf5..64e9262344fce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentLinksCommand.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentLinksCommand.java @@ -21,7 +21,6 @@ import java.util.function.Consumer; import org.apache.ignite.internal.client.GridClientNode; import org.apache.ignite.internal.management.api.ComputeCommand; -import org.apache.ignite.internal.pagemem.wal.record.CdcDisabledRecord; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteExperimental; @@ -29,7 +28,8 @@ import static org.apache.ignite.internal.management.api.CommandUtils.servers; /** - * Command to delete lost segment links. For in-memory CDC, also resets the state to the last {@link CdcDisabledRecord}. + * Command to delete lost segment links. For in-memory mode state will be reset to the first record written + * after CDC enable again. */ @IgniteExperimental public class CdcDeleteLostSegmentLinksCommand implements ComputeCommand { @@ -64,8 +64,9 @@ public class CdcDeleteLostSegmentLinksCommand implements ComputeCommand { From f56edd7e8b6071752c1d4b04d30992b2e6a6cb45 Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Tue, 4 Jul 2023 15:34:28 +0300 Subject: [PATCH 13/22] review fixes --- .../management/cdc/CdcDeleteLostSegmentLinksCommand.java | 3 ++- .../GridCommandHandlerClusterByClassTest_help.output | 2 +- .../GridCommandHandlerClusterByClassWithSSLTest_help.output | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentLinksCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentLinksCommand.java index 64e9262344fce..2655e7cfea16b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentLinksCommand.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentLinksCommand.java @@ -35,7 +35,8 @@ public class CdcDeleteLostSegmentLinksCommand implements ComputeCommand { /** {@inheritDoc} */ @Override public String description() { - return "Delete lost segment CDC links. For in-memory CDC, also reset the state to the last CDC disabled record"; + return "Delete lost segment CDC links. For in-memory mode state will be reset to the first record written " + + "after CDC enable again"; } /** {@inheritDoc} */ diff --git a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output index b99e84c07f214..1079fc6f81293 100644 --- a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output +++ b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output @@ -383,7 +383,7 @@ If the file name isn't specified the output file name is: '.bin': control.(sh|bat) --consistency finalize [EXPERIMENTAL] - Delete lost segment CDC links. For in-memory CDC, also reset the state to the last CDC disabled record: + Delete lost segment CDC links. For in-memory mode state will be reset to the first record written after CDC enable again: control.(sh|bat) --cdc delete_lost_segment_links [--node-id node_id] [--yes] Parameters: diff --git a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output index b99e84c07f214..1079fc6f81293 100644 --- a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output +++ b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output @@ -383,7 +383,7 @@ If the file name isn't specified the output file name is: '.bin': control.(sh|bat) --consistency finalize [EXPERIMENTAL] - Delete lost segment CDC links. For in-memory CDC, also reset the state to the last CDC disabled record: + Delete lost segment CDC links. For in-memory mode state will be reset to the first record written after CDC enable again: control.(sh|bat) --cdc delete_lost_segment_links [--node-id node_id] [--yes] Parameters: From e78e36486f21ceac147c64d6ecfac7cfc8c7fce9 Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Tue, 4 Jul 2023 15:36:27 +0300 Subject: [PATCH 14/22] review fixes --- docs/_docs/persistence/change-data-capture.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/_docs/persistence/change-data-capture.adoc b/docs/_docs/persistence/change-data-capture.adoc index 91d644d35a41d..d38d863f7489c 100644 --- a/docs/_docs/persistence/change-data-capture.adoc +++ b/docs/_docs/persistence/change-data-capture.adoc @@ -182,7 +182,7 @@ The CDC application will fail if meets this record while processing segments. Ex It means that some data changes are skipped and administrator attention is required. -You can reset the CDC application state to the last CDC disabled record and fix possible gaps +You can reset the CDC application state to the first record written after CDC enable again and fix possible gaps using the `delete_lost_segment_links` link:#handling-skipped-segments[command]. == Forcefully resend all cache data to CDC From b5f7b14cc5deaac81285699d6697f94fabe45afe Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Wed, 5 Jul 2023 16:44:18 +0300 Subject: [PATCH 15/22] WIP --- .../persistence/change-data-capture.adoc | 4 +-- .../apache/ignite/internal/cdc/CdcMain.java | 15 +++++++--- .../internal/cdc/WalRecordsConsumer.java | 9 ++++-- .../cdc/CdcDeleteLostSegmentsTask.java | 10 +++---- ...abledRecord.java => CdcDisableRecord.java} | 4 +-- .../pagemem/wal/record/WALRecord.java | 4 +-- .../wal/FileWriteAheadLogManager.java | 28 +++++++++---------- .../serializer/RecordDataV1Serializer.java | 10 +++---- .../testframework/wal/record/RecordUtils.java | 6 ++-- 9 files changed, 50 insertions(+), 40 deletions(-) rename modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/{CdcDisabledRecord.java => CdcDisableRecord.java} (91%) diff --git a/docs/_docs/persistence/change-data-capture.adoc b/docs/_docs/persistence/change-data-capture.adoc index d38d863f7489c..8f4ed77b7bb70 100644 --- a/docs/_docs/persistence/change-data-capture.adoc +++ b/docs/_docs/persistence/change-data-capture.adoc @@ -173,12 +173,12 @@ The application will start from the `0000000000000010.wal` segment after being e == Handling disabled in-memory CDC -When in-memory CDC disabled, the special WAL record (CDC disabled record) will be written on data changes. +When in-memory CDC disabled, the special WAL record (CDC disable record) will be written on data changes. WARNING: All changes are skipped when in-memory CDC is disabled. The CDC application will fail if meets this record while processing segments. Example of error: -`Found the CDC disabled record. Some events are missed. Exiting! [state=WALPointer [idx=1, fileOff=29, len=21]]` +`Found the CDC disable record. Some events are missed. Exiting! [state=WALPointer [idx=1, fileOff=29, len=21]]` It means that some data changes are skipped and administrator attention is required. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java index 9e45f59f109ba..92070d4f1c2f7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java @@ -50,6 +50,8 @@ import org.apache.ignite.internal.MarshallerContextImpl; import org.apache.ignite.internal.binary.BinaryUtils; import org.apache.ignite.internal.cdc.WalRecordsConsumer.DataEntryIterator; +import org.apache.ignite.internal.management.cdc.CdcCommand; +import org.apache.ignite.internal.management.cdc.CdcDeleteLostSegmentLinksCommand; import org.apache.ignite.internal.processors.cache.GridLocalConfigManager; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderResolver; @@ -78,8 +80,9 @@ import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT; import static org.apache.ignite.internal.IgnitionEx.initializeDefaultMBeanServer; import static org.apache.ignite.internal.binary.BinaryUtils.METADATA_FILE_SUFFIX; +import static org.apache.ignite.internal.management.api.CommandUtils.toFormattedCommandName; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DATA_RECORD; -import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DISABLED; +import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DISABLE; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2; import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER; import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.segmentIndex; @@ -450,8 +453,12 @@ public void consumeWalSegmentsUntilStopped() { long nextSgmnt = segmentIndex(p); if (lastSgmnt.get() != -1 && nextSgmnt - lastSgmnt.get() != 1) { - throw new IgniteException("Found missed segments. Some events are missed. Exiting! " + - "[lastSegment=" + lastSgmnt.get() + ", nextSegment=" + nextSgmnt + ']'); + throw new IgniteException("Found missed segments. Please, check node log. " + + "To continue CDC, please, use 'control.sh(bat) " + + toFormattedCommandName(CdcCommand.class) + ' ' + + toFormattedCommandName(CdcDeleteLostSegmentLinksCommand.class) + + "' command. Exiting! [lastSegment=" + lastSgmnt.get() + + ", nextSegment=" + nextSgmnt + ']'); } lastSgmnt.set(nextSgmnt); @@ -487,7 +494,7 @@ private void consumeSegment(Path segment) { .marshallerMappingFileStoreDir(marshaller) .keepBinary(cdcCfg.isKeepBinary()) .filesOrDirs(segment.toFile()) - .addFilter((type, ptr) -> type == DATA_RECORD_V2 || type == CDC_DATA_RECORD || type == CDC_DISABLED); + .addFilter((type, ptr) -> type == DATA_RECORD_V2 || type == CDC_DATA_RECORD || type == CDC_DISABLE); if (igniteCfg.getDataStorageConfiguration().getPageSize() != 0) builder.pageSize(igniteCfg.getDataStorageConfiguration().getPageSize()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java index b52af66e4cfb5..5917d464b6b0b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java @@ -28,6 +28,8 @@ import org.apache.ignite.cdc.CdcConsumer; import org.apache.ignite.cdc.CdcEvent; import org.apache.ignite.cdc.TypeMapping; +import org.apache.ignite.internal.management.cdc.CdcCommand; +import org.apache.ignite.internal.management.cdc.CdcDeleteLostSegmentLinksCommand; import org.apache.ignite.internal.pagemem.wal.WALIterator; import org.apache.ignite.internal.pagemem.wal.record.DataEntry; import org.apache.ignite.internal.pagemem.wal.record.DataRecord; @@ -44,6 +46,7 @@ import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgnitePredicate; +import static org.apache.ignite.internal.management.api.CommandUtils.toFormattedCommandName; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; @@ -303,8 +306,10 @@ private void advance() { curRec = walIter.next(); - if (curRec.get2().type() == WALRecord.RecordType.CDC_DISABLED) { - throw new IgniteException("Found the CDC disabled record. Some events are missed. Exiting! " + + if (curRec.get2().type() == WALRecord.RecordType.CDC_DISABLE) { + throw new IgniteException("CDC disabled on node. Please, check node log. To continue CDC, please, " + + "use 'control.sh(bat) " + toFormattedCommandName(CdcCommand.class) + ' ' + + toFormattedCommandName(CdcDeleteLostSegmentLinksCommand.class) + "' command. Exiting! " + "[state=" + curRec.get1() + ']'); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentsTask.java index 4f52e1889a9f8..e98319cb99e7b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentsTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentsTask.java @@ -36,7 +36,7 @@ import org.apache.ignite.internal.cdc.CdcConsumerState; import org.apache.ignite.internal.cdc.CdcFileLockHolder; import org.apache.ignite.internal.pagemem.wal.WALIterator; -import org.apache.ignite.internal.pagemem.wal.record.CdcDisabledRecord; +import org.apache.ignite.internal.pagemem.wal.record.CdcDisableRecord; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory; @@ -50,7 +50,7 @@ import static org.apache.ignite.internal.cdc.CdcConsumerState.WAL_STATE_FILE_NAME; import static org.apache.ignite.internal.cdc.CdcMain.STATE_DIR; -import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DISABLED; +import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DISABLE; import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER; /** @@ -180,7 +180,7 @@ private boolean deleteLostSegments() { return true; } - /** @return WAL pointer to the last {@link CdcDisabledRecord}. */ + /** @return WAL pointer to the last {@link CdcDisableRecord}. */ private WALPointer findLastCdcDisabledRecord() { if (!wal.inMemoryCdc()) return null; @@ -199,7 +199,7 @@ private WALPointer findLastCdcDisabledRecord() { .log(log) .sharedContext(ignite.context().cache().context()) .filesOrDirs(segment.toFile()) - .addFilter((type, ptr) -> type == CDC_DISABLED); + .addFilter((type, ptr) -> type == CDC_DISABLE); if (ignite.configuration().getDataStorageConfiguration().getPageSize() != 0) builder.pageSize(ignite.configuration().getDataStorageConfiguration().getPageSize()); @@ -214,7 +214,7 @@ private WALPointer findLastCdcDisabledRecord() { }); if (log.isInfoEnabled() && lastRec.get() != null) - log.info("Found CDC disabled record [ptr=" + lastRec.get() + ']'); + log.info("Found CDC disable record [ptr=" + lastRec.get() + ']'); return lastRec.get(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CdcDisabledRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CdcDisableRecord.java similarity index 91% rename from modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CdcDisabledRecord.java rename to modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CdcDisableRecord.java index 34c9ef1ad0d01..33a6e4fe12f10 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CdcDisabledRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CdcDisableRecord.java @@ -20,9 +20,9 @@ /** * The record to notify the CDC application about skipped data records while the CDC was disabled. */ -public class CdcDisabledRecord extends WALRecord { +public class CdcDisableRecord extends WALRecord { /** {@inheritDoc} */ @Override public RecordType type() { - return RecordType.CDC_DISABLED; + return RecordType.CDC_DISABLE; } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java index 6836c80c421d3..b95cfa1e441d0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java @@ -293,8 +293,8 @@ public enum RecordType { /** CDC data record. */ CDC_DATA_RECORD(78, CUSTOM), - /** CDC disabled record. */ - CDC_DISABLED(79, CUSTOM); + /** CDC disable record. */ + CDC_DISABLE(79, CUSTOM); /** Index for serialization. Should be consistent throughout all versions. */ private final int idx; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index ede5e3003c374..3ab1110db44b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -47,7 +47,6 @@ import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongArray; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -73,7 +72,7 @@ import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.pagemem.wal.WALIterator; -import org.apache.ignite.internal.pagemem.wal.record.CdcDisabledRecord; +import org.apache.ignite.internal.pagemem.wal.record.CdcDisableRecord; import org.apache.ignite.internal.pagemem.wal.record.DataRecord; import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord; import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord; @@ -423,9 +422,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** CDC disabled flag. */ private final DistributedBooleanProperty cdcDisabled = detachedBooleanProperty(CDC_DISABLED); - /** */ - private final AtomicBoolean cdcDisabledRecLogged = new AtomicBoolean(); - /** * Constructor. * @@ -521,14 +517,19 @@ public void setFileIOFactory(FileIOFactory ioFactory) { name, oldVal, newVal)); } - if (newVal != null && newVal) + if (newVal != null && newVal) { log.warning("CDC was disabled."); - if (oldVal != null && oldVal && !newVal) { - if (log.isInfoEnabled()) - log.info("CDC was enabled."); + if (inMemoryCdc) { + try { + log(new CdcDisableRecord()); + } + catch (IgniteCheckedException e) { + U.error(log, "Unable to log CDC disable record: " + e.getMessage(), e); - cdcDisabledRecLogged.set(false); + cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, e)); + } + } } }); @@ -1835,16 +1836,13 @@ public long maxWalSegmentSize() { } /** @return {@code True} if log record should be skipped when CDC is disabled. */ - private boolean skipIfCdcDisabled(WALRecord rec) throws IgniteCheckedException { - if (!inMemoryCdc || rec instanceof CdcDisabledRecord || !cdcDisabled.getOrDefault(false)) + private boolean skipIfCdcDisabled(WALRecord rec) { + if (!inMemoryCdc || rec instanceof CdcDisableRecord || !cdcDisabled.getOrDefault(false)) return false; LT.warn(log, "Logging CDC data records to WAL skipped. The '" + CDC_DISABLED + "' distributed property is 'true'."); - if (cdcDisabledRecLogged.compareAndSet(false, true)) - log(new CdcDisabledRecord()); - return true; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java index eabbdec823520..f8cb6d58a21b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java @@ -37,7 +37,7 @@ import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted; import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.pagemem.wal.record.CacheState; -import org.apache.ignite.internal.pagemem.wal.record.CdcDisabledRecord; +import org.apache.ignite.internal.pagemem.wal.record.CdcDisableRecord; import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord; import org.apache.ignite.internal.pagemem.wal.record.DataEntry; import org.apache.ignite.internal.pagemem.wal.record.DataRecord; @@ -542,7 +542,7 @@ assert record instanceof PageSnapshot; return 4 + 8 + 1; case SWITCH_SEGMENT_RECORD: - case CDC_DISABLED: + case CDC_DISABLE: return 0; case TX_RECORD: @@ -1325,8 +1325,8 @@ WALRecord readPlainRecord(RecordType type, ByteBufferBackedDataInput in, break; - case CDC_DISABLED: - res = new CdcDisabledRecord(); + case CDC_DISABLE: + res = new CdcDisableRecord(); break; @@ -1919,7 +1919,7 @@ void writePlainRecord(WALRecord rec, ByteBuffer buf) throws IgniteCheckedExcepti break; case SWITCH_SEGMENT_RECORD: - case CDC_DISABLED: + case CDC_DISABLE: break; case MASTER_KEY_CHANGE_RECORD_V2: diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java index 366c3c1f10103..d81588d3a14d2 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java @@ -24,7 +24,7 @@ import java.util.UUID; import java.util.function.Supplier; import org.apache.ignite.internal.pagemem.FullPageId; -import org.apache.ignite.internal.pagemem.wal.record.CdcDisabledRecord; +import org.apache.ignite.internal.pagemem.wal.record.CdcDisableRecord; import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord; import org.apache.ignite.internal.pagemem.wal.record.DataRecord; import org.apache.ignite.internal.pagemem.wal.record.ExchangeRecord; @@ -114,7 +114,7 @@ import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.BTREE_PAGE_REMOVE; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.BTREE_PAGE_REPLACE; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DATA_RECORD; -import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DISABLED; +import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DISABLE; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CHECKPOINT_RECORD; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CLUSTER_SNAPSHOT; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CONSISTENT_CUT; @@ -195,7 +195,7 @@ public class RecordUtils { put(DATA_RECORD, RecordUtils::buildDataRecord); put(DATA_RECORD_V2, RecordUtils::buildDataRecord); put(CDC_DATA_RECORD, RecordUtils::buildDataRecord); - put(CDC_DISABLED, CdcDisabledRecord::new); + put(CDC_DISABLE, CdcDisableRecord::new); put(CHECKPOINT_RECORD, RecordUtils::buildCheckpointRecord); put(HEADER_RECORD, buildUpsupportedWalRecord(HEADER_RECORD)); put(INIT_NEW_PAGE_RECORD, RecordUtils::buildInitNewPageRecord); From fa43eaec085cb093b8a5e205f0a09e1b2cde9b7a Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Fri, 7 Jul 2023 01:14:45 +0300 Subject: [PATCH 16/22] Log CDC disable and rollover --- .../persistence/change-data-capture.adoc | 39 +++--- .../apache/ignite/util/CdcCommandTest.java | 71 +++++------ .../apache/ignite/internal/cdc/CdcMain.java | 12 +- .../internal/cdc/WalRecordsConsumer.java | 15 +-- .../cdc/CdcDeleteLostSegmentLinksCommand.java | 9 +- .../cdc/CdcDeleteLostSegmentsTask.java | 119 ++++++++---------- .../wal/FileWriteAheadLogManager.java | 42 +++++-- .../org/apache/ignite/cdc/CdcSelfTest.java | 39 +++--- ...mmandHandlerClusterByClassTest_help.output | 2 +- ...ndlerClusterByClassWithSSLTest_help.output | 2 +- 10 files changed, 166 insertions(+), 184 deletions(-) diff --git a/docs/_docs/persistence/change-data-capture.adoc b/docs/_docs/persistence/change-data-capture.adoc index 8f4ed77b7bb70..019bd5c4a0c88 100644 --- a/docs/_docs/persistence/change-data-capture.adoc +++ b/docs/_docs/persistence/change-data-capture.adoc @@ -142,19 +142,30 @@ IMPORTANT: `ignite-cdc.sh` implements the fail-fast approach. It just fails in c 5. Infinitely wait for the newly available segment and process it. 6. Stop the consumer in case of a failure or a received stop signal. -== Handling skipped segments +== Disabling CDC -The CDC can be disabled manually or by configured directory maximum size threshold. In this case a hard link creation will be skipped. +The CDC can be disabled manually or by configured directory maximum size threshold. -WARNING: All changes in skipped segments will be lost! +WARNING: All changes while CDC is disabled will be lost! -So when enabled there will be gap between segments: `0000000000000002.wal`, `0000000000000010.wal`, `0000000000000011.wal`, for example. -In this case `ignite-cdc.sh` will fail with the something like "Found missed segments. Some events are missed. Exiting! [lastSegment=2, nextSegment=10]". +When CDC disabled manually, the special WAL record (CDC disable record) will be written to the end of +current segment with rollover afterwards. Hard link creation will be skipped for next segments until CDC is enabled. +For in-memory CDC no changes will be flushed to the WAL. + +The CDC application will fail if meets this record while processing segments. Example of error: +`CDC disabled on node. Please, check node log. Exiting!` + +When configured directory maximum size is exceeded, hard link creation will be skipped for this node. +Once space in the directory is available again, hard link creation will be continued. +So, there will be gap between segments: `0000000000000002.wal`, `0000000000000010.wal`, `0000000000000011.wal`, for example. +In this case `ignite-cdc.sh` will fail with something like "Found missed segments. Please, check node log. Exiting! [lastSegment=2, nextSegment=10]". + +These errors mean that data changes are lost and administrator attention is required. NOTE: Make sure you need to sync data before restarting the CDC application. You can synchronize caches using link:#forcefully-resend-all-cache-data-to-cdc[resend command], snapshot or other methods. -To fix this error you can run the following link:tools/control-script[Control Script] command: +To fix these errors you can run the following link:tools/control-script[Control Script] command: [source,shell] ---- @@ -165,26 +176,12 @@ control.sh|bat --cdc delete_lost_segment_links control.sh|bat --cdc delete_lost_segment_links --node-id node_id ---- -The command will remove all segment links before the last gap. For in-memory mode state will be reset to the first record written after CDC enable again. +The command will remove all segment links before the CDC is disabled or the last gap. For example, CDC was turned off several times: `000000000000002.wal`, `000000000000003.wal`, `000000000000008.wal`, `0000000000000010.wal`, `0000000000000011.wal` Then, after the command is executed, the following segment links will be deleted: `000000000000002.wal`, `000000000000003.wal`, `000000000000008.wal`. The application will start from the `0000000000000010.wal` segment after being enabled. -== Handling disabled in-memory CDC - -When in-memory CDC disabled, the special WAL record (CDC disable record) will be written on data changes. - -WARNING: All changes are skipped when in-memory CDC is disabled. - -The CDC application will fail if meets this record while processing segments. Example of error: -`Found the CDC disable record. Some events are missed. Exiting! [state=WALPointer [idx=1, fileOff=29, len=21]]` - -It means that some data changes are skipped and administrator attention is required. - -You can reset the CDC application state to the first record written after CDC enable again and fix possible gaps -using the `delete_lost_segment_links` link:#handling-skipped-segments[command]. - == Forcefully resend all cache data to CDC When the CDC has been forcefully disabled for a while, cache changes will be skipped. diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java index 4abada15511d4..f0d42db9cd82a 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java @@ -59,7 +59,6 @@ import org.apache.ignite.plugin.PluginContext; import org.apache.ignite.testframework.GridTestUtils; import org.jetbrains.annotations.Nullable; -import org.junit.Assume; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -78,6 +77,7 @@ import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; import static org.apache.ignite.util.GridCommandHandlerClusterByClassTest.CACHES; import static org.apache.ignite.util.SystemViewCommandTest.NODE_ID; +import static org.junit.Assume.assumeTrue; /** * CDC command tests. @@ -241,41 +241,36 @@ public void testDeleteLostSegmentLinksApplicationNotClosed() throws Exception { /** */ @Test public void testDeleteLostSegmentLinks() throws Exception { - checkDeleteLostSegmentLinks(F.asList(0L, 2L), F.asList(2L), true); + assumeTrue(persistenceEnabled); + + archiveAndCheckDeleteLostSegmentLinks(F.asList(0L, 2L), F.asList(2L), true); } /** */ @Test public void testDeleteLostSegmentLinksOneNode() throws Exception { - checkDeleteLostSegmentLinks(F.asList(0L, 2L), F.asList(2L), false); - } + assumeTrue(persistenceEnabled); - /** */ - @Test - public void testDeleteLostSegmentLinksMultipleGaps() throws Exception { - checkDeleteLostSegmentLinks(F.asList(0L, 3L, 5L), F.asList(5L), true); + archiveAndCheckDeleteLostSegmentLinks(F.asList(0L, 2L), F.asList(2L), false); } /** */ @Test - public void testDeleteLostSegmentLinksCdcDisabledRecord() throws Exception { - Assume.assumeFalse(persistenceEnabled); + public void testDeleteLostSegmentLinksMultipleGaps() throws Exception { + assumeTrue(persistenceEnabled); - checkCdcDisabledRecord(); + archiveAndCheckDeleteLostSegmentLinks(F.asList(0L, 3L, 5L), F.asList(5L), true); } /** */ @Test - public void testDeleteLostSegmentLinksCdcDisabledRecordAfterSkippedSegment() throws Exception { - Assume.assumeFalse(persistenceEnabled); - - archiveSegmentLinks(F.asList(0L, 2L)); - - checkCdcDisabledRecord(); + public void testDeleteLostSegmentLinksNoGaps() throws Exception { + archiveAndCheckDeleteLostSegmentLinks(F.asList(0L, 1L), F.asList(0L, 1L), true); } /** */ - private void checkCdcDisabledRecord() throws Exception { + @Test + public void testDeleteLostSegmentLinksCdcDisable() throws Exception { addData(srv1.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT); cdcDisabled.propagate(true); @@ -288,19 +283,22 @@ private void checkCdcDisabledRecord() throws Exception { U.sleep(2 * WAL_ARCHIVE_TIMEOUT); - executeCommand(EXIT_CODE_OK, CDC, DELETE_LOST_SEGMENT_LINKS); - - UserCdcConsumer cnsmr0 = runCdc(srv0); - UserCdcConsumer cnsmr1 = runCdc(srv1); - - waitForSize(cnsmr0, srv0.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.PRIMARY)); - waitForSize(cnsmr1, srv1.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.PRIMARY)); + checkDeleteLostSegmentLinks(F.asList(0L, 1L), F.asList(1L), true); } /** */ - private void checkDeleteLostSegmentLinks(List expBefore, List expAfter, boolean allNodes) throws Exception { + private void archiveAndCheckDeleteLostSegmentLinks( + List expBefore, + List expAfter, + boolean allNodes + ) throws Exception { archiveSegmentLinks(expBefore); + checkDeleteLostSegmentLinks(expBefore, expAfter, allNodes); + } + + /** */ + private void checkDeleteLostSegmentLinks(List expBefore, List expAfter, boolean allNodes) { checkLinks(srv0, expBefore); checkLinks(srv1, expBefore); @@ -320,19 +318,13 @@ private void checkLinks(IgniteEx srv, List expLinks) { File[] links = wal0.walCdcDirectory().listFiles(WAL_SEGMENT_FILE_FILTER); assertEquals(expLinks.size(), links.length); - Arrays.stream(links).map(File::toPath).map(FileWriteAheadLogManager::segmentIndex) - .allMatch(expLinks::contains); + assertTrue(Arrays.stream(links).map(File::toPath).map(FileWriteAheadLogManager::segmentIndex) + .allMatch(expLinks::contains)); } /** Archive given segments links with possible gaps. */ private void archiveSegmentLinks(List idxs) throws Exception { for (long idx = 0; idx <= idxs.stream().mapToLong(v -> v).max().getAsLong(); idx++) { - boolean skipCdcSegment = !idxs.contains(idx); - - addData(srv1.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT); - - cdcDisabled.propagate(skipCdcSegment); - CountDownLatch latch = new CountDownLatch(G.allGrids().size()); for (Ignite srv : G.allGrids()) { @@ -346,9 +338,14 @@ private void archiveSegmentLinks(List idxs) throws Exception { }, EVT_WAL_SEGMENT_ARCHIVED); } - latch.await(2 * WAL_ARCHIVE_TIMEOUT, TimeUnit.MILLISECONDS); + addData(srv1.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT); + + boolean skipNext = !idxs.contains(idx + 1) && idx < idxs.get(idxs.size() - 1); + + if (!skipNext || !idxs.contains(idx)) + latch.await(2 * WAL_ARCHIVE_TIMEOUT, TimeUnit.MILLISECONDS); - cdcDisabled.propagate(false); + cdcDisabled.propagate(skipNext); } } @@ -439,7 +436,7 @@ public void testResendCancelOnNodeLeft() { /** */ @Test public void testResendCancelOnRebalanceInProgress() throws Exception { - Assume.assumeTrue(commandHandler.equals(CLI_CMD_HND)); + assumeTrue(commandHandler.equals(CLI_CMD_HND)); injectTestSystemOut(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java index 92070d4f1c2f7..6ec73c18b0dd5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java @@ -50,8 +50,6 @@ import org.apache.ignite.internal.MarshallerContextImpl; import org.apache.ignite.internal.binary.BinaryUtils; import org.apache.ignite.internal.cdc.WalRecordsConsumer.DataEntryIterator; -import org.apache.ignite.internal.management.cdc.CdcCommand; -import org.apache.ignite.internal.management.cdc.CdcDeleteLostSegmentLinksCommand; import org.apache.ignite.internal.processors.cache.GridLocalConfigManager; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderResolver; @@ -80,7 +78,6 @@ import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT; import static org.apache.ignite.internal.IgnitionEx.initializeDefaultMBeanServer; import static org.apache.ignite.internal.binary.BinaryUtils.METADATA_FILE_SUFFIX; -import static org.apache.ignite.internal.management.api.CommandUtils.toFormattedCommandName; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DATA_RECORD; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DISABLE; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2; @@ -453,12 +450,9 @@ public void consumeWalSegmentsUntilStopped() { long nextSgmnt = segmentIndex(p); if (lastSgmnt.get() != -1 && nextSgmnt - lastSgmnt.get() != 1) { - throw new IgniteException("Found missed segments. Please, check node log. " + - "To continue CDC, please, use 'control.sh(bat) " + - toFormattedCommandName(CdcCommand.class) + ' ' + - toFormattedCommandName(CdcDeleteLostSegmentLinksCommand.class) + - "' command. Exiting! [lastSegment=" + lastSgmnt.get() + - ", nextSegment=" + nextSgmnt + ']'); + throw new IgniteException("Found missed segments. Please, check node log. Exiting! " + + "To continue CDC, please, use the command: control.sh|bat --cdc delete_lost_segment_links" + + " [lastSegment=" + lastSgmnt.get() + ", nextSegment=" + nextSgmnt + ']'); } lastSgmnt.set(nextSgmnt); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java index 5917d464b6b0b..eb02321e9ef8f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java @@ -28,8 +28,6 @@ import org.apache.ignite.cdc.CdcConsumer; import org.apache.ignite.cdc.CdcEvent; import org.apache.ignite.cdc.TypeMapping; -import org.apache.ignite.internal.management.cdc.CdcCommand; -import org.apache.ignite.internal.management.cdc.CdcDeleteLostSegmentLinksCommand; import org.apache.ignite.internal.pagemem.wal.WALIterator; import org.apache.ignite.internal.pagemem.wal.record.DataEntry; import org.apache.ignite.internal.pagemem.wal.record.DataRecord; @@ -46,7 +44,6 @@ import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgnitePredicate; -import static org.apache.ignite.internal.management.api.CommandUtils.toFormattedCommandName; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; @@ -243,8 +240,6 @@ public static class DataEntryIterator implements Iterator, AutoClosea /** @param walIter WAL iterator. */ DataEntryIterator(WALIterator walIter) { this.walIter = walIter; - - advance(); } /** @return Current state. */ @@ -268,6 +263,9 @@ void init(int idx) { /** {@inheritDoc} */ @Override public boolean hasNext() { + if (next == null) + advance(); + return next != null; } @@ -280,8 +278,6 @@ void init(int idx) { next = null; - advance(); - return e; } @@ -307,9 +303,8 @@ private void advance() { curRec = walIter.next(); if (curRec.get2().type() == WALRecord.RecordType.CDC_DISABLE) { - throw new IgniteException("CDC disabled on node. Please, check node log. To continue CDC, please, " + - "use 'control.sh(bat) " + toFormattedCommandName(CdcCommand.class) + ' ' + - toFormattedCommandName(CdcDeleteLostSegmentLinksCommand.class) + "' command. Exiting! " + + throw new IgniteException("CDC disabled on node. Please, check node log. Exiting! " + + "To continue CDC, please, use the command: control.sh|bat --cdc delete_lost_segment_links " + "[state=" + curRec.get1() + ']'); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentLinksCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentLinksCommand.java index 2655e7cfea16b..aeddc00d96332 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentLinksCommand.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentLinksCommand.java @@ -28,15 +28,13 @@ import static org.apache.ignite.internal.management.api.CommandUtils.servers; /** - * Command to delete lost segment links. For in-memory mode state will be reset to the first record written - * after CDC enable again. + * Command to delete lost segment CDC links before the CDC is disabled or the last gap. */ @IgniteExperimental public class CdcDeleteLostSegmentLinksCommand implements ComputeCommand { /** {@inheritDoc} */ @Override public String description() { - return "Delete lost segment CDC links. For in-memory mode state will be reset to the first record written " + - "after CDC enable again"; + return "Delete lost segment CDC links before the CDC is disabled or the last gap"; } /** {@inheritDoc} */ @@ -65,8 +63,7 @@ public class CdcDeleteLostSegmentLinksCommand implements ComputeCommand { @@ -121,14 +115,22 @@ protected CdcDeleteLostSegmentsJob(CdcDeleteLostSegmentLinksCommandArg arg, bool try { lock.tryLock(1); - boolean lostDeleted = deleteLostSegments(); + Long lostSgmnt = findLastLostSegement(); - WALPointer lastCdcDisabledRec = findLastCdcDisabledRecord(); + if (lostSgmnt != null) + deleteAll(lostSgmnt); - if (lastCdcDisabledRec != null) - setWalState(lastCdcDisabledRec.next()); // Reset WAL state to the next record. - else if (lostDeleted) - setWalState(null); // Delete WAL state. + Long cdcDisableSgmnt = findLastCdcDisableSegment(); + + if (cdcDisableSgmnt != null) + deleteAll(cdcDisableSgmnt); + + if (lostSgmnt != null || cdcDisableSgmnt != null) + resetWalState(); + else { + if (log.isInfoEnabled()) + log.info("Lost segment CDC links or CDC disable record were not found."); + } } catch (IgniteCheckedException e) { throw new IgniteException("Failed to delete lost segment CDC links. " + @@ -142,13 +144,15 @@ else if (lostDeleted) return null; } - /** @return {@code True} if lost segments were found and successfully deleted. */ - private boolean deleteLostSegments() { - Set delete = new TreeSet<>(); - + /** @return The index of the segment previous to the last gap or {@code null} if no gaps were found. */ + private Long findLastLostSegement() { + AtomicReference lastLostSgmnt = new AtomicReference<>(); AtomicLong lastSgmnt = new AtomicLong(-1); consumeCdcSegments(segment -> { + if (lastLostSgmnt.get() != null) + return; + long idx = FileWriteAheadLogManager.segmentIndex(segment); if (lastSgmnt.get() == -1 || lastSgmnt.get() - idx == 1) { @@ -157,35 +161,18 @@ private boolean deleteLostSegments() { return; } - delete.add(segment.toFile()); - }); - - if (delete.isEmpty()) { - log.info("Lost segment CDC links were not found."); - - return false; - } - - log.info("Found lost segment CDC links. The following links will be deleted: " + delete); - - delete.forEach(file -> { - if (!file.delete()) { - throw new IgniteException("Failed to delete lost segment CDC link [file=" + - file.getAbsolutePath() + ']'); - } + if (log.isInfoEnabled()) + log.info("Found lost segment CDC links [lastLostSgmntIdx=" + idx + ']'); - log.info("Segment CDC link deleted [file=" + file.getAbsolutePath() + ']'); + lastLostSgmnt.set(idx); }); - return true; + return lastLostSgmnt.get(); } - /** @return WAL pointer to the last {@link CdcDisableRecord}. */ - private WALPointer findLastCdcDisabledRecord() { - if (!wal.inMemoryCdc()) - return null; - - AtomicReference lastRec = new AtomicReference<>(); + /** @return The index of the segment that contains the last {@link CdcDisableRecord}. */ + private Long findLastCdcDisableSegment() { + AtomicReference lastRec = new AtomicReference<>(); consumeCdcSegments(segment -> { if (lastRec.get() != null) @@ -205,52 +192,56 @@ private WALPointer findLastCdcDisabledRecord() { builder.pageSize(ignite.configuration().getDataStorageConfiguration().getPageSize()); try (WALIterator it = new IgniteWalIteratorFactory(log).iterator(builder)) { - while (it.hasNext()) - lastRec.set(it.next().getKey()); + if (it.hasNext()) { + if (log.isInfoEnabled()) + log.info("Found CDC disable record [ptr=" + it.next().get1() + ']'); + + lastRec.set(FileWriteAheadLogManager.segmentIndex(segment)); + } } catch (IgniteCheckedException e) { throw new IgniteException("Failed to read CDC segment [path=" + segment + ']', e); } }); - if (log.isInfoEnabled() && lastRec.get() != null) - log.info("Found CDC disable record [ptr=" + lastRec.get() + ']'); - return lastRec.get(); } - /** @param ptr WAL pointer to set state or {@code null} to delete state. */ - private void setWalState(WALPointer ptr) { + /** */ + private void resetWalState() { Path stateDir = walCdcDir.toPath().resolve(STATE_DIR); - if (ptr == null) { - File state = stateDir.resolve(WAL_STATE_FILE_NAME).toFile(); + File state = stateDir.resolve(WAL_STATE_FILE_NAME).toFile(); - if (state.exists() && !state.delete()) - throw new IgniteException("Failed to delete wal state file [file=" + state.getAbsolutePath() + ']'); + if (state.exists() && !state.delete()) + throw new IgniteException("Failed to delete wal state file [file=" + state.getAbsolutePath() + ']'); - return; - } + if (log.isInfoEnabled()) + log.info("The CDC application WAL state has been reset."); + } - CdcConsumerState state = new CdcConsumerState(log, stateDir); + /** Delete all segments with an absolute index less than or equal to the given one. */ + private void deleteAll(long lastIdx) { + consumeCdcSegments(segment -> { + if (FileWriteAheadLogManager.segmentIndex(segment) > lastIdx) + return; - try { - Files.createDirectories(stateDir); + if (!segment.toFile().delete()) { + throw new IgniteException("Failed to delete lost segment CDC link [segment=" + + segment.toAbsolutePath() + ']'); + } - state.saveWal(new T2<>(ptr, 0)); - } - catch (IOException e) { - throw new IgniteException("Failed to set WAL state file.", e); - } + log.info("Segment CDC link deleted [file=" + segment.toAbsolutePath() + ']'); + }); } - /** Consume CDC segments in reversed order. */ + /** Consume CDC segments in descending order. */ private void consumeCdcSegments(Consumer cnsmr) { try (Stream cdcFiles = Files.list(walCdcDir.toPath())) { cdcFiles .filter(p -> WAL_SEGMENT_FILE_FILTER.accept(p.toFile())) .sorted(Comparator.comparingLong(FileWriteAheadLogManager::segmentIndex) - .reversed()) // Sort by segment index. + .reversed()) // Sort by segment index in descending order. .forEach(cnsmr); } catch (IOException e) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 3ab1110db44b1..b59ead612bf4f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -55,6 +55,7 @@ import java.util.zip.ZipInputStream; import java.util.zip.ZipOutputStream; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.configuration.DataRegionConfiguration; @@ -422,6 +423,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** CDC disabled flag. */ private final DistributedBooleanProperty cdcDisabled = detachedBooleanProperty(CDC_DISABLED); + /** The segment index to which the CDC link should be created when CDC is disabled. */ + private volatile long lastCdcDisableSgmnt = Long.MAX_VALUE; + /** * Constructor. * @@ -520,16 +524,14 @@ public void setFileIOFactory(FileIOFactory ioFactory) { if (newVal != null && newVal) { log.warning("CDC was disabled."); - if (inMemoryCdc) { - try { - log(new CdcDisableRecord()); - } - catch (IgniteCheckedException e) { - U.error(log, "Unable to log CDC disable record: " + e.getMessage(), e); + logCdcDisableRecord(); + } - cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, e)); - } - } + if (oldVal != null && oldVal && !newVal) { + if (log.isInfoEnabled()) + log.info("CDC was enabled."); + + lastCdcDisableSgmnt = Long.MAX_VALUE; } }); @@ -1846,6 +1848,26 @@ private boolean skipIfCdcDisabled(WALRecord rec) { return true; } + /** */ + private void logCdcDisableRecord() { + cctx.database().checkpointReadLock(); + + try { + WALPointer ptr = log(new CdcDisableRecord(), RolloverType.CURRENT_SEGMENT); + + if (ptr == null) + throw new IgniteException("CDC disable record was not logged."); + + lastCdcDisableSgmnt = ptr.index(); + } + catch (Exception e) { + U.error(log, "Unable to log CDC disable record: " + e.getMessage(), e); + } + finally { + cctx.database().checkpointReadUnlock(); + } + } + /** * File archiver operates on absolute segment indexes. For any given absolute segment index N we can calculate the * work WAL segment: S(N) = N % dsCfg.walSegments. When a work segment is finished, it is given to the archiver. If @@ -2178,7 +2200,7 @@ public SegmentArchiveResult archiveSegment(long absIdx) throws StorageException Files.move(dstTmpFile.toPath(), dstFile.toPath()); if (walCdcDir != null) { - if (!cdcDisabled.getOrDefault(false)) { + if (absIdx <= lastCdcDisableSgmnt) { if (checkCdcWalDirectorySize(dstFile.length())) Files.createLink(walCdcDir.toPath().resolve(dstFile.getName()), dstFile.toPath()); else { diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java index 278044ca723b2..e8a642de8e483 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java @@ -800,37 +800,29 @@ public void testDisable() throws Exception { ign.cluster().state(ACTIVE); - IgniteCache cache = ign.getOrCreateCache(DEFAULT_CACHE_NAME); - - addData(cache, 0, 1); - FileWriteAheadLogManager wal = (FileWriteAheadLogManager)ign.context().cache().context().wal(true); - File walCdcDir = wal.walCdcDirectory(); - File archiveDir = wal.archiveDir(); - - assertTrue(waitForCondition(() -> 1 == walCdcDir.list().length, 2 * WAL_ARCHIVE_TIMEOUT)); - assertEquals(1, archiveDir.listFiles().length); - - DistributedChangeableProperty disabled = ign.context().distributedConfiguration() + DistributedChangeableProperty cdcDisabled = ign.context().distributedConfiguration() .property(FileWriteAheadLogManager.CDC_DISABLED); - disabled.propagate(true); + IgniteCache cache = ign.getOrCreateCache(DEFAULT_CACHE_NAME); addData(cache, 0, 1); + cdcDisabled.propagate(true); + + long lastIdx = wal.lastWritePointer().index(); + + addData(cache, 0, 2); + Thread.sleep(2 * WAL_ARCHIVE_TIMEOUT); - assertEquals(1, walCdcDir.list().length); // In-memory CDC should not produce WAL records when disabled. - assertEquals(persistenceEnabled ? 2 : 1, archiveDir.list().length); + assertEquals(persistenceEnabled ? lastIdx + 1 : lastIdx, wal.lastWritePointer().index()); - disabled.propagate(false); + cdcDisabled.propagate(false); - addData(cache, 0, 1); - - assertTrue(waitForCondition(() -> 2 == walCdcDir.list().length, 2 * WAL_ARCHIVE_TIMEOUT)); - assertEquals(persistenceEnabled ? 3 : 2, archiveDir.listFiles().length); + addData(cache, 0, 3); UserCdcConsumer cnsmr = new UserCdcConsumer(); @@ -839,16 +831,14 @@ public void testDisable() throws Exception { waitForSize(1, DEFAULT_CACHE_NAME, UPDATE, cnsmr); // Cdc application must fail due to skipped data. - assertThrowsAnyCause(log, cdcFut::get, IgniteException.class, - persistenceEnabled ? "Found missed segments. Some events are missed." - : "Found the CDC disabled record. Some events are missed."); + assertThrowsAnyCause(log, cdcFut::get, IgniteException.class, "CDC disabled on node."); ign.compute().execute(CdcDeleteLostSegmentsTask.class, new VisorTaskArgument<>(ign.localNode().id(), false)); cdcFut = runAsync(createCdc(cnsmr, getConfiguration(ign.name()))); cnsmr.data.clear(); - waitForSize(1, DEFAULT_CACHE_NAME, UPDATE, cnsmr); + waitForSize(3, DEFAULT_CACHE_NAME, UPDATE, cnsmr); cdcFut.cancel(); } @@ -901,8 +891,7 @@ public void testCdcDirectoryMaxSize() throws Exception { // Write next segment after skipped. writeSgmnt.run(); - assertThrows(log, () -> fut.get(getTestTimeout()), IgniteCheckedException.class, - "Found missed segments. Some events are missed."); + assertThrows(log, () -> fut.get(getTestTimeout()), IgniteCheckedException.class, "Found missed segments."); ign.compute().execute(CdcDeleteLostSegmentsTask.class, new VisorTaskArgument<>(ign.localNode().id(), false)); diff --git a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output index 1079fc6f81293..d71532d40d57a 100644 --- a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output +++ b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output @@ -383,7 +383,7 @@ If the file name isn't specified the output file name is: '.bin': control.(sh|bat) --consistency finalize [EXPERIMENTAL] - Delete lost segment CDC links. For in-memory mode state will be reset to the first record written after CDC enable again: + Delete lost segment CDC links before the CDC is disabled or the last gap: control.(sh|bat) --cdc delete_lost_segment_links [--node-id node_id] [--yes] Parameters: diff --git a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output index 1079fc6f81293..d71532d40d57a 100644 --- a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output +++ b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output @@ -383,7 +383,7 @@ If the file name isn't specified the output file name is: '.bin': control.(sh|bat) --consistency finalize [EXPERIMENTAL] - Delete lost segment CDC links. For in-memory mode state will be reset to the first record written after CDC enable again: + Delete lost segment CDC links before the CDC is disabled or the last gap: control.(sh|bat) --cdc delete_lost_segment_links [--node-id node_id] [--yes] Parameters: From 9c2504ccb45b91046de3326a04f137f464c5b715 Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Fri, 7 Jul 2023 09:46:09 +0300 Subject: [PATCH 17/22] minor fixes --- .../cache/persistence/wal/FileWriteAheadLogManager.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index b59ead612bf4f..669c334673dcb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -3401,11 +3401,6 @@ private boolean walArchiveUnlimited() { return maxWalArchiveSize == UNLIMITED_WAL_ARCHIVE; } - /** @return {@code True} if WAL enabled only for CDC. */ - public boolean inMemoryCdc() { - return inMemoryCdc; - } - /** * Removing files from {@link #walArchiveDir}. * From fd41a0bb777689f0d29f06cc6d91c5c0e6ca45e1 Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Fri, 7 Jul 2023 18:24:56 +0300 Subject: [PATCH 18/22] review fixes --- .../cdc/CdcDeleteLostSegmentsTask.java | 30 +++++++++---------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentsTask.java index 5619d14633013..53297e7f9fc4a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentsTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentsTask.java @@ -115,22 +115,20 @@ protected CdcDeleteLostSegmentsJob(CdcDeleteLostSegmentLinksCommandArg arg, bool try { lock.tryLock(1); - Long lostSgmnt = findLastLostSegement(); + Long lastSegBeforeSkip = findLastSegmentBeforeSkip(); - if (lostSgmnt != null) - deleteAll(lostSgmnt); + if (lastSegBeforeSkip != null) + deleteAllUntil(lastSegBeforeSkip); - Long cdcDisableSgmnt = findLastCdcDisableSegment(); + Long cdcDisableSgmnt = findLastSegmentWithCdcDisabledRecord(); if (cdcDisableSgmnt != null) - deleteAll(cdcDisableSgmnt); + deleteAllUntil(cdcDisableSgmnt); - if (lostSgmnt != null || cdcDisableSgmnt != null) - resetWalState(); - else { - if (log.isInfoEnabled()) - log.info("Lost segment CDC links or CDC disable record were not found."); - } + if (lastSegBeforeSkip != null || cdcDisableSgmnt != null) + deleteCdcWalState(); + else if (log.isInfoEnabled()) + log.info("Lost segment CDC links or CDC disable record were not found."); } catch (IgniteCheckedException e) { throw new IgniteException("Failed to delete lost segment CDC links. " + @@ -145,7 +143,7 @@ protected CdcDeleteLostSegmentsJob(CdcDeleteLostSegmentLinksCommandArg arg, bool } /** @return The index of the segment previous to the last gap or {@code null} if no gaps were found. */ - private Long findLastLostSegement() { + private Long findLastSegmentBeforeSkip() { AtomicReference lastLostSgmnt = new AtomicReference<>(); AtomicLong lastSgmnt = new AtomicLong(-1); @@ -171,7 +169,7 @@ private Long findLastLostSegement() { } /** @return The index of the segment that contains the last {@link CdcDisableRecord}. */ - private Long findLastCdcDisableSegment() { + private Long findLastSegmentWithCdcDisabledRecord() { AtomicReference lastRec = new AtomicReference<>(); consumeCdcSegments(segment -> { @@ -208,7 +206,7 @@ private Long findLastCdcDisableSegment() { } /** */ - private void resetWalState() { + private void deleteCdcWalState() { Path stateDir = walCdcDir.toPath().resolve(STATE_DIR); File state = stateDir.resolve(WAL_STATE_FILE_NAME).toFile(); @@ -217,11 +215,11 @@ private void resetWalState() { throw new IgniteException("Failed to delete wal state file [file=" + state.getAbsolutePath() + ']'); if (log.isInfoEnabled()) - log.info("The CDC application WAL state has been reset."); + log.info("The CDC application WAL state has been deleted."); } /** Delete all segments with an absolute index less than or equal to the given one. */ - private void deleteAll(long lastIdx) { + private void deleteAllUntil(long lastIdx) { consumeCdcSegments(segment -> { if (FileWriteAheadLogManager.segmentIndex(segment) > lastIdx) return; From efdf301e3a8217c17aba6c2bb85e02b8a11e2852 Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Thu, 27 Jul 2023 22:48:54 +0300 Subject: [PATCH 19/22] Node join state refactored --- .../cdc/CdcDeleteLostSegmentsTask.java | 2 +- .../wal/FileWriteAheadLogManager.java | 77 ++++++------ .../org/apache/ignite/cdc/CdcSelfTest.java | 113 +++++++++++++++++- 3 files changed, 149 insertions(+), 43 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentsTask.java index 53297e7f9fc4a..609c2f3a8e5ce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentsTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentsTask.java @@ -177,7 +177,7 @@ private Long findLastSegmentWithCdcDisabledRecord() { return; if (log.isInfoEnabled()) - log.info("Processing CDC segment [segment=" + segment + ']'); + log.info("Start process CDC segment [segment=" + segment + ']'); IgniteWalIteratorFactory.IteratorParametersBuilder builder = new IgniteWalIteratorFactory.IteratorParametersBuilder() diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 669c334673dcb..ea6157f8ac51b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -55,7 +55,6 @@ import java.util.zip.ZipInputStream; import java.util.zip.ZipOutputStream; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.configuration.DataRegionConfiguration; @@ -70,6 +69,7 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.cluster.DistributedConfigurationUtils; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.pagemem.wal.WALIterator; @@ -111,6 +111,8 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer; import org.apache.ignite.internal.processors.configuration.distributed.DistributedBooleanProperty; +import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener; +import org.apache.ignite.internal.processors.configuration.distributed.DistributedPropertyDispatcher; import org.apache.ignite.internal.processors.failure.FailureProcessor; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; @@ -124,7 +126,6 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteBiPredicate; @@ -135,7 +136,6 @@ import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; -import static java.lang.String.format; import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; import static java.nio.file.StandardOpenOption.CREATE; @@ -514,28 +514,42 @@ public void setFileIOFactory(FileIOFactory ioFactory) { ensureHardLinkAvailable(walArchiveDir.toPath(), walCdcDir.toPath()); cctx.kernalContext().internalSubscriptionProcessor() - .registerDistributedConfigurationListener(dispatcher -> { - cdcDisabled.addListener((name, oldVal, newVal) -> { - if (log.isInfoEnabled()) { - log.info(format("Distributed property '%s' was changed from '%s' to '%s'.", - name, oldVal, newVal)); - } + .registerDistributedConfigurationListener(new DistributedConfigurationLifecycleListener() { + @Override public void onReadyToRegister(DistributedPropertyDispatcher dispatcher) { + cdcDisabled.addListener((name, oldVal, newVal) -> { + if (newVal == null || oldVal == newVal) + return; + + if (oldVal == null) { + if (newVal) { + if (log.isInfoEnabled()) + log.info("CDC disabled. Distributed property '" + name + "' inited to 'true'."); + + lastCdcDisableSgmnt = -1; + } - if (newVal != null && newVal) { - log.warning("CDC was disabled."); + return; + } - logCdcDisableRecord(); - } + if (newVal) { + log.warning("CDC was disabled."); - if (oldVal != null && oldVal && !newVal) { - if (log.isInfoEnabled()) - log.info("CDC was enabled."); + logCdcDisableRecord(); + } + else { + if (log.isInfoEnabled()) + log.info("CDC was enabled."); - lastCdcDisableSgmnt = Long.MAX_VALUE; - } - }); + lastCdcDisableSgmnt = Long.MAX_VALUE; + } + }); + + dispatcher.registerProperty(cdcDisabled); + } - dispatcher.registerProperty(cdcDisabled); + @Override public void onReadyToWrite() { + DistributedConfigurationUtils.setDefaultValue(cdcDisabled, false, log); + } }); } @@ -940,7 +954,8 @@ private boolean checkTimeout(AtomicLong lastEvt, long timeout) { !(rec instanceof PageDeltaRecord || rec instanceof PageSnapshot || rec instanceof MemoryRecoveryRecord)) return null; - if (skipIfCdcDisabled(rec)) + // Skip if in-memory CDC disabled. + if (inMemoryCdc && cdcDisabled.getOrDefault(false) && !(rec instanceof CdcDisableRecord)) return null; FileWriteHandle currWrHandle = currentHandle(); @@ -1837,17 +1852,6 @@ public long maxWalSegmentSize() { return maxWalSegmentSize; } - /** @return {@code True} if log record should be skipped when CDC is disabled. */ - private boolean skipIfCdcDisabled(WALRecord rec) { - if (!inMemoryCdc || rec instanceof CdcDisableRecord || !cdcDisabled.getOrDefault(false)) - return false; - - LT.warn(log, "Logging CDC data records to WAL skipped. The '" + CDC_DISABLED + - "' distributed property is 'true'."); - - return true; - } - /** */ private void logCdcDisableRecord() { cctx.database().checkpointReadLock(); @@ -1855,13 +1859,12 @@ private void logCdcDisableRecord() { try { WALPointer ptr = log(new CdcDisableRecord(), RolloverType.CURRENT_SEGMENT); - if (ptr == null) - throw new IgniteException("CDC disable record was not logged."); - - lastCdcDisableSgmnt = ptr.index(); + lastCdcDisableSgmnt = ptr != null ? ptr.index() : -1; } catch (Exception e) { - U.error(log, "Unable to log CDC disable record: " + e.getMessage(), e); + lastCdcDisableSgmnt = -1; + + log.warning("Unable to log CDC disable record: " + e.getMessage(), e); } finally { cctx.database().checkpointReadUnlock(); diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java index e8a642de8e483..1d2776624b921 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java @@ -826,19 +826,122 @@ public void testDisable() throws Exception { UserCdcConsumer cnsmr = new UserCdcConsumer(); + // Cdc application must fail due to skipped data. + checkCdcAppFailed(ign, cnsmr, 1); + + ign.compute().execute(CdcDeleteLostSegmentsTask.class, new VisorTaskArgument<>(ign.localNode().id(), false)); + + checkCdcApp(ign, cnsmr, 3); + } + + /** */ + @Test + public void testDisableNodeJoin() throws Exception { + assumeTrue("CDC with 2 local nodes can't determine correct PDS directory without specificConsistentId.", + specificConsistentId); + + IgniteEx ign0 = startGrid(0); + + ign0.cluster().state(ACTIVE); + + DistributedChangeableProperty cdcDisabled = ign0.context().distributedConfiguration() + .property(FileWriteAheadLogManager.CDC_DISABLED); + + IgniteCache cache0 = ign0.getOrCreateCache(DEFAULT_CACHE_NAME); + + primaryKeys(cache0, 1).forEach(k -> cache0.put(k, createUser(k))); + + cdcDisabled.propagate(true); + + IgniteEx ign1 = startGrid(1); + + if (persistenceEnabled) + ign1.cluster().setBaselineTopology(ign1.cluster().forServers().nodes()); + + IgniteCache cache1 = ign1.getOrCreateCache(DEFAULT_CACHE_NAME); + + primaryKeys(cache0, 2).forEach(k -> cache0.put(k, createUser(k))); + primaryKeys(cache1, 2).forEach(k -> cache1.put(k, createUser(k))); + + U.sleep(2 * WAL_ARCHIVE_TIMEOUT); + + cdcDisabled.propagate(false); + + primaryKeys(cache0, 3).forEach(k -> cache0.put(k, createUser(k))); + primaryKeys(cache1, 3).forEach(k -> cache1.put(k, createUser(k))); + + UserCdcConsumer cnsmr0 = new UserCdcConsumer(); + + checkCdcAppFailed(ign0, cnsmr0, 1); + + ign0.compute().execute(CdcDeleteLostSegmentsTask.class, + new VisorTaskArgument<>(F.nodeIds(ign0.cluster().nodes()), false)); + + checkCdcApp(ign0, cnsmr0, 3); + checkCdcApp(ign1, new UserCdcConsumer(), 3); + } + + /** */ + @Test + public void testDisableNodeRestart() throws Exception { + IgniteEx ign = startGrid(0); + + ign.cluster().state(ACTIVE); + + DistributedChangeableProperty cdcDisabled = ign.context().distributedConfiguration() + .property(FileWriteAheadLogManager.CDC_DISABLED); + + IgniteCache cache = ign.getOrCreateCache(DEFAULT_CACHE_NAME); + + addData(cache, 0, 1); + + cdcDisabled.propagate(true); + + addData(cache, 0, 2); + + stopGrid(0); + + ign = startGrid(0); + cache = ign.getOrCreateCache(DEFAULT_CACHE_NAME); + cdcDisabled = ign.context().distributedConfiguration().property(FileWriteAheadLogManager.CDC_DISABLED); + + addData(cache, 0, 3); + + U.sleep(2 * WAL_ARCHIVE_TIMEOUT); + + if (persistenceEnabled) + cdcDisabled.propagate(false); + + addData(cache, 0, 4); + + UserCdcConsumer cnsmr = new UserCdcConsumer(); + + checkCdcAppFailed(ign, cnsmr, 1); + + ign.compute().execute(CdcDeleteLostSegmentsTask.class, new VisorTaskArgument<>(ign.localNode().id(), false)); + + checkCdcApp(ign, cnsmr, 4); + } + + /** */ + void checkCdcAppFailed(Ignite ign, UserCdcConsumer cnsmr, int expSizeBefore) throws Exception { + cnsmr.clear(); + IgniteInternalFuture cdcFut = runAsync(createCdc(cnsmr, getConfiguration(ign.name()))); - waitForSize(1, DEFAULT_CACHE_NAME, UPDATE, cnsmr); + waitForSize(expSizeBefore, DEFAULT_CACHE_NAME, UPDATE, cnsmr); // Cdc application must fail due to skipped data. assertThrowsAnyCause(log, cdcFut::get, IgniteException.class, "CDC disabled on node."); + } - ign.compute().execute(CdcDeleteLostSegmentsTask.class, new VisorTaskArgument<>(ign.localNode().id(), false)); + /** */ + void checkCdcApp(Ignite ign, UserCdcConsumer cnsmr, int expSize) throws Exception { + cnsmr.clear(); - cdcFut = runAsync(createCdc(cnsmr, getConfiguration(ign.name()))); - cnsmr.data.clear(); + IgniteInternalFuture cdcFut = runAsync(createCdc(cnsmr, getConfiguration(ign.name()))); - waitForSize(3, DEFAULT_CACHE_NAME, UPDATE, cnsmr); + waitForSize(expSize, DEFAULT_CACHE_NAME, UPDATE, cnsmr); cdcFut.cancel(); } From 2092ec0e676ff22a52738a3c3b100b8ab69568c5 Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Sat, 29 Jul 2023 13:13:04 +0300 Subject: [PATCH 20/22] review fixes --- .../management/cdc/CdcDeleteLostSegmentsTask.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentsTask.java index 609c2f3a8e5ce..2a48266bbacef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentsTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentsTask.java @@ -117,13 +117,11 @@ protected CdcDeleteLostSegmentsJob(CdcDeleteLostSegmentLinksCommandArg arg, bool Long lastSegBeforeSkip = findLastSegmentBeforeSkip(); - if (lastSegBeforeSkip != null) - deleteAllUntil(lastSegBeforeSkip); + deleteAllUntil(lastSegBeforeSkip); Long cdcDisableSgmnt = findLastSegmentWithCdcDisabledRecord(); - if (cdcDisableSgmnt != null) - deleteAllUntil(cdcDisableSgmnt); + deleteAllUntil(cdcDisableSgmnt); if (lastSegBeforeSkip != null || cdcDisableSgmnt != null) deleteCdcWalState(); @@ -219,7 +217,10 @@ private void deleteCdcWalState() { } /** Delete all segments with an absolute index less than or equal to the given one. */ - private void deleteAllUntil(long lastIdx) { + private void deleteAllUntil(Long lastIdx) { + if (lastIdx == null) + return; + consumeCdcSegments(segment -> { if (FileWriteAheadLogManager.segmentIndex(segment) > lastIdx) return; From e265e8df2b1186b9f0f78a43a414df55c208fcc1 Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Thu, 24 Aug 2023 21:04:45 +0300 Subject: [PATCH 21/22] Fix tests --- .../org/apache/ignite/cdc/CdcSelfTest.java | 43 ++++++------------- 1 file changed, 14 insertions(+), 29 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java index 1d2776624b921..5125dfa9f14b0 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java @@ -824,14 +824,7 @@ public void testDisable() throws Exception { addData(cache, 0, 3); - UserCdcConsumer cnsmr = new UserCdcConsumer(); - - // Cdc application must fail due to skipped data. - checkCdcAppFailed(ign, cnsmr, 1); - - ign.compute().execute(CdcDeleteLostSegmentsTask.class, new VisorTaskArgument<>(ign.localNode().id(), false)); - - checkCdcApp(ign, cnsmr, 3); + checkCdcAppFailed(ign, 1, 3); } /** */ @@ -870,14 +863,7 @@ public void testDisableNodeJoin() throws Exception { primaryKeys(cache0, 3).forEach(k -> cache0.put(k, createUser(k))); primaryKeys(cache1, 3).forEach(k -> cache1.put(k, createUser(k))); - UserCdcConsumer cnsmr0 = new UserCdcConsumer(); - - checkCdcAppFailed(ign0, cnsmr0, 1); - - ign0.compute().execute(CdcDeleteLostSegmentsTask.class, - new VisorTaskArgument<>(F.nodeIds(ign0.cluster().nodes()), false)); - - checkCdcApp(ign0, cnsmr0, 3); + checkCdcAppFailed(ign0, 1, 3); checkCdcApp(ign1, new UserCdcConsumer(), 3); } @@ -914,31 +900,30 @@ public void testDisableNodeRestart() throws Exception { addData(cache, 0, 4); - UserCdcConsumer cnsmr = new UserCdcConsumer(); - - checkCdcAppFailed(ign, cnsmr, 1); - - ign.compute().execute(CdcDeleteLostSegmentsTask.class, new VisorTaskArgument<>(ign.localNode().id(), false)); - - checkCdcApp(ign, cnsmr, 4); + checkCdcAppFailed(ign, 1, persistenceEnabled ? 4 : 7); } /** */ - void checkCdcAppFailed(Ignite ign, UserCdcConsumer cnsmr, int expSizeBefore) throws Exception { - cnsmr.clear(); + private void checkCdcAppFailed(Ignite ign, int expSizeBeforeFail, int expSizeAfterFail) throws Exception { + UserCdcConsumer cnsmr = new UserCdcConsumer(); IgniteInternalFuture cdcFut = runAsync(createCdc(cnsmr, getConfiguration(ign.name()))); - waitForSize(expSizeBefore, DEFAULT_CACHE_NAME, UPDATE, cnsmr); + waitForSize(expSizeBeforeFail, DEFAULT_CACHE_NAME, UPDATE, cnsmr); // Cdc application must fail due to skipped data. assertThrowsAnyCause(log, cdcFut::get, IgniteException.class, "CDC disabled on node."); - } - /** */ - void checkCdcApp(Ignite ign, UserCdcConsumer cnsmr, int expSize) throws Exception { + ign.compute().execute(CdcDeleteLostSegmentsTask.class, + new VisorTaskArgument<>(F.nodeIds(ign.cluster().nodes()), false)); + cnsmr.clear(); + checkCdcApp(ign, cnsmr, expSizeAfterFail); + } + + /** */ + private void checkCdcApp(Ignite ign, UserCdcConsumer cnsmr, int expSize) throws Exception { IgniteInternalFuture cdcFut = runAsync(createCdc(cnsmr, getConfiguration(ign.name()))); waitForSize(expSize, DEFAULT_CACHE_NAME, UPDATE, cnsmr); From 919721e967faa333b5d9f52942640abdf784497f Mon Sep 17 00:00:00 2001 From: NSAmelchev Date: Tue, 5 Sep 2023 13:42:47 +0300 Subject: [PATCH 22/22] Review fixes --- .../cache/persistence/wal/FileWriteAheadLogManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index ea3ab1a1bdfda..4d98a1469fdfc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -955,7 +955,7 @@ private boolean checkTimeout(AtomicLong lastEvt, long timeout) { return null; // Skip if in-memory CDC disabled. - if (inMemoryCdc && cdcDisabled.getOrDefault(false) && !(rec instanceof CdcDisableRecord)) + if (inMemoryCdc && cdcDisabled.get() && !(rec instanceof CdcDisableRecord)) return null; FileWriteHandle currWrHandle = currentHandle();