Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-19661 Fixed CDC data records logging to WAL for in-memory caches when disabled. #10766

Open
wants to merge 30 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
45fb14a
Fixed CDC data records logging to WAL for in-memory caches when disab…
NSAmelchev Jun 6, 2023
68cbebe
Fixed CDC data records logging to WAL for in-memory caches when disab…
NSAmelchev Jun 15, 2023
97b2f45
Merge remote-tracking branch 'apache/master' into ignite-19661
NSAmelchev Jun 15, 2023
244398e
Tests fix
NSAmelchev Jun 15, 2023
d1fef0c
Merge remote-tracking branch 'apache/master' into ignite-19661
NSAmelchev Jun 27, 2023
2deaf80
Merge remote-tracking branch 'apache/master' into ignite-19661
NSAmelchev Jun 27, 2023
89bd151
wip
NSAmelchev Jun 28, 2023
a02782f
Merge remote-tracking branch 'apache/master' into ignite-19661
NSAmelchev Jun 28, 2023
e6672ed
wip
NSAmelchev Jun 29, 2023
98ce845
wip
NSAmelchev Jun 29, 2023
3a82585
wip
NSAmelchev Jun 29, 2023
fab7560
wip
NSAmelchev Jun 29, 2023
ee734f0
Fix tests
NSAmelchev Jun 29, 2023
74611e9
Fix test
NSAmelchev Jun 29, 2023
de3374f
Merge remote-tracking branch 'apache/master' into ignite-19661
NSAmelchev Jul 4, 2023
f862aa2
fixed compilation
NSAmelchev Jul 4, 2023
2cc9e37
review fixes
NSAmelchev Jul 4, 2023
f56edd7
review fixes
NSAmelchev Jul 4, 2023
e78e364
review fixes
NSAmelchev Jul 4, 2023
b5f7b14
WIP
NSAmelchev Jul 5, 2023
fa43eae
Log CDC disable and rollover
NSAmelchev Jul 6, 2023
9c2504c
minor fixes
NSAmelchev Jul 7, 2023
fd41a0b
review fixes
NSAmelchev Jul 7, 2023
efdf301
Node join state refactored
NSAmelchev Jul 27, 2023
180a9f3
Merge remote-tracking branch 'apache/master' into ignite-19661
NSAmelchev Jul 27, 2023
2092ec0
review fixes
NSAmelchev Jul 29, 2023
e265e8d
Fix tests
NSAmelchev Aug 24, 2023
3e9db4c
Merge remote-tracking branch 'apache/master' into ignite-19661
NSAmelchev Aug 24, 2023
0133586
Merge remote-tracking branch 'apache/master' into ignite-19661
NSAmelchev Aug 31, 2023
919721e
Review fixes
NSAmelchev Sep 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions docs/_docs/persistence/change-data-capture.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -142,19 +142,30 @@ IMPORTANT: `ignite-cdc.sh` implements the fail-fast approach. It just fails in c
5. Infinitely wait for the newly available segment and process it.
6. Stop the consumer in case of a failure or a received stop signal.

== Handling skipped segments
== Disabling CDC

The CDC can be disabled manually or by configured directory maximum size. In this case a hard link creation will be skipped.
The CDC can be disabled manually or by configured directory maximum size threshold.

WARNING: All changes in skipped segments will be lost!
WARNING: All changes while CDC is disabled will be lost!

So when enabled there will be gap between segments: `0000000000000002.wal`, `0000000000000010.wal`, `0000000000000011.wal`, for example.
In this case `ignite-cdc.sh` will fail with the something like "Found missed segments. Some events are missed. Exiting! [lastSegment=2, nextSegment=10]".
When CDC disabled manually, the special WAL record (CDC disable record) will be written to the end of
current segment with rollover afterwards. Hard link creation will be skipped for next segments until CDC is enabled.
For in-memory CDC no changes will be flushed to the WAL.

The CDC application will fail if meets this record while processing segments. Example of error:
`CDC disabled on node. Please, check node log. Exiting!`

When configured directory maximum size is exceeded, hard link creation will be skipped for this node.
Once space in the directory is available again, hard link creation will be continued.
So, there will be gap between segments: `0000000000000002.wal`, `0000000000000010.wal`, `0000000000000011.wal`, for example.
In this case `ignite-cdc.sh` will fail with something like "Found missed segments. Please, check node log. Exiting! [lastSegment=2, nextSegment=10]".

These errors mean that data changes are lost and administrator attention is required.

