fix(memstore): account for clock drift during latency measurement (#1943)

Adds a new metric, negativeIngestionPipelineLatency, to record negative
ingestion pipeline latencies (as their absolute values). Additionally, values
recorded into the two existing ingestion latency metrics are now wrapped in
Math.max(0, value) calls to prevent exceptions.
alextheimer authored Feb 6, 2025
1 parent 166ea1b commit 0764fa6
Showing 3 changed files with 156 additions and 2 deletions.
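
As a minimal standalone sketch of the pattern the commit message describes (not part of the change itself; the object and metric names here are invented for illustration), a possibly-negative latency is clamped before it reaches the main histogram, while negative cases are tracked separately:

import kamon.Kamon
import kamon.metric.MeasurementUnit
import kamon.tag.TagSet

object LatencyClampSketch {
  // Hypothetical metric names, tagged the same way the real shard metrics are.
  private val latencyHist = Kamon.histogram("example-pipeline-latency",
    MeasurementUnit.time.milliseconds).withTags(TagSet.from(Map("dataset" -> "example")))
  private val negativeLatencyHist = Kamon.histogram("example-negative-pipeline-latency",
    MeasurementUnit.time.milliseconds).withTags(TagSet.from(Map("dataset" -> "example")))

  def recordLatency(eventTimestampMillis: Long): Unit = {
    val latency = System.currentTimeMillis() - eventTimestampMillis
    // Clock skew between the stamping host and this host can make latency negative;
    // clamp to zero so the histogram never sees a negative value.
    latencyHist.record(Math.max(0, latency))
    // Separately record how far negative the latency went, as an absolute value.
    if (latency < 0) negativeLatencyHist.record(-latency)
  }
}
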
30 changes: 30 additions & 0 deletions core/src/main/scala/filodb.core/RateLimiter.scala
@@ -0,0 +1,30 @@
package filodb.core

import scala.concurrent.duration.Duration

/**
 * Rate-limiter utility.
 * @param period a "successful" attempt will be indicated only after a full
 *               period has elapsed since the previous success.
 */
class RateLimiter(period: Duration) {
  private var lastSuccessMillis = 0L

  /**
   * Returns true if the attempt was "successful"; returns false if it "failed".
   * Successes are indicated only after a full period has elapsed since the previous success.
   *
   * NOTE: this operation is not thread-safe, but if at least one of a set of concurrent
   * invocations is successful, then one of the successful timestamps will be recorded
   * internally as the most-recent success. In practice, this means async calls may succeed
   * in bursts (which may be acceptable in some use-cases).
   */
  def attempt(): Boolean = {
    val nowMillis = System.currentTimeMillis()
    if (nowMillis - lastSuccessMillis > period.toMillis) {
      lastSuccessMillis = nowMillis
      return true
    }
    false
  }
}
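
For a sense of intended usage (illustration only, not part of the commit; the println call stands in for a real logger), a caller guards a noisy operation with attempt(). The 30-second period mirrors how TimeSeriesShard uses the limiter below:

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import filodb.core.RateLimiter

object RateLimitedLoggingSketch {
  // At most one report every 30 seconds.
  private val warnRateLimiter = new RateLimiter(Duration(30, TimeUnit.SECONDS))

  def reportAnomaly(details: String): Unit = {
    if (warnRateLimiter.attempt()) {
      // Substitute the application's real logger here.
      println(s"anomaly detected: $details")
    }
  }
}
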
37 changes: 35 additions & 2 deletions core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala
@@ -124,6 +124,12 @@ class TimeSeriesShardStats(dataset: DatasetRef, shardNum: Int) {
   val ingestionPipelineLatency = Kamon.histogram("ingestion-pipeline-latency",
     MeasurementUnit.time.milliseconds).withTags(TagSet.from(tags))
 
+  /**
+   * Records the absolute value of otherwise-negative ingestion pipeline latencies.
+   */
+  val negativeIngestionPipelineLatency = Kamon.histogram("negative-ingestion-pipeline-latency",
+    MeasurementUnit.time.milliseconds).withTags(TagSet.from(tags))
+
   val chunkFlushTaskLatency = Kamon.histogram("chunk-flush-task-latency-after-retries",
     MeasurementUnit.time.milliseconds).withTags(TagSet.from(tags))

@@ -312,6 +318,11 @@ class TimeSeriesShard(val ref: DatasetRef,
    */
  private var nextPartitionID = 0
 
+  // This should be handled by a logging-specific class if rate-limited logs become a common use-case.
+  // NOTE: This is not thread-safe, but it's good enough for this application--
+  // see RateLimiter's javadoc for details.
+  private val negativeIngestionTimeLogRateLimiter = new RateLimiter(Duration(30, TimeUnit.SECONDS))
+
   /**
    * This index helps identify which partitions have any given column-value.
    * Used to answer queries not involving the full partition key.
@@ -1301,7 +1312,24 @@ class TimeSeriesShard(val ref: DatasetRef,
     }
 
     val currentTime = System.currentTimeMillis()
-    shardStats.ingestionPipelineLatency.record(currentTime - container.timestamp)
+
+    // Account for clock skew, otherwise exceptions are thrown when negative latencies are recorded.
+    val ingestionPipelineLatency = currentTime - container.timestamp
+    shardStats.ingestionPipelineLatency.record(Math.max(0, ingestionPipelineLatency))
+    // It would be helpful to know if latencies are frequently negative;
+    // record their absolute values into a separate histogram.
+    // We'll continue to count floored-to-zero values in the above metric; this will
+    // keep e.g. its rate() as-expected when latencies are negligibly negative.
+    if (ingestionPipelineLatency < 0) {
+      shardStats.negativeIngestionPipelineLatency.record(-ingestionPipelineLatency)
+      if (negativeIngestionTimeLogRateLimiter.attempt()) {
+        logger.error(s"Negative ingestion pipeline latency detected: " +
+          s"currentTimeMillis=${currentTime}; " +
+          s"container.timestamp=${container.timestamp}; " +
+          s"ingestionPipelineLatency=${ingestionPipelineLatency}")
+      }
+    }
+
     // Only update stuff if no exception was thrown.
 
     if (ingestionTime != lastIngestionTime) {
@@ -1312,7 +1340,12 @@
         logger.warn(s"createFlushTasks reporting ingestion delay for shardNum=$shardNum" +
           s" containerTimestamp=${container.timestamp} numRecords=${container.numRecords} offset = ${this._offset}")
       }
-      shardStats.ingestionClockDelay.update(currentTime - ingestionTime)
+
+      // Account for clock skew, otherwise exceptions are thrown when negative latencies are recorded.
+      // ingestionTime is _probably_ less likely to occur after currentTime than container.timestamp
+      // (given that it's the result of the Math.max() above). If really needed, it can get the same
+      // two-metric treatment as ingestionPipelineLatency.
+      shardStats.ingestionClockDelay.update(Math.max(0, currentTime - ingestionTime))
     }
 
     tasks
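
As a worked example of the scenario handled above (all timestamps are hypothetical), suppose the host that stamps container.timestamp runs about 150 ms ahead of the ingesting host's clock:

// Illustrative values only.
val containerTimestamp = 1738800000650L            // stamped on the producing host (clock ahead)
val currentTime = 1738800000500L                   // System.currentTimeMillis() on the ingesting host
val latency = currentTime - containerTimestamp     // -150 ms, purely from skew
val recorded = Math.max(0, latency)                // 0 goes into ingestion-pipeline-latency
val negativeRecorded = if (latency < 0) -latency else 0L   // 150 goes into negative-ingestion-pipeline-latency
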
91 changes: 91 additions & 0 deletions core/src/test/scala/filodb.core/RateLimiterSpec.scala
@@ -0,0 +1,91 @@
package filodb.core

import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong}
import scala.concurrent.duration.Duration

import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers

class RateLimiterSpec extends AnyFunSpec with Matchers {
it("should apply rate-limits accordingly") {
val rateLimiter = new RateLimiter(Duration(2, TimeUnit.SECONDS))

// First attempt should succeed.
rateLimiter.attempt() shouldEqual true

// Others before a full period has elapsed should fail.
rateLimiter.attempt() shouldEqual false
rateLimiter.attempt() shouldEqual false
rateLimiter.attempt() shouldEqual false

// Wait the period...
Thread.sleep(2000)

// Next attempt should succeed.
rateLimiter.attempt() shouldEqual true

// Again, attempts should fail until a period has elapsed.
rateLimiter.attempt() shouldEqual false
rateLimiter.attempt() shouldEqual false
rateLimiter.attempt() shouldEqual false
}

it("should reasonably rate-limit concurrent threads") {
val nThreads = 100
val period = Duration(1, TimeUnit.SECONDS)
val nPeriods = 5

val rateLimiter = new RateLimiter(period)
val pool = Executors.newFixedThreadPool(nThreads)

// All threads will try to increment the time-appropriate counter.
// At the end of the test, there should be at least one counted per period,
// and no single counter should exceed the count of threads (i.e. at least one thread
// was paused long enough that it updated the RateLimiter's internal timestamp
// to something in the previous period.
val periodCounters = (0 until nPeriods).map(_ => new AtomicInteger(0))

// Prep the runnable (some of these variables are updated later).
var startMillis = -1L
var isStarted = false
var isShutdown = false
val runnable: Runnable = () => {
while (!isStarted) {
Thread.sleep(500)
}
while (!isShutdown) {
if (rateLimiter.attempt()) {
val iPeriod = (System.currentTimeMillis() - startMillis) / period.toMillis
periodCounters(iPeriod.toInt).incrementAndGet()
}
}
}

// Kick off the threads and start the test.
for (i <- 0 until nThreads) {
pool.submit(runnable)
}
startMillis = System.currentTimeMillis()
isStarted = true

// Wait for all periods to elapse.
Thread.sleep(nPeriods * period.toMillis)

// Shutdown and wait for everything to finish.
isShutdown = true
pool.shutdown()
while (!pool.isTerminated) {
Thread.sleep(1000)
}

periodCounters.forall(_.get() > 0) shouldEqual true
periodCounters.map(_.get()).max <= nThreads

// Typical local "println(periodCounters)" output:
// Vector(1, 34, 1, 1, 1)
// Vector(1, 20, 1, 1, 1)
// Vector(1, 13, 1, 2, 1)
}
}
