From 0764fa6bac62aa60df7a4fe0193f8c4029ba8963 Mon Sep 17 00:00:00 2001 From: alextheimer Date: Wed, 5 Feb 2025 21:53:56 -0800 Subject: [PATCH] fix(memstore): account for clock drift during latency measurement (#1943) Adds one additional metric negativeIngestionPipelineLatency to record negative ingestion pipeline latencies (as their absolute values). Additionally, values recorded into the two usual ingestion latency metrics are now wrapped inside Math.max(0, value) calls to prevent exceptions. --- .../main/scala/filodb.core/RateLimiter.scala | 30 ++++++ .../memstore/TimeSeriesShard.scala | 37 +++++++- .../scala/filodb.core/RateLimiterSpec.scala | 91 +++++++++++++++++++ 3 files changed, 156 insertions(+), 2 deletions(-) create mode 100644 core/src/main/scala/filodb.core/RateLimiter.scala create mode 100644 core/src/test/scala/filodb.core/RateLimiterSpec.scala diff --git a/core/src/main/scala/filodb.core/RateLimiter.scala b/core/src/main/scala/filodb.core/RateLimiter.scala new file mode 100644 index 0000000000..9cb7399035 --- /dev/null +++ b/core/src/main/scala/filodb.core/RateLimiter.scala @@ -0,0 +1,30 @@ +package filodb.core + +import scala.concurrent.duration.Duration + +/** + * Rate-limiter utility. + * @param period a "successful" attempt will be indicated only after a full + * period has elapsed since the previous success. + */ +class RateLimiter(period: Duration) { + private var lastSuccessMillis = 0L; + + /** + * Returns true to indicate an attempt was "successful", else it was "failed". + * Successes are returned only after a full period has elapsed since the previous success. + * + * NOTE: this operation is not thread-safe, but if at least one concurrent invocation is + * successful, then one of the successful timestamps will be recorded internally as the + * most-recent success. In practice, this means async calls may succeed in bursts (which + * may be acceptable in some use-cases). + */ + def attempt(): Boolean = { + val nowMillis = System.currentTimeMillis() + if (nowMillis - lastSuccessMillis > period.toMillis) { + lastSuccessMillis = nowMillis + return true + } + false + } +} diff --git a/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala b/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala index 0ed968b741..497e717d94 100644 --- a/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala +++ b/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala @@ -124,6 +124,12 @@ class TimeSeriesShardStats(dataset: DatasetRef, shardNum: Int) { val ingestionPipelineLatency = Kamon.histogram("ingestion-pipeline-latency", MeasurementUnit.time.milliseconds).withTags(TagSet.from(tags)) + /** + * Records the absolute value of otherwise-negative ingestion pipeline latencies. + */ + val negativeIngestionPipelineLatency = Kamon.histogram("negative-ingestion-pipeline-latency", + MeasurementUnit.time.milliseconds).withTags(TagSet.from(tags)) + val chunkFlushTaskLatency = Kamon.histogram("chunk-flush-task-latency-after-retries", MeasurementUnit.time.milliseconds).withTags(TagSet.from(tags)) @@ -312,6 +318,11 @@ class TimeSeriesShard(val ref: DatasetRef, */ private var nextPartitionID = 0 + // This should be handled by a logging-specific class if rate-limited logs become a common use-case. + // NOTE: This is not thread-safe, but it's good enough for this application-- + // see RateLimiter's javadoc for details. + private val negativeIngestionTimeLogRateLimiter = new RateLimiter(Duration(30, TimeUnit.SECONDS)) + /** * This index helps identify which partitions have any given column-value. * Used to answer queries not involving the full partition key. @@ -1301,7 +1312,24 @@ class TimeSeriesShard(val ref: DatasetRef, } val currentTime = System.currentTimeMillis() - shardStats.ingestionPipelineLatency.record(currentTime - container.timestamp) + + // Account for clock skew, otherwise exceptions are thrown when negative latencies are recorded. + val ingestionPipelineLatency = currentTime - container.timestamp + shardStats.ingestionPipelineLatency.record(Math.max(0, ingestionPipelineLatency)) + // It would be helpful to know if latencies are frequently negative; + // record their absolute values into a separate histogram. + // We'll continue to count floored-to-zero values in the above metric; this will + // keep e.g. its rate() as-expected when latencies are negligibly negative. + if (ingestionPipelineLatency < 0) { + shardStats.negativeIngestionPipelineLatency.record(-ingestionPipelineLatency) + if (negativeIngestionTimeLogRateLimiter.attempt()) { + logger.error(s"Negative ingestion pipeline latency detected: " + + s"currentTimeMillis=${currentTime}; " + + s"container.timestamp=${container.timestamp}; " + + s"ingestionPipelineLatency=${ingestionPipelineLatency}") + } + } + // Only update stuff if no exception was thrown. if (ingestionTime != lastIngestionTime) { @@ -1312,7 +1340,12 @@ class TimeSeriesShard(val ref: DatasetRef, logger.warn(s"createFlushTasks reporting ingestion delay for shardNum=$shardNum" + s" containerTimestamp=${container.timestamp} numRecords=${container.numRecords} offset = ${this._offset}") } - shardStats.ingestionClockDelay.update(currentTime - ingestionTime) + + // Account for clock skew, otherwise exceptions are thrown when negative latencies are recorded. + // ingestionTime is _probably_ less likely to occur after currentTime than container.timestamp + // (given that it's the result of the Math.max() above). If really needed, it can get the same + // two-metric treatment as ingestionPipelineLatency. + shardStats.ingestionClockDelay.update(Math.max(0, currentTime - ingestionTime)) } tasks diff --git a/core/src/test/scala/filodb.core/RateLimiterSpec.scala b/core/src/test/scala/filodb.core/RateLimiterSpec.scala new file mode 100644 index 0000000000..f05799473a --- /dev/null +++ b/core/src/test/scala/filodb.core/RateLimiterSpec.scala @@ -0,0 +1,91 @@ +package filodb.core + +import org.scalatest.funspec.AnyFunSpec +import org.scalatest.matchers.should.Matchers + + +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.{Executors, TimeUnit} +import scala.concurrent.duration.Duration + +class RateLimiterSpec extends AnyFunSpec with Matchers { + it("should apply rate-limits accordingly") { + val rateLimiter = new RateLimiter(Duration(2, TimeUnit.SECONDS)) + + // First attempt should succeed. + rateLimiter.attempt() shouldEqual true + + // Others before a full period has elapsed should fail. + rateLimiter.attempt() shouldEqual false + rateLimiter.attempt() shouldEqual false + rateLimiter.attempt() shouldEqual false + + // Wait the period... + Thread.sleep(2000) + + // Next attempt should succeed. + rateLimiter.attempt() shouldEqual true + + // Again, attempts should fail until a period has elapsed. + rateLimiter.attempt() shouldEqual false + rateLimiter.attempt() shouldEqual false + rateLimiter.attempt() shouldEqual false + } + + it("should reasonably rate-limit concurrent threads") { + val nThreads = 100 + val period = Duration(1, TimeUnit.SECONDS) + val nPeriods = 5 + + val rateLimiter = new RateLimiter(period) + val pool = Executors.newFixedThreadPool(nThreads) + + // All threads will try to increment the time-appropriate counter. + // At the end of the test, there should be at least one counted per period, + // and no single counter should exceed the count of threads (i.e. at least one thread + // was paused long enough that it updated the RateLimiter's internal timestamp + // to something in the previous period. + val periodCounters = (0 until nPeriods).map(_ => new AtomicInteger(0)) + + // Prep the runnable (some of these variables are updated later). + var startMillis = -1L + var isStarted = false + var isShutdown = false + val runnable: Runnable = () => { + while (!isStarted) { + Thread.sleep(500) + } + while (!isShutdown) { + if (rateLimiter.attempt()) { + val iPeriod = (System.currentTimeMillis() - startMillis) / period.toMillis + periodCounters(iPeriod.toInt).incrementAndGet() + } + } + } + + // Kick off the threads and start the test. + for (i <- 0 until nThreads) { + pool.submit(runnable) + } + startMillis = System.currentTimeMillis() + isStarted = true + + // Wait for all periods to elapse. + Thread.sleep(nPeriods * period.toMillis) + + // Shutdown and wait for everything to finish. + isShutdown = true + pool.shutdown() + while (!pool.isTerminated) { + Thread.sleep(1000) + } + + periodCounters.forall(_.get() > 0) shouldEqual true + periodCounters.map(_.get()).max <= nThreads + + // Typical local "println(periodCounters)" output: + // Vector(1, 34, 1, 1, 1) + // Vector(1, 20, 1, 1, 1) + // Vector(1, 13, 1, 2, 1) + } +} \ No newline at end of file