Skip to content

Commit

Permalink
Implement live internal latency tracker (#651)
Browse files Browse the repository at this point in the history
* Implement live internal latency tracker

* Rename to accumulator

* Refactor

* Refactor

* Add tracker impl

* Delete pubisher function and add test

* Delete unused imports

* Fix flaky test

* Add test for concurrent external work
  • Loading branch information
gnawf authored Dec 9, 2024
1 parent 0025f15 commit 0d71072
Show file tree
Hide file tree
Showing 15 changed files with 719 additions and 95 deletions.
10 changes: 8 additions & 2 deletions lib/src/main/java/graphql/nadel/Nadel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ import graphql.nadel.instrumentation.parameters.NadelInstrumentationQueryExecuti
import graphql.nadel.instrumentation.parameters.NadelInstrumentationQueryValidationParameters
import graphql.nadel.schema.QuerySchemaGenerator
import graphql.nadel.schema.SchemaTransformationHook
import graphql.nadel.time.NadelInternalLatencyTracker
import graphql.nadel.util.getLogger
import graphql.nadel.util.getNotPrivacySafeLogger
import graphql.nadel.validation.NadelSchemaValidationFactory
import graphql.nadel.validation.NadelSchemaValidation
import graphql.nadel.validation.NadelSchemaValidationFactory
import graphql.parser.InvalidSyntaxException
import graphql.parser.Parser
import graphql.schema.GraphQLSchema
Expand Down Expand Up @@ -68,7 +69,10 @@ class Nadel private constructor(
.executionId(nadelExecutionInput.executionId)
.build()

val nadelExecutionParams = NadelExecutionParams(nadelExecutionInput.nadelExecutionHints)
val nadelExecutionParams = NadelExecutionParams(
nadelExecutionInput.executionHints,
nadelExecutionInput.latencyTracker,
)
val instrumentationState = instrumentation.createState(
NadelInstrumentationCreateStateParameters(querySchema, executionInput),
)
Expand Down Expand Up @@ -247,6 +251,8 @@ class Nadel private constructor(

private var nadelValidation: NadelSchemaValidation? = null

private var latencyTracker: NadelInternalLatencyTracker? = null

fun schemas(schemas: NadelSchemas): Builder {
this.schemas = schemas
return this
Expand Down
16 changes: 13 additions & 3 deletions lib/src/main/java/graphql/nadel/NadelExecutionInput.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package graphql.nadel

import graphql.GraphQLContext
import graphql.execution.ExecutionId
import graphql.nadel.time.NadelInternalLatencyTracker
import graphql.nadel.time.NadelInternalLatencyTrackerImpl
import graphql.nadel.time.NadelStopwatch
import java.util.function.Consumer

@Suppress("DataClassPrivateConstructor") // Don't care
data class NadelExecutionInput private constructor(
val query: String,
val operationName: String?,
Expand All @@ -13,7 +15,8 @@ data class NadelExecutionInput private constructor(
val variables: Map<String, Any?>,
val extensions: Map<String, Any?>,
val executionId: ExecutionId?,
val nadelExecutionHints: NadelExecutionHints,
val executionHints: NadelExecutionHints,
val latencyTracker: NadelInternalLatencyTracker,
) {
class Builder {
private var query: String? = null
Expand All @@ -24,6 +27,7 @@ data class NadelExecutionInput private constructor(
private var extensions: Map<String, Any?> = LinkedHashMap()
private var executionId: ExecutionId? = null
private var executionHints = NadelExecutionHints.newHints().build()
private var latencyTracker: NadelInternalLatencyTracker? = null

fun query(query: String): Builder {
this.query = query
Expand Down Expand Up @@ -77,6 +81,11 @@ data class NadelExecutionInput private constructor(
return this
}

fun latencyTracker(latencyTracker: NadelInternalLatencyTracker): Builder {
this.latencyTracker = latencyTracker
return this
}

fun build(): NadelExecutionInput {
return NadelExecutionInput(
query = requireNotNull(query) { "Query must be provided" },
Expand All @@ -86,7 +95,8 @@ data class NadelExecutionInput private constructor(
variables = variables,
extensions = extensions,
executionId = executionId,
nadelExecutionHints = executionHints,
executionHints = executionHints,
latencyTracker = latencyTracker ?: NadelInternalLatencyTrackerImpl(NadelStopwatch()),
)
}
}
Expand Down
7 changes: 6 additions & 1 deletion lib/src/main/java/graphql/nadel/NadelExecutionParams.kt
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
package graphql.nadel

class NadelExecutionParams internal constructor(val nadelExecutionHints: NadelExecutionHints)
import graphql.nadel.time.NadelInternalLatencyTracker

class NadelExecutionParams internal constructor(
val executionHints: NadelExecutionHints,
val latencyTracker: NadelInternalLatencyTracker,
)
10 changes: 7 additions & 3 deletions lib/src/main/java/graphql/nadel/NextgenEngine.kt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import graphql.nadel.instrumentation.parameters.NadelInstrumentationTimingParame
import graphql.nadel.instrumentation.parameters.child
import graphql.nadel.result.NadelResultMerger
import graphql.nadel.result.NadelResultTracker
import graphql.nadel.time.NadelInternalLatencyTracker
import graphql.nadel.util.OperationNameUtil
import graphql.nadel.util.getLogger
import graphql.nadel.validation.NadelSchemaValidation
Expand Down Expand Up @@ -142,7 +143,8 @@ internal class NextgenEngine(
executionInput,
queryDocument,
instrumentationState,
nadelExecutionParams.nadelExecutionHints,
nadelExecutionParams.executionHints,
nadelExecutionParams.latencyTracker,
)
}.asCompletableFuture()
}
Expand All @@ -160,12 +162,14 @@ internal class NextgenEngine(
queryDocument: Document,
instrumentationState: InstrumentationState?,
executionHints: NadelExecutionHints,
latencyTracker: NadelInternalLatencyTracker,
): ExecutionResult = supervisorScope {
try {
val timer = NadelInstrumentationTimer(
instrumentation,
ticker = latencyTracker::getInternalLatency,
instrumentation = instrumentation,
userContext = executionInput.context,
instrumentationState,
instrumentationState = instrumentationState,
)

val operationParseOptions = baseParseOptions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ import graphql.execution.instrumentation.InstrumentationState
import graphql.nadel.instrumentation.NadelInstrumentation
import graphql.nadel.instrumentation.parameters.NadelInstrumentationTimingParameters
import graphql.nadel.instrumentation.parameters.NadelInstrumentationTimingParameters.Step
import java.io.Closeable
import java.time.Duration
import java.time.Instant
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicReference

internal class NadelInstrumentationTimer(
private val ticker: () -> Duration,
private val instrumentation: NadelInstrumentation,
private val userContext: Any?,
private val instrumentationState: InstrumentationState?,
Expand All @@ -16,14 +19,17 @@ internal class NadelInstrumentationTimer(
step: Step,
function: () -> T,
): T {
val start = Instant.now()
val startNs = System.nanoTime()
val start = ticker()

val result = try {
function()
} catch (e: Throwable) {
try {
emit(step, start = start, startNs = startNs, exception = e)
emit(
step = step,
internalLatency = ticker() - start,
exception = e,
)
} catch (e2: Throwable) {
e2.addSuppressed(e)
throw e2
Expand All @@ -32,78 +38,81 @@ internal class NadelInstrumentationTimer(
throw e
}

emit(step, start, startNs = startNs)
emit(
step = step,
internalLatency = ticker() - start,
)

return result
}

fun batch(): BatchTimer {
return BatchTimer(timer = this)
return BatchTimer()
}

inline fun <T> batch(function: (BatchTimer) -> T): T {
return BatchTimer(timer = this).use(function)
}

@Suppress("NOTHING_TO_INLINE") // inline anyway
private inline fun emit(step: Step, start: Instant, startNs: Long, exception: Throwable? = null) {
val endNs = System.nanoTime()
val duration = Duration.ofNanos(endNs - startNs)

instrumentation.onStepTimed(newParameters(step, start, duration, exception))
return BatchTimer().use(function)
}

@Suppress("NOTHING_TO_INLINE") // inline anyway
private inline fun emit(step: Step, duration: Duration, exception: Throwable? = null) {
instrumentation.onStepTimed(newParameters(step, null, duration, exception))
private inline fun emit(
step: Step,
internalLatency: Duration,
exception: Throwable? = null,
) {
instrumentation.onStepTimed(
newParameters(
step = step,
internalLatency = internalLatency,
exception = exception,
),
)
}

private fun newParameters(
step: Step,
startedAt: Instant?,
duration: Duration,
internalLatency: Duration,
exception: Throwable? = null,
): NadelInstrumentationTimingParameters {
return NadelInstrumentationTimingParameters(
step = step,
startedAt = startedAt,
duration = duration,
internalLatency = internalLatency,
exception = exception,
context = userContext,
instrumentationState = instrumentationState,
)
}

class BatchTimer(
private val timer: NadelInstrumentationTimer,
private val timings: MutableMap<Step, Long> = mutableMapOf(),
) {
inner class BatchTimer internal constructor() : Closeable {
private val timings: MutableMap<Step, AtomicReference<Duration>> = ConcurrentHashMap()

private var exception: Throwable? = null

inline fun <T> time(step: Step, function: () -> T): T {
val start = System.nanoTime()
timings.computeIfAbsent(step) {
AtomicReference(Duration.ZERO)
}

val start = ticker()

return try {
function()
} catch (e: Throwable) {
exception = e
throw e
} finally {
val end = System.nanoTime()
timings[step] = (timings[step] ?: 0) + (end - start)
}
}
val end = ticker()

fun submit() {
timings.forEach { (step, durationNs) ->
timer.emit(step, duration = Duration.ofNanos(durationNs), exception)
timings[step]!!.getAndUpdate { current ->
// Just get the max
current.coerceAtLeast(end - start)
}
}
}

inline fun <T> use(function: (timer: BatchTimer) -> T): T {
try {
return function(this)
} finally {
submit()
override fun close() {
timings.forEach { (step, durationNs) ->
emit(step, durationNs.get(), exception)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@ import graphql.nadel.engine.transform.NadelTransform
import graphql.nadel.instrumentation.parameters.NadelInstrumentationTimingParameters.ChildStep
import graphql.nadel.instrumentation.parameters.NadelInstrumentationTimingParameters.Step
import java.time.Duration
import java.time.Instant

data class NadelInstrumentationTimingParameters(
val step: Step,
/**
* Can be null for batched timings which don't really have a start time.
*/
val startedAt: Instant?,
val duration: Duration,
val internalLatency: Duration,
/**
* If an exception occurred during the timing of the step, then it is passed in here.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package graphql.nadel.time

import java.time.Duration

interface NadelInternalLatencyTracker {
/**
* Gets the _current_ internal latency.
*
* This can be invoked before the latency is completely tracked to get a running track
* of latency.
*/
fun getInternalLatency(): Duration
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package graphql.nadel.time

import java.time.Duration
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.Supplier

open class NadelInternalLatencyTrackerImpl(
/**
* Stopwatch to track internal latency.
*/
private val internalLatency: NadelStopwatch,
) : NadelInternalLatencyTracker {
private val outstandingExternalLatencyCount = AtomicInteger()

override fun getInternalLatency(): Duration {
return internalLatency.elapsed()
}

fun onExternalRun(code: Runnable) {
onExternalCallStart()

try {
code.run()
} finally {
onExternalCallEnd()
}
}

fun <T : Any> onExternalGet(code: Supplier<T>): T {
onExternalCallStart()

try {
return code.get()
} finally {
onExternalCallEnd()
}
}

fun <T : Any> onExternalFuture(future: CompletableFuture<T>): CompletableFuture<T> {
onExternalCallStart()

return future
.whenComplete { _, _ ->
onExternalCallEnd()
}
}

fun <T : Any> onExternalFuture(future: Supplier<CompletableFuture<T>>): CompletableFuture<T> {
onExternalCallStart()

return future.get()
.whenComplete { _, _ ->
onExternalCallEnd()
}
}

protected fun onExternalCallStart() {
if (outstandingExternalLatencyCount.getAndIncrement() == 0) {
internalLatency.stop()
}
}

protected fun onExternalCallEnd() {
if (outstandingExternalLatencyCount.decrementAndGet() == 0) {
internalLatency.start()
}
}
}
Loading

0 comments on commit 0d71072

Please sign in to comment.