NOTE: Make sure you need to sync data before restarting the CDC application. You can synchronize caches using
link:#forcefully-resend-all-cache-data-to-cdc[resend command], snapshot or other methods.

To fix this error you can run the following link:tools/control-script[Control Script] command:
To fix these errors you can run the following link:tools/control-script[Control Script] command:

[source,shell]
----
Expand All @@ -165,7 +176,7 @@ control.sh|bat --cdc delete_lost_segment_links
control.sh|bat --cdc delete_lost_segment_links --node-id node_id
----

The command will remove all segment links before the last gap.
The command will remove all segment links before the CDC is disabled or the last gap.

For example, CDC was turned off several times: `000000000000002.wal`, `000000000000003.wal`, `000000000000008.wal`, `0000000000000010.wal`, `0000000000000011.wal`
Then, after the command is executed, the following segment links will be deleted: `000000000000002.wal`, `000000000000003.wal`, `000000000000008.wal`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.File;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -29,10 +30,12 @@
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cdc.AbstractCdcTest.UserCdcConsumer;
import org.apache.ignite.cdc.CdcConfiguration;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.WalSegmentArchivedEvent;
import org.apache.ignite.internal.GridJobExecuteRequest;
import org.apache.ignite.internal.GridJobExecuteResponse;
import org.apache.ignite.internal.IgniteEx;
Expand All @@ -56,8 +59,10 @@
import org.apache.ignite.plugin.PluginContext;
import org.apache.ignite.testframework.GridTestUtils;
import org.jetbrains.annotations.Nullable;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static org.apache.ignite.cdc.AbstractCdcTest.ChangeEventType.UPDATE;
import static org.apache.ignite.cdc.AbstractCdcTest.KEYS_CNT;
import static org.apache.ignite.cdc.CdcSelfTest.addData;
Expand All @@ -67,14 +72,17 @@
import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_UNEXPECTED_ERROR;
import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER;
import static org.apache.ignite.testframework.GridTestUtils.assertContains;
import static org.apache.ignite.testframework.GridTestUtils.cartesianProduct;
import static org.apache.ignite.testframework.GridTestUtils.stopThreads;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
import static org.apache.ignite.util.GridCommandHandlerClusterByClassTest.CACHES;
import static org.apache.ignite.util.SystemViewCommandTest.NODE_ID;
import static org.junit.Assume.assumeTrue;

/**
* CDC command tests.
*/
@RunWith(Parameterized.class)
public class CdcCommandTest extends GridCommandHandlerAbstractTest {
/** */
private static final String CDC_DISABLED_DATA_REGION = "cdc_disabled_data_region";
Expand All @@ -85,6 +93,9 @@ public class CdcCommandTest extends GridCommandHandlerAbstractTest {
/** */
public static final String RESEND = "resend";

/** */
public static final int WAL_ARCHIVE_TIMEOUT = 1_000;

/** */
private IgniteEx srv0;

Expand All @@ -100,6 +111,16 @@ public class CdcCommandTest extends GridCommandHandlerAbstractTest {
/** */
private volatile IgniteThrowableConsumer<WALRecord> onLogLsnr;

/** */
@Parameterized.Parameter(1)
public boolean persistenceEnabled;

/** */
@Parameterized.Parameters(name = "cmdHnd={0}, persistence={1}")
public static Collection<Object[]> parameters() {
return cartesianProduct(CMD_HNDS.keySet(), F.asList(true, false));
}

/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
Expand All @@ -108,12 +129,13 @@ public class CdcCommandTest extends GridCommandHandlerAbstractTest {
.setBackups(1));

cfg.setDataStorageConfiguration(new DataStorageConfiguration()
.setWalForceArchiveTimeout(1000)
.setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT)
.setDataRegionConfigurations(new DataRegionConfiguration()
.setName(CDC_DISABLED_DATA_REGION)
.setCdcEnabled(false))
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setCdcEnabled(true)));
.setCdcEnabled(true)
.setPersistenceEnabled(persistenceEnabled)));

cfg.setIncludeEventTypes(EVT_WAL_SEGMENT_ARCHIVED);

