Skip to content

Commit

Permalink
feat(batching): check if execution was exhausted when there are errors (
Browse files Browse the repository at this point in the history
#2009)

### 📝 Description
Cover the scenario where the sync execution is exhausted after an
execution exception, as the execution that thrown the exception could be
the last one to execute synchronous code.

---------

Co-authored-by: Samuel Vazquez <[email protected]>
  • Loading branch information
samuelAndalon and Samuel Vazquez authored Jul 11, 2024
1 parent cf1c9d6 commit 62bcf34
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class DataLoaderSyncExecutionExhaustedInstrumentation : AbstractSyncExecutionExh
parameters: SyncExecutionExhaustedInstrumentationParameters
): OnSyncExecutionExhaustedCallback = { _: List<ExecutionId> ->
parameters
.executionContext.executionInput
.executionInput
.dataLoaderRegistry
.dispatchAll()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,12 @@ abstract class AbstractSyncExecutionExhaustedInstrumentation : SimplePerformantI
): InstrumentationContext<ExecutionResult>? =
parameters.graphQLContext
?.get<SyncExecutionExhaustedState>(SyncExecutionExhaustedState::class)
?.beginExecution(parameters)
?.beginExecution(
parameters,
this.getOnSyncExecutionExhaustedCallback(
SyncExecutionExhaustedInstrumentationParameters(parameters.executionInput)
)
)

