Skip to content

Commit

Permalink
IGNITE-21123 Add CdcEvent capture metric (#11111)
Browse files Browse the repository at this point in the history
  • Loading branch information
timoninmaxim authored Dec 21, 2023
1 parent 431f598 commit a846244
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.ignite.internal.cdc.WalRecordsConsumer.DataEntryIterator;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.record.CdcManagerRecord;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.GridLocalConfigManager;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
Expand Down Expand Up @@ -157,6 +158,9 @@ public class CdcMain implements Runnable {
/** Metadata update time. */
public static final String META_UPDATE = "MetadataUpdateTime";

/** Event capture time. */
public static final String EVT_CAPTURE_TIME = "EventCaptureTime";

/** Binary metadata metric name. */
public static final String BINARY_META_DIR = "BinaryMetaDir";

Expand Down Expand Up @@ -201,6 +205,12 @@ public class CdcMain implements Runnable {
/** Metadata update time. */
private HistogramMetricImpl metaUpdate;

/**
* Metric represents time between creating {@link DataRecord}, containing the data change events, and capturing them
* by {@link CdcConsumer}.
*/
private HistogramMetricImpl evtCaptureTime;

/** Change Data Capture configuration. */
protected final CdcConfiguration cdcCfg;

Expand Down Expand Up @@ -423,6 +433,10 @@ private void initMetrics() {
lastSegmentConsumptionTs =
mreg.longMetric(LAST_SEG_CONSUMPTION_TIME, "Last time of consumption of WAL segment");
metaUpdate = mreg.histogram(META_UPDATE, new long[] {100, 500, 1000}, "Metadata update time");
evtCaptureTime = mreg.histogram(
EVT_CAPTURE_TIME,
new long[] {5_000, 10_000, 15_000, 30_000, 60_000},
"Time between creating an event on Ignite node and capturing it by CdcConsumer");
mreg.register(CDC_MODE, () -> cdcModeState.name(), String.class, "CDC mode");
}

Expand Down Expand Up @@ -572,7 +586,10 @@ private boolean consumeSegment(Path segment) {
* Consumes CDC events in {@link CdcMode#CDC_UTILITY_ACTIVE} mode.
*/
private void consumeSegmentActively(IgniteWalIteratorFactory.IteratorParametersBuilder builder) {
try (DataEntryIterator iter = new DataEntryIterator(new IgniteWalIteratorFactory(log).iterator(builder.addFilter(ACTIVE_RECS)))) {
try (DataEntryIterator iter = new DataEntryIterator(
new IgniteWalIteratorFactory(log).iterator(builder.addFilter(ACTIVE_RECS)),
evtCaptureTime)
) {
if (walState != null)
iter.init(walState.get2());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.TimeStampRecord;
import org.apache.ignite.internal.pagemem.wal.record.UnwrappedDataEntry;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.S;
Expand Down Expand Up @@ -232,6 +234,9 @@ public static class DataEntryIterator implements Iterator<DataEntry>, AutoClosea
/** WAL iterator. */
private final WALIterator walIter;

/** Events capture time metric. */
private final HistogramMetricImpl evtCaptureTime;

/** Current preloaded WAL record. */
private IgniteBiTuple<WALPointer, WALRecord> curRec;

Expand All @@ -241,9 +246,13 @@ public static class DataEntryIterator implements Iterator<DataEntry>, AutoClosea
/** Index of {@link #next} inside WAL record. */
private int entryIdx;

/** @param walIter WAL iterator. */
public DataEntryIterator(WALIterator walIter) {
/**
* @param walIter WAL iterator.
* @param evtCaptureTime Event capture time metric.
*/
public DataEntryIterator(WALIterator walIter, HistogramMetricImpl evtCaptureTime) {
this.walIter = walIter;
this.evtCaptureTime = evtCaptureTime;

advance();
}
Expand Down Expand Up @@ -281,6 +290,8 @@ void init(int idx) {

next = null;

evtCaptureTime.value(System.currentTimeMillis() - ((TimeStampRecord)curRec.get2()).timestamp());

advance();

return e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.spi.metric.HistogramMetric;
import org.apache.ignite.spi.metric.LongMetric;
import org.apache.ignite.spi.metric.MetricExporterSpi;
import org.apache.ignite.spi.metric.ObjectMetric;
Expand All @@ -60,6 +61,7 @@
import static org.apache.ignite.internal.cdc.CdcMain.COMMITTED_SEG_IDX;
import static org.apache.ignite.internal.cdc.CdcMain.COMMITTED_SEG_OFFSET;
import static org.apache.ignite.internal.cdc.CdcMain.CUR_SEG_IDX;
import static org.apache.ignite.internal.cdc.CdcMain.EVT_CAPTURE_TIME;
import static org.apache.ignite.internal.cdc.CdcMain.LAST_SEG_CONSUMPTION_TIME;
import static org.apache.ignite.internal.cdc.CdcMain.MARSHALLER_DIR;
import static org.apache.ignite.internal.cdc.CdcMain.cdcInstanceName;
Expand Down Expand Up @@ -243,6 +245,9 @@ protected void checkMetrics(CdcMain cdc, int expCnt) throws Exception {
m -> mreg.<LongMetric>findMetric(m).value(),
m -> mreg.<ObjectMetric<String>>findMetric(m).value()
);

HistogramMetric evtCaptureTime = mreg.findMetric(EVT_CAPTURE_TIME);
assertEquals(expCnt, (int)Arrays.stream(evtCaptureTime.value()).sum());
}

/** */
Expand Down

0 comments on commit a846244

Please sign in to comment.