Expand Down Expand Up @@ -149,6 +171,9 @@ public class CdcCommandTest extends GridCommandHandlerAbstractTest {
srv0 = startGrid(0);
srv1 = startGrid(1);

if (persistenceEnabled)
srv0.cluster().state(ClusterState.ACTIVE);

awaitPartitionMapExchange();

cdcDisabled = srv0.context().distributedConfiguration().property(FileWriteAheadLogManager.CDC_DISABLED);
Expand Down Expand Up @@ -216,25 +241,64 @@ public void testDeleteLostSegmentLinksApplicationNotClosed() throws Exception {
/** */
@Test
public void testDeleteLostSegmentLinks() throws Exception {
checkDeleteLostSegmentLinks(F.asList(0L, 2L), F.asList(2L), true);
assumeTrue(persistenceEnabled);

archiveAndCheckDeleteLostSegmentLinks(F.asList(0L, 2L), F.asList(2L), true);
}

/** */
@Test
public void testDeleteLostSegmentLinksOneNode() throws Exception {
checkDeleteLostSegmentLinks(F.asList(0L, 2L), F.asList(2L), false);
assumeTrue(persistenceEnabled);

archiveAndCheckDeleteLostSegmentLinks(F.asList(0L, 2L), F.asList(2L), false);
}

/** */
@Test
public void testDeleteLostSegmentLinksMultipleGaps() throws Exception {
checkDeleteLostSegmentLinks(F.asList(0L, 3L, 5L), F.asList(5L), true);
assumeTrue(persistenceEnabled);

archiveAndCheckDeleteLostSegmentLinks(F.asList(0L, 3L, 5L), F.asList(5L), true);
}

/** */
private void checkDeleteLostSegmentLinks(List<Long> expBefore, List<Long> expAfter, boolean allNodes) throws Exception {
@Test
public void testDeleteLostSegmentLinksNoGaps() throws Exception {
archiveAndCheckDeleteLostSegmentLinks(F.asList(0L, 1L), F.asList(0L, 1L), true);
}

/** */
@Test
public void testDeleteLostSegmentLinksCdcDisable() throws Exception {
addData(srv1.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT);

cdcDisabled.propagate(true);

addData(srv1.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT);

cdcDisabled.propagate(false);

addData(srv1.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT);

U.sleep(2 * WAL_ARCHIVE_TIMEOUT);

checkDeleteLostSegmentLinks(F.asList(0L, 1L), F.asList(1L), true);
}

/** */
private void archiveAndCheckDeleteLostSegmentLinks(
List<Long> expBefore,
List<Long> expAfter,
boolean allNodes
) throws Exception {
archiveSegmentLinks(expBefore);

checkDeleteLostSegmentLinks(expBefore, expAfter, allNodes);
}

/** */
private void checkDeleteLostSegmentLinks(List<Long> expBefore, List<Long> expAfter, boolean allNodes) {
checkLinks(srv0, expBefore);
checkLinks(srv1, expBefore);

Expand All @@ -254,34 +318,35 @@ private void checkLinks(IgniteEx srv, List<Long> expLinks) {
File[] links = wal0.walCdcDirectory().listFiles(WAL_SEGMENT_FILE_FILTER);

assertEquals(expLinks.size(), links.length);
Arrays.stream(links).map(File::toPath).map(FileWriteAheadLogManager::segmentIndex)
.allMatch(expLinks::contains);
assertTrue(Arrays.stream(links).map(File::toPath).map(FileWriteAheadLogManager::segmentIndex)
.allMatch(expLinks::contains));
}

/** Archive given segments links with possible gaps. */
private void archiveSegmentLinks(List<Long> idxs) throws Exception {
for (long idx = 0; idx <= idxs.stream().mapToLong(v -> v).max().getAsLong(); idx++) {
cdcDisabled.propagate(!idxs.contains(idx));
CountDownLatch latch = new CountDownLatch(G.allGrids().size());

archiveSegment();
}
}
for (Ignite srv : G.allGrids()) {
long idx0 = idx;

/** */
private void archiveSegment() throws Exception {
CountDownLatch latch = new CountDownLatch(G.allGrids().size());
srv.events().localListen(evt -> {
if (idx0 == ((WalSegmentArchivedEvent)evt).getAbsWalSegmentIdx())
latch.countDown();

for (Ignite srv : G.allGrids()) {
srv.events().localListen(evt -> {
latch.countDown();
return true;
}, EVT_WAL_SEGMENT_ARCHIVED);
}

return false;
}, EVT_WAL_SEGMENT_ARCHIVED);
}
addData(srv1.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT);

addData(srv1.cache(DEFAULT_CACHE_NAME), 0, KEYS_CNT);
boolean skipNext = !idxs.contains(idx + 1) && idx < idxs.get(idxs.size() - 1);

if (!skipNext || !idxs.contains(idx))
latch.await(2 * WAL_ARCHIVE_TIMEOUT, TimeUnit.MILLISECONDS);

latch.await(getTestTimeout(), TimeUnit.MILLISECONDS);
cdcDisabled.propagate(skipNext);
}
}

/** */
Expand Down Expand Up @@ -371,7 +436,7 @@ public void testResendCancelOnNodeLeft() {
/** */
@Test
public void testResendCancelOnRebalanceInProgress() throws Exception {
Assume.assumeTrue(commandHandler.equals(CLI_CMD_HND));
assumeTrue(commandHandler.equals(CLI_CMD_HND));

injectTestSystemOut();

Expand All @@ -391,7 +456,12 @@ public void testResendCancelOnRebalanceInProgress() throws Exception {
});
}

GridTestUtils.runAsync(() -> startGrid(3));
GridTestUtils.runAsync(() -> {
startGrid(3);

if (persistenceEnabled)
srv0.cluster().setBaselineTopology(srv0.cluster().forServers().nodes());
});

rebalanceStarted.await();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import static org.apache.ignite.testframework.GridTestUtils.stopThreads;
import static org.apache.ignite.util.CdcCommandTest.CDC;
import static org.apache.ignite.util.CdcCommandTest.RESEND;
import static org.apache.ignite.util.CdcCommandTest.WAL_ARCHIVE_TIMEOUT;
import static org.apache.ignite.util.CdcCommandTest.runCdc;
import static org.apache.ignite.util.CdcCommandTest.waitForSize;
import static org.apache.ignite.util.GridCommandHandlerClusterByClassTest.CACHES;
Expand All @@ -45,7 +46,7 @@ public class CdcResendCommandTest extends GridCommandHandlerAbstractTest {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

cfg.setDataStorageConfiguration(new DataStorageConfiguration()
.setWalForceArchiveTimeout(1000)
.setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT)
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setCdcEnabled(true)
.setPersistenceEnabled(true)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import static org.apache.ignite.internal.IgnitionEx.initializeDefaultMBeanServer;
import static org.apache.ignite.internal.binary.BinaryUtils.METADATA_FILE_SUFFIX;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DATA_RECORD;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DISABLE;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER;
import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.segmentIndex;
Expand Down Expand Up @@ -449,8 +450,9 @@ public void consumeWalSegmentsUntilStopped() {
long nextSgmnt = segmentIndex(p);

if (lastSgmnt.get() != -1 && nextSgmnt - lastSgmnt.get() != 1) {
throw new IgniteException("Found missed segments. Some events are missed. Exiting! " +
"[lastSegment=" + lastSgmnt.get() + ", nextSegment=" + nextSgmnt + ']');
throw new IgniteException("Found missed segments. Please, check node log. Exiting! " +
"To continue CDC, please, use the command: control.sh|bat --cdc delete_lost_segment_links" +
" [lastSegment=" + lastSgmnt.get() + ", nextSegment=" + nextSgmnt + ']');
}

lastSgmnt.set(nextSgmnt);
Expand Down Expand Up @@ -486,7 +488,7 @@ private void consumeSegment(Path segment) {
.marshallerMappingFileStoreDir(marshaller)
.keepBinary(cdcCfg.isKeepBinary())
.filesOrDirs(segment.toFile())
.addFilter((type, ptr) -> type == DATA_RECORD_V2 || type == CDC_DATA_RECORD);
.addFilter((type, ptr) -> type == DATA_RECORD_V2 || type == CDC_DATA_RECORD || type == CDC_DISABLE);

if (igniteCfg.getDataStorageConfiguration().getPageSize() != 0)
builder.pageSize(igniteCfg.getDataStorageConfiguration().getPageSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,6 @@ public static class DataEntryIterator implements Iterator<DataEntry>, AutoClosea
/** @param walIter WAL iterator. */
DataEntryIterator(WALIterator walIter) {
this.walIter = walIter;

advance();
}

/** @return Current state. */
Expand All @@ -265,6 +263,9 @@ void init(int idx) {

/** {@inheritDoc} */
@Override public boolean hasNext() {
if (next == null)
advance();

return next != null;
}

Expand All @@ -277,8 +278,6 @@ void init(int idx) {

next = null;

advance();

return e;
}

Expand All @@ -303,6 +302,12 @@ private void advance() {

curRec = walIter.next();

if (curRec.get2().type() == WALRecord.RecordType.CDC_DISABLE) {
throw new IgniteException("CDC disabled on node. Please, check node log. Exiting! " +
"To continue CDC, please, use the command: control.sh|bat --cdc delete_lost_segment_links " +
"[state=" + curRec.get1() + ']');
}

next = ((DataRecord)curRec.get2()).get(entryIdx);
}

Expand Down
Loading