diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java index 33b23af82a91e..2f9269b18b169 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java @@ -52,6 +52,7 @@ import org.apache.ignite.internal.cdc.WalRecordsConsumer.DataEntryIterator; import org.apache.ignite.internal.pagemem.wal.WALIterator; import org.apache.ignite.internal.pagemem.wal.record.CdcManagerRecord; +import org.apache.ignite.internal.pagemem.wal.record.DataRecord; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.processors.cache.GridLocalConfigManager; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; @@ -157,6 +158,9 @@ public class CdcMain implements Runnable { /** Metadata update time. */ public static final String META_UPDATE = "MetadataUpdateTime"; + /** Event capture time. */ + public static final String EVT_CAPTURE_TIME = "EventCaptureTime"; + /** Binary metadata metric name. */ public static final String BINARY_META_DIR = "BinaryMetaDir"; @@ -201,6 +205,12 @@ public class CdcMain implements Runnable { /** Metadata update time. */ private HistogramMetricImpl metaUpdate; + /** + * Metric represents time between creating {@link DataRecord}, containing the data change events, and capturing them + * by {@link CdcConsumer}. + */ + private HistogramMetricImpl evtCaptureTime; + /** Change Data Capture configuration. */ protected final CdcConfiguration cdcCfg; @@ -423,6 +433,10 @@ private void initMetrics() { lastSegmentConsumptionTs = mreg.longMetric(LAST_SEG_CONSUMPTION_TIME, "Last time of consumption of WAL segment"); metaUpdate = mreg.histogram(META_UPDATE, new long[] {100, 500, 1000}, "Metadata update time"); + evtCaptureTime = mreg.histogram( + EVT_CAPTURE_TIME, + new long[] {5_000, 10_000, 15_000, 30_000, 60_000}, + "Time between creating an event on Ignite node and capturing it by CdcConsumer"); mreg.register(CDC_MODE, () -> cdcModeState.name(), String.class, "CDC mode"); } @@ -572,7 +586,10 @@ private boolean consumeSegment(Path segment) { * Consumes CDC events in {@link CdcMode#CDC_UTILITY_ACTIVE} mode. */ private void consumeSegmentActively(IgniteWalIteratorFactory.IteratorParametersBuilder builder) { - try (DataEntryIterator iter = new DataEntryIterator(new IgniteWalIteratorFactory(log).iterator(builder.addFilter(ACTIVE_RECS)))) { + try (DataEntryIterator iter = new DataEntryIterator( + new IgniteWalIteratorFactory(log).iterator(builder.addFilter(ACTIVE_RECS)), + evtCaptureTime) + ) { if (walState != null) iter.init(walState.get2()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java index 2167e27934ae8..1cbf69bb3f441 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java @@ -31,12 +31,14 @@ import org.apache.ignite.internal.pagemem.wal.WALIterator; import org.apache.ignite.internal.pagemem.wal.record.DataEntry; import org.apache.ignite.internal.pagemem.wal.record.DataRecord; +import org.apache.ignite.internal.pagemem.wal.record.TimeStampRecord; import org.apache.ignite.internal.pagemem.wal.record.UnwrappedDataEntry; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; import org.apache.ignite.internal.processors.metric.MetricRegistry; import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric; +import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.S; @@ -232,6 +234,9 @@ public static class DataEntryIterator implements Iterator, AutoClosea /** WAL iterator. */ private final WALIterator walIter; + /** Events capture time metric. */ + private final HistogramMetricImpl evtCaptureTime; + /** Current preloaded WAL record. */ private IgniteBiTuple curRec; @@ -241,9 +246,13 @@ public static class DataEntryIterator implements Iterator, AutoClosea /** Index of {@link #next} inside WAL record. */ private int entryIdx; - /** @param walIter WAL iterator. */ - public DataEntryIterator(WALIterator walIter) { + /** + * @param walIter WAL iterator. + * @param evtCaptureTime Event capture time metric. + */ + public DataEntryIterator(WALIterator walIter, HistogramMetricImpl evtCaptureTime) { this.walIter = walIter; + this.evtCaptureTime = evtCaptureTime; advance(); } @@ -281,6 +290,8 @@ void init(int idx) { next = null; + evtCaptureTime.value(System.currentTimeMillis() - ((TimeStampRecord)curRec.get2()).timestamp()); + advance(); return e; diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java index 3c8c32c7a07e4..dcab0bcc441b9 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java @@ -47,6 +47,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.spi.metric.HistogramMetric; import org.apache.ignite.spi.metric.LongMetric; import org.apache.ignite.spi.metric.MetricExporterSpi; import org.apache.ignite.spi.metric.ObjectMetric; @@ -60,6 +61,7 @@ import static org.apache.ignite.internal.cdc.CdcMain.COMMITTED_SEG_IDX; import static org.apache.ignite.internal.cdc.CdcMain.COMMITTED_SEG_OFFSET; import static org.apache.ignite.internal.cdc.CdcMain.CUR_SEG_IDX; +import static org.apache.ignite.internal.cdc.CdcMain.EVT_CAPTURE_TIME; import static org.apache.ignite.internal.cdc.CdcMain.LAST_SEG_CONSUMPTION_TIME; import static org.apache.ignite.internal.cdc.CdcMain.MARSHALLER_DIR; import static org.apache.ignite.internal.cdc.CdcMain.cdcInstanceName; @@ -243,6 +245,9 @@ protected void checkMetrics(CdcMain cdc, int expCnt) throws Exception { m -> mreg.findMetric(m).value(), m -> mreg.>findMetric(m).value() ); + + HistogramMetric evtCaptureTime = mreg.findMetric(EVT_CAPTURE_TIME); + assertEquals(expCnt, (int)Arrays.stream(evtCaptureTime.value()).sum()); } /** */