Skip to content

Commit

Permalink
feat(batching): v6 check if execution was exhausted when there are er…
Browse files Browse the repository at this point in the history
…rors (#2011)

### 📝 Description
#2009

Co-authored-by: Samuel Vazquez <[email protected]>
  • Loading branch information
samuelAndalon and Samuel Vazquez authored Jul 11, 2024
1 parent a1cdc0f commit 9eabdde
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class DataLoaderSyncExecutionExhaustedInstrumentation : AbstractSyncExecutionExh
parameters: SyncExecutionExhaustedInstrumentationParameters
): OnSyncExecutionExhaustedCallback = { _: List<ExecutionId> ->
parameters
.executionContext.executionInput
.executionInput
.dataLoaderRegistry
.dispatchAll()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ abstract class AbstractSyncExecutionExhaustedInstrumentation : SimplePerformantI
): InstrumentationContext<ExecutionResult>? =
parameters.graphQLContext
?.get<SyncExecutionExhaustedState>(SyncExecutionExhaustedState::class)
?.beginExecution(parameters)
?.beginExecution(
parameters,
this.getOnSyncExecutionExhaustedCallback(
SyncExecutionExhaustedInstrumentationParameters(parameters.executionInput)
)
)

override fun beginExecutionStrategy(
parameters: InstrumentationExecutionStrategyParameters,
Expand All @@ -78,7 +83,7 @@ abstract class AbstractSyncExecutionExhaustedInstrumentation : SimplePerformantI
?.beginFieldFetch(
parameters,
this.getOnSyncExecutionExhaustedCallback(
SyncExecutionExhaustedInstrumentationParameters(parameters.executionContext)
SyncExecutionExhaustedInstrumentationParameters(parameters.executionContext.executionInput)
)
)
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 Expedia, Inc
* Copyright 2024 Expedia, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,11 +17,11 @@
package com.expediagroup.graphql.dataloader.instrumentation.syncexhaustion.execution

import com.expediagroup.graphql.dataloader.instrumentation.syncexhaustion.DataLoaderSyncExecutionExhaustedInstrumentation
import graphql.execution.ExecutionContext
import graphql.ExecutionInput

/**
* Hold information that will be provided to an instance of [DataLoaderSyncExecutionExhaustedInstrumentation]
*/
data class SyncExecutionExhaustedInstrumentationParameters(
val executionContext: ExecutionContext
val executionInput: ExecutionInput
)
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,30 @@ class SyncExecutionExhaustedState(
* @return a non null [InstrumentationContext] object
*/
fun beginExecution(
parameters: InstrumentationExecutionParameters
parameters: InstrumentationExecutionParameters,
onSyncExecutionExhausted: OnSyncExecutionExhaustedCallback
): InstrumentationContext<ExecutionResult> {
executions.computeIfAbsent(parameters.executionInput.executionId) {
ExecutionBatchState()
}
return object : SimpleInstrumentationContext<ExecutionResult>() {
/**
* Remove an [ExecutionBatchState] from the state in case operation does not qualify for starting an execution,
* for example:
* - parsing, validation errors
* - persisted query errors
* - an exception during execution was thrown
*/
override fun onCompleted(result: ExecutionResult?, t: Throwable?) {
if ((result != null && result.errors.size > 0) || t != null) {
removeExecution(parameters.executionInput.executionId)
if (executions.containsKey(parameters.executionInput.executionId)) {
executions.remove(parameters.executionInput.executionId)
totalExecutions.set(totalExecutions.get() - 1)
val allSyncExecutionsExhausted = allSyncExecutionsExhausted()
if (allSyncExecutionsExhausted) {
onSyncExecutionExhausted(executions.keys().toList())
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 Expedia, Inc
* Copyright 2024 Expedia, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -197,11 +197,22 @@ object AstronautGraphQL {
)
)

fun execute(
fun executeOperations(
graphQL: GraphQL,
queries: List<String>,
dataLoaderInstrumentationStrategy: DataLoaderInstrumentationStrategy
): Pair<List<ExecutionResult>, KotlinDataLoaderRegistry> {
): Pair<List<Result<ExecutionResult>>, KotlinDataLoaderRegistry> =
execute(
graphQL,
queries.map { query -> ExecutionInput.newExecutionInput(query).build() },
dataLoaderInstrumentationStrategy
)

fun execute(
graphQL: GraphQL,
executionInputs: List<ExecutionInput>,
dataLoaderInstrumentationStrategy: DataLoaderInstrumentationStrategy
): Pair<List<Result<ExecutionResult>>, KotlinDataLoaderRegistry> {
val kotlinDataLoaderRegistry = spyk(
KotlinDataLoaderRegistryFactory(
AstronautDataLoader(),
Expand All @@ -214,26 +225,33 @@ object AstronautGraphQL {
when (dataLoaderInstrumentationStrategy) {
DataLoaderInstrumentationStrategy.SYNC_EXHAUSTION ->
SyncExecutionExhaustedState::class to SyncExecutionExhaustedState(
queries.size,
executionInputs.size,
kotlinDataLoaderRegistry
)
DataLoaderInstrumentationStrategy.LEVEL_DISPATCHED ->
ExecutionLevelDispatchedState::class to ExecutionLevelDispatchedState(
queries.size
executionInputs.size
)
}
)

val results = runBlocking {
queries.map { query ->
executionInputs.map { executionInput ->
async {
graphQL.executeAsync(
ExecutionInput
.newExecutionInput(query)
.dataLoaderRegistry(kotlinDataLoaderRegistry)
.graphQLContext(graphQLContext)
.build()
).await()
try {
Result.success(
graphQL.executeAsync(
executionInput.transform { builder ->
builder
.dataLoaderRegistry(kotlinDataLoaderRegistry)
.graphQLContext(graphQLContext)
.build()
}
).await()
)
} catch (e: Exception) {
Result.failure(e)
}
}
}.awaitAll()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class DataLoaderLevelDispatchedInstrumentationTest {
"{ mission(id: 4) { designation } }"
)

val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.execute(
val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.executeOperations(
graphQL,
queries,
DataLoaderInstrumentationStrategy.LEVEL_DISPATCHED
Expand Down Expand Up @@ -79,7 +79,7 @@ class DataLoaderLevelDispatchedInstrumentationTest {
"{ nasa { mission(id: 4) { id designation } } }"
)

val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.execute(
val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.executeOperations(
graphQL,
queries,
DataLoaderInstrumentationStrategy.LEVEL_DISPATCHED
Expand Down Expand Up @@ -114,7 +114,7 @@ class DataLoaderLevelDispatchedInstrumentationTest {
"{ mission(id: 4) { designation } }"
)

val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.execute(
val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.executeOperations(
graphQL,
queries,
DataLoaderInstrumentationStrategy.LEVEL_DISPATCHED
Expand Down Expand Up @@ -149,7 +149,7 @@ class DataLoaderLevelDispatchedInstrumentationTest {
"""mutation { createAstronaut(name: "spaceMan") { id name } }"""
)

val (results, _) = AstronautGraphQL.execute(
val (results, _) = AstronautGraphQL.executeOperations(
graphQL,
queries,
DataLoaderInstrumentationStrategy.LEVEL_DISPATCHED
Expand All @@ -170,7 +170,7 @@ class DataLoaderLevelDispatchedInstrumentationTest {
"{ mission(id: 4) { designation } }"
)

val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.execute(
val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.executeOperations(
graphQL,
queries,
DataLoaderInstrumentationStrategy.LEVEL_DISPATCHED
Expand Down
Loading

0 comments on commit 9eabdde

Please sign in to comment.