fix(memstore): account for clock drift during latency measurement

alextheimer committed Jan 31, 2025
1 parent 166ea1b commit eb3dd03
Showing 1 changed file with 26 additions and 2 deletions.

core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala
@@ -124,6 +124,14 @@ class TimeSeriesShardStats(dataset: DatasetRef, shardNum: Int) {
   val ingestionPipelineLatency = Kamon.histogram("ingestion-pipeline-latency",
     MeasurementUnit.time.milliseconds).withTags(TagSet.from(tags))
 
+  /**
+   * Records the absolute value of otherwise-negative ingestion pipeline latencies.
+   * Negative latencies are exclusively recorded here; they are not recorded
+   * by the above ingestionPipelineLatency metric.
+   */
+  val negativeIngestionPipelineLatency = Kamon.histogram("negative-ingestion-pipeline-latency",
+    MeasurementUnit.time.milliseconds).withTags(TagSet.from(tags))
+
   val chunkFlushTaskLatency = Kamon.histogram("chunk-flush-task-latency-after-retries",
     MeasurementUnit.time.milliseconds).withTags(TagSet.from(tags))
 
@@ -1301,7 +1309,18 @@ class TimeSeriesShard(val ref: DatasetRef,
     }
 
     val currentTime = System.currentTimeMillis()
-    shardStats.ingestionPipelineLatency.record(currentTime - container.timestamp)
+
+    // Account for clock skew; otherwise, exceptions are thrown when negative latencies are recorded.
+    val ingestionPipelineLatency = currentTime - container.timestamp
+    shardStats.ingestionPipelineLatency.record(Math.max(0, ingestionPipelineLatency))
+    // It would be helpful to know if latencies are frequently negative;
+    // record their absolute values into a separate histogram.
+    // We'll continue to count floored-to-zero values in the above metric; this will
+    // keep e.g. its rate() as expected when latencies are negligibly negative.
+    if (ingestionPipelineLatency < 0) {
+      shardStats.negativeIngestionPipelineLatency.record(-ingestionPipelineLatency)
+    }
+
     // Only update stuff if no exception was thrown.
 
     if (ingestionTime != lastIngestionTime) {
@@ -1312,7 +1331,12 @@
         logger.warn(s"createFlushTasks reporting ingestion delay for shardNum=$shardNum" +
           s" containerTimestamp=${container.timestamp} numRecords=${container.numRecords} offset = ${this._offset}")
       }
-      shardStats.ingestionClockDelay.update(currentTime - ingestionTime)
+
+      // Account for clock skew; otherwise, exceptions are thrown when negative latencies are recorded.
+      // ingestionTime is _probably_ less likely to occur after currentTime than container.timestamp
+      // (given that it's the result of the Math.max() above). If really needed, it can get the same
+      // two-metric treatment as ingestionPipelineLatency.
+      shardStats.ingestionClockDelay.update(Math.max(0, currentTime - ingestionTime))
     }
 
     tasks
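
For reference, a minimal, self-contained sketch of the recording pattern this commit introduces, written against the Kamon 2.x histogram API it already uses. The metric names match the diff, but the enclosing object, the tag values, and the recordPipelineLatency helper are illustrative, not part of the FiloDB codebase:

import kamon.Kamon
import kamon.metric.MeasurementUnit
import kamon.tag.TagSet

object PipelineLatencyRecorder {
  // Illustrative tag set; FiloDB derives tags from the dataset and shard number.
  private val tags = TagSet.from(Map("dataset" -> "example", "shard" -> "0"))

  // Primary histogram: only ever sees non-negative values.
  val ingestionPipelineLatency = Kamon.histogram("ingestion-pipeline-latency",
    MeasurementUnit.time.milliseconds).withTags(tags)

  // Companion histogram: receives the magnitude of any negative latency.
  val negativeIngestionPipelineLatency = Kamon.histogram("negative-ingestion-pipeline-latency",
    MeasurementUnit.time.milliseconds).withTags(tags)

  def recordPipelineLatency(containerTimestamp: Long): Unit = {
    val latency = System.currentTimeMillis() - containerTimestamp
    // Floor to zero so the primary histogram never rejects a negative value,
    // keeping its event count (and therefore its rate()) intact.
    ingestionPipelineLatency.record(Math.max(0, latency))
    // Preserve visibility into clock skew by recording the absolute value separately.
    if (latency < 0) negativeIngestionPipelineLatency.record(-latency)
  }
}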

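A quick way to exercise the behavior under simulated skew (the values are hypothetical): a container stamped 50 ms in the future records 0 into the primary histogram and roughly 50 into the companion one.

// Simulate a producer clock running ~50 ms ahead of this node.
val skewedTimestamp = System.currentTimeMillis() + 50
PipelineLatencyRecorder.recordPipelineLatency(skewedTimestamp)
// ingestion-pipeline-latency records 0; negative-ingestion-pipeline-latency records ~50.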