override fun beginExecutionStrategy(
parameters: InstrumentationExecutionStrategyParameters,
Expand Down Expand Up @@ -92,7 +97,7 @@ abstract class AbstractSyncExecutionExhaustedInstrumentation : SimplePerformantI
?.beginFieldFetching(
parameters,
this.getOnSyncExecutionExhaustedCallback(
SyncExecutionExhaustedInstrumentationParameters(parameters.executionContext)
SyncExecutionExhaustedInstrumentationParameters(parameters.executionContext.executionInput)
)
)
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 Expedia, Inc
* Copyright 2024 Expedia, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,11 +17,11 @@
package com.expediagroup.graphql.dataloader.instrumentation.syncexhaustion.execution

import com.expediagroup.graphql.dataloader.instrumentation.syncexhaustion.DataLoaderSyncExecutionExhaustedInstrumentation
import graphql.execution.ExecutionContext
import graphql.ExecutionInput

/**
* Hold information that will be provided to an instance of [DataLoaderSyncExecutionExhaustedInstrumentation]
*/
data class SyncExecutionExhaustedInstrumentationParameters(
val executionContext: ExecutionContext
val executionInput: ExecutionInput
)
Original file line number Diff line number Diff line change
Expand Up @@ -47,36 +47,37 @@ class SyncExecutionExhaustedState(
private val totalExecutions: AtomicReference<Int> = AtomicReference(totalOperations)
val executions = ConcurrentHashMap<ExecutionId, ExecutionBatchState>()

/**
* Remove an [ExecutionBatchState] from the state in case operation does not qualify for starting an execution,
* for example:
* - parsing, validation errors
* - persisted query errors
* - an exception during execution was thrown
*/
private fun removeExecution(executionId: ExecutionId) {
if (executions.containsKey(executionId)) {
executions.remove(executionId)
totalExecutions.set(totalExecutions.get() - 1)
}
}

/**
* Create the [ExecutionBatchState] When a specific [ExecutionInput] starts his execution
*
* @param parameters contains information of which [ExecutionInput] will start his execution
* @return a non null [InstrumentationContext] object
*/
fun beginExecution(
parameters: InstrumentationExecutionParameters
parameters: InstrumentationExecutionParameters,
onSyncExecutionExhausted: OnSyncExecutionExhaustedCallback
): InstrumentationContext<ExecutionResult> {
executions.computeIfAbsent(parameters.executionInput.executionId) {
ExecutionBatchState()
}
return object : SimpleInstrumentationContext<ExecutionResult>() {
/**
* Remove an [ExecutionBatchState] from the state in case operation does not qualify for starting or completing execution,
* for example:
* - parsing, validation errors
* - persisted query errors
* - an exception during execution was thrown
*/
override fun onCompleted(result: ExecutionResult?, t: Throwable?) {
if ((result != null && result.errors.size > 0) || t != null) {
removeExecution(parameters.executionInput.executionId)
if (executions.containsKey(parameters.executionInput.executionId)) {
executions.remove(parameters.executionInput.executionId)
totalExecutions.set(totalExecutions.get() - 1)
val allSyncExecutionsExhausted = allSyncExecutionsExhausted()
if (allSyncExecutionsExhausted) {
onSyncExecutionExhausted(executions.keys().toList())
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,22 @@ object AstronautGraphQL {
)
)

fun execute(
fun executeOperations(
graphQL: GraphQL,
queries: List<String>,
dataLoaderInstrumentationStrategy: DataLoaderInstrumentationStrategy
): Pair<List<ExecutionResult>, KotlinDataLoaderRegistry> {
): Pair<List<Result<ExecutionResult>>, KotlinDataLoaderRegistry> =
execute(
graphQL,
queries.map { query -> ExecutionInput.newExecutionInput(query).build() },
dataLoaderInstrumentationStrategy
)

fun execute(
graphQL: GraphQL,
executionInputs: List<ExecutionInput>,
dataLoaderInstrumentationStrategy: DataLoaderInstrumentationStrategy
): Pair<List<Result<ExecutionResult>>, KotlinDataLoaderRegistry> {
val kotlinDataLoaderRegistry = spyk(
KotlinDataLoaderRegistryFactory(
AstronautDataLoader(),
Expand All @@ -210,26 +221,32 @@ object AstronautGraphQL {
when (dataLoaderInstrumentationStrategy) {
DataLoaderInstrumentationStrategy.SYNC_EXHAUSTION ->
SyncExecutionExhaustedState::class to SyncExecutionExhaustedState(
queries.size,
executionInputs.size,
kotlinDataLoaderRegistry
)
}
)

val results = runBlocking {
queries.map { query ->
executionInputs.map { executionInput ->
async {
graphQL.executeAsync(
ExecutionInput
.newExecutionInput(query)
.dataLoaderRegistry(kotlinDataLoaderRegistry)
.graphQLContext(graphQLContext)
.build()
).await()
try {
Result.success(
graphQL.executeAsync(
executionInput.transform { builder ->
builder
.dataLoaderRegistry(kotlinDataLoaderRegistry)
.graphQLContext(graphQLContext)
.build()
}
).await()
)
} catch (e: Exception) {
Result.failure(e)
}
}
}.awaitAll()
}

return Pair(results, kotlinDataLoaderRegistry)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.expediagroup.graphql.dataloader.instrumentation.syncexhaustion
import com.expediagroup.graphql.dataloader.instrumentation.fixture.DataLoaderInstrumentationStrategy
import com.expediagroup.graphql.dataloader.instrumentation.fixture.AstronautGraphQL
import com.expediagroup.graphql.dataloader.instrumentation.fixture.ProductGraphQL
import graphql.ExecutionInput
import io.mockk.clearAllMocks
import io.mockk.verify
import org.junit.jupiter.api.BeforeEach
Expand Down Expand Up @@ -54,7 +55,7 @@ class DataLoaderSyncExecutionExhaustedInstrumentationTest {
"{ mission(id: 4) { designation } }"
)

val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.execute(
val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.executeOperations(
graphQL,
queries,
DataLoaderInstrumentationStrategy.SYNC_EXHAUSTION
Expand Down Expand Up @@ -85,7 +86,7 @@ class DataLoaderSyncExecutionExhaustedInstrumentationTest {
"{ nasa { mission(id: 4) { id designation } } }"
)

val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.execute(
val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.executeOperations(
graphQL,
queries,
DataLoaderInstrumentationStrategy.SYNC_EXHAUSTION
Expand Down Expand Up @@ -120,7 +121,7 @@ class DataLoaderSyncExecutionExhaustedInstrumentationTest {
"{ mission(id: 4) { designation } }"
)

val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.execute(
val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.executeOperations(
graphQL,
queries,
DataLoaderInstrumentationStrategy.SYNC_EXHAUSTION
Expand Down Expand Up @@ -164,7 +165,7 @@ class DataLoaderSyncExecutionExhaustedInstrumentationTest {
""".trimIndent()
)

val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.execute(
val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.executeOperations(
graphQL,
queries,
DataLoaderInstrumentationStrategy.SYNC_EXHAUSTION
Expand Down Expand Up @@ -202,7 +203,7 @@ class DataLoaderSyncExecutionExhaustedInstrumentationTest {
""".trimIndent()
)

val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.execute(
val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.executeOperations(
graphQL,
queries,
DataLoaderInstrumentationStrategy.SYNC_EXHAUSTION
Expand Down Expand Up @@ -253,7 +254,7 @@ class DataLoaderSyncExecutionExhaustedInstrumentationTest {
""".trimIndent()
)

val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.execute(
val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.executeOperations(
graphQL,
queries,
DataLoaderInstrumentationStrategy.SYNC_EXHAUSTION
Expand Down Expand Up @@ -299,7 +300,7 @@ class DataLoaderSyncExecutionExhaustedInstrumentationTest {
""".trimIndent()
)

val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.execute(
val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.executeOperations(
graphQL,
queries,
DataLoaderInstrumentationStrategy.SYNC_EXHAUSTION
Expand Down Expand Up @@ -340,7 +341,7 @@ class DataLoaderSyncExecutionExhaustedInstrumentationTest {
""".trimIndent()
)

val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.execute(
val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.executeOperations(
graphQL,
queries,
DataLoaderInstrumentationStrategy.SYNC_EXHAUSTION
Expand Down Expand Up @@ -380,7 +381,7 @@ class DataLoaderSyncExecutionExhaustedInstrumentationTest {
""".trimIndent()
)

val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.execute(
val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.executeOperations(
graphQL,
queries,
DataLoaderInstrumentationStrategy.SYNC_EXHAUSTION
Expand All @@ -389,7 +390,7 @@ class DataLoaderSyncExecutionExhaustedInstrumentationTest {
assertEquals(3, results.size)

results.forEach { result ->
assertTrue(result.errors.isEmpty())
assertTrue(result.getOrThrow().errors.isEmpty())
}

val missionStatistics = kotlinDataLoaderRegistry.dataLoadersMap["MissionDataLoader"]?.statistics
Expand Down Expand Up @@ -422,15 +423,15 @@ class DataLoaderSyncExecutionExhaustedInstrumentationTest {
""".trimIndent()
)

val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.execute(
val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.executeOperations(
graphQL,
queries,
DataLoaderInstrumentationStrategy.SYNC_EXHAUSTION
)

assertEquals(3, results.size)
results.forEach { result ->
assertTrue(result.errors.isEmpty())
assertTrue(result.getOrThrow().errors.isEmpty())
}

val astronautStatistics = kotlinDataLoaderRegistry.dataLoadersMap["AstronautDataLoader"]?.statistics
Expand Down Expand Up @@ -470,7 +471,7 @@ class DataLoaderSyncExecutionExhaustedInstrumentationTest {
""".trimIndent(),
)

val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.execute(
val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.executeOperations(
graphQL,
queries,
DataLoaderInstrumentationStrategy.SYNC_EXHAUSTION
Expand All @@ -482,7 +483,7 @@ class DataLoaderSyncExecutionExhaustedInstrumentationTest {

assertEquals(2, results.size)
results.forEach { result ->
assertTrue(result.errors.isEmpty())
assertTrue(result.getOrThrow().errors.isEmpty())
}

assertEquals(1, astronautStatistics?.batchInvokeCount)
Expand Down Expand Up @@ -566,7 +567,7 @@ class DataLoaderSyncExecutionExhaustedInstrumentationTest {
"""mutation { createAstronaut(name: "spaceMan") { id name } }"""
)

val (results, dataLoaderSyncExecutionExhaustedInstrumentation) = AstronautGraphQL.execute(
val (results, dataLoaderSyncExecutionExhaustedInstrumentation) = AstronautGraphQL.executeOperations(
graphQL,
queries,
DataLoaderInstrumentationStrategy.SYNC_EXHAUSTION
Expand All @@ -579,15 +580,15 @@ class DataLoaderSyncExecutionExhaustedInstrumentationTest {
}

@Test
fun `Instrumentation should not account for invalid operations`() {
fun `Instrumentation should not consider executions with invalid operations`() {
val queries = listOf(
"invalid query{ astronaut(id: 1) {",
"{ astronaut(id: 2) { id name } }",
"{ mission(id: 3) { id designation } }",
"{ mission(id: 4) { designation } }"
)

val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.execute(
val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.executeOperations(
graphQL,
queries,
DataLoaderInstrumentationStrategy.SYNC_EXHAUSTION
Expand All @@ -608,4 +609,35 @@ class DataLoaderSyncExecutionExhaustedInstrumentationTest {
kotlinDataLoaderRegistry.dispatchAll()
}
}

@Test
fun `Instrumentation should not consider executions that thrown exceptions`() {
val executions = listOf(
ExecutionInput.newExecutionInput("query test1 { astronaut(id: 1) { id name } }").operationName("test1").build(),
ExecutionInput.newExecutionInput("query test2 { astronaut(id: 2) { id name } }").operationName("test2").build(),
ExecutionInput.newExecutionInput("query test3 { mission(id: 3) { id designation } }").operationName("test3").build(),
ExecutionInput.newExecutionInput("query test4 { mission(id: 4) { designation } }").operationName("OPERATION_NOT_IN_DOCUMENT").build()
)

val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.execute(
graphQL,
executions,
DataLoaderInstrumentationStrategy.SYNC_EXHAUSTION
)

assertEquals(4, results.size)

val astronautStatistics = kotlinDataLoaderRegistry.dataLoadersMap["AstronautDataLoader"]?.statistics
val missionStatistics = kotlinDataLoaderRegistry.dataLoadersMap["MissionDataLoader"]?.statistics

assertEquals(1, astronautStatistics?.batchInvokeCount)
assertEquals(2, astronautStatistics?.batchLoadCount)

assertEquals(1, missionStatistics?.batchInvokeCount)
assertEquals(1, missionStatistics?.batchLoadCount)

verify(exactly = 2) {
kotlinDataLoaderRegistry.dispatchAll()
}
}
}

0 comments on commit 62bcf34

Please sign in to comment.