From 963886eeadd24f35af9d3f1ec063fe988031ca91 Mon Sep 17 00:00:00 2001 From: Albert Chae Date: Thu, 5 Sep 2024 23:00:08 -0700 Subject: [PATCH 1/4] Fix hashing step ids in loops Per https://github.com/inngest/inngest/blob/main/docs/SDK_SPEC.md#512-ids-and-hashing, add `:n` starting with `:1` for repeated instances of a step id Mostly similar to inngest-js implementation https://github.com/inngest/inngest-js/blob/79069e1a3d700624ce49b323922c113fc952bcc6/packages/inngest/src/components/execution/v1.ts#L819-L831 The inngest-js SDK currently optionally warns of parallel indexing, but this isn't in scope for beta so I left it out --- .../testfunctions/LoopFunction.java | 32 +++++++++++++++++++ .../springbootdemo/DemoTestConfiguration.java | 1 + .../LoopFunctionIntegrationTest.java | 30 +++++++++++++++++ inngest/src/main/kotlin/com/inngest/State.kt | 25 ++++++++++++++- 4 files changed, 87 insertions(+), 1 deletion(-) create mode 100644 inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/LoopFunction.java create mode 100644 inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/LoopFunctionIntegrationTest.java diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/LoopFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/LoopFunction.java new file mode 100644 index 00000000..cba06cd4 --- /dev/null +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/LoopFunction.java @@ -0,0 +1,32 @@ +package com.inngest.springbootdemo.testfunctions; + +import com.inngest.FunctionContext; +import com.inngest.InngestFunction; +import com.inngest.InngestFunctionConfigBuilder; +import com.inngest.Step; +import org.jetbrains.annotations.NotNull; + +public class LoopFunction extends InngestFunction { + + @NotNull + @Override + public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) { + return builder + .id("loop-fn") + .name("Loop Function") + .triggerEvent("test/loop"); + } + + + @Override + public Integer execute(FunctionContext ctx, Step step) { + int runningCount = 10; + for (int i = 0; i < 5; i++) { + int effectivelyFinalVariableForLambda = runningCount; + runningCount = step.run("add-ten", () -> effectivelyFinalVariableForLambda + 10, Integer.class); + } + + return runningCount; + } +} + diff --git a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/DemoTestConfiguration.java b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/DemoTestConfiguration.java index cff6a8c3..e5276541 100644 --- a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/DemoTestConfiguration.java +++ b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/DemoTestConfiguration.java @@ -30,6 +30,7 @@ protected HashMap functions() { addInngestFunction(functions, new Scale2DObjectFunction()); addInngestFunction(functions, new MultiplyMatrixFunction()); addInngestFunction(functions, new WithOnFailureFunction()); + addInngestFunction(functions, new LoopFunction()); return functions; } diff --git a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/LoopFunctionIntegrationTest.java b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/LoopFunctionIntegrationTest.java new file mode 100644 index 00000000..39b40450 --- /dev/null +++ b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/LoopFunctionIntegrationTest.java @@ -0,0 +1,30 @@ +package com.inngest.springbootdemo; + +import com.inngest.Inngest; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.springframework.beans.factory.annotation.Autowired; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@IntegrationTest +@Execution(ExecutionMode.CONCURRENT) +class LoopFunctionIntegrationTest { + @Autowired + private DevServerComponent devServer; + + @Autowired + private Inngest client; + + @Test + void testStepsInLoopExecuteCorrectly() throws Exception { + String loopEvent = InngestFunctionTestHelpers.sendEvent(client, "test/loop").getIds()[0]; + Thread.sleep(2000); + + RunEntry loopRun = devServer.runsByEvent(loopEvent).first(); + assertEquals("Completed", loopRun.getStatus()); + + assertEquals(60, loopRun.getOutput()); + } +} diff --git a/inngest/src/main/kotlin/com/inngest/State.kt b/inngest/src/main/kotlin/com/inngest/State.kt index 50e98942..34502e9e 100644 --- a/inngest/src/main/kotlin/com/inngest/State.kt +++ b/inngest/src/main/kotlin/com/inngest/State.kt @@ -9,8 +9,17 @@ class StateNotFound : Throwable("State not found for id") class State( private val payloadJson: String, ) { + private val stepIds = mutableSetOf() + fun getHashFromId(id: String): String { - val bytes = id.toByteArray(Charsets.UTF_8) + val idToHash: String = + if (id in stepIds) { + findNextAvailableStepId(id) + } else { + id + } + stepIds.add(idToHash) + val bytes = idToHash.toByteArray(Charsets.UTF_8) val digest = MessageDigest.getInstance("SHA-1") val hashedBytes = digest.digest(bytes) val sb = StringBuilder() @@ -20,6 +29,20 @@ class State( return sb.toString() } + private fun findNextAvailableStepId(id: String): String { + var stepNumber = 1 + var possibleStepId: String + while (true) { + possibleStepId = "$id:$stepNumber" + if (possibleStepId !in stepIds) { + break + } + stepNumber++ + } + + return possibleStepId + } + inline fun getState( hashedId: String, fieldName: String = "data", From 303e9845a5100fd7aacc26f6f02e571c96493b35 Mon Sep 17 00:00:00 2001 From: Albert Chae Date: Fri, 6 Sep 2024 07:17:24 -0700 Subject: [PATCH 2/4] Use hash of stepId counts instead of looping --- inngest/src/main/kotlin/com/inngest/State.kt | 27 ++++++++------------ 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/inngest/src/main/kotlin/com/inngest/State.kt b/inngest/src/main/kotlin/com/inngest/State.kt index 34502e9e..72279430 100644 --- a/inngest/src/main/kotlin/com/inngest/State.kt +++ b/inngest/src/main/kotlin/com/inngest/State.kt @@ -9,16 +9,10 @@ class StateNotFound : Throwable("State not found for id") class State( private val payloadJson: String, ) { - private val stepIds = mutableSetOf() + private val stepIdsToSeenCount = mutableMapOf() fun getHashFromId(id: String): String { - val idToHash: String = - if (id in stepIds) { - findNextAvailableStepId(id) - } else { - id - } - stepIds.add(idToHash) + val idToHash: String = findNextAvailableStepId(id) val bytes = idToHash.toByteArray(Charsets.UTF_8) val digest = MessageDigest.getInstance("SHA-1") val hashedBytes = digest.digest(bytes) @@ -30,17 +24,16 @@ class State( } private fun findNextAvailableStepId(id: String): String { - var stepNumber = 1 - var possibleStepId: String - while (true) { - possibleStepId = "$id:$stepNumber" - if (possibleStepId !in stepIds) { - break - } - stepNumber++ + if (id !in stepIdsToSeenCount) { + stepIdsToSeenCount[id] = 1 + return id } - return possibleStepId + // use the seen count so far for current step, increment for next time + val stepNumber = stepIdsToSeenCount[id] + stepIdsToSeenCount[id] = stepIdsToSeenCount.getValue(id) + 1 + + return "$id:$stepNumber" } inline fun getState( From 43373c864a0e49a558ea476d1804eae97bb68bc1 Mon Sep 17 00:00:00 2001 From: Albert Chae Date: Wed, 11 Sep 2024 21:23:43 -0700 Subject: [PATCH 3/4] Use combination of hash and loop to find next unused stepId The hash means we don't have to loop from 0 every time and for most cases will just correctly return us the next unused stepId, but looping afterwards guarantees we don't collide with a user defined stepId. So this will be O(1) in most cases and potentially O(n) for pathological functions that have many steps manually named with the `:n` suffix --- .../testfunctions/LoopFunction.java | 8 +++++-- .../LoopFunctionIntegrationTest.java | 2 +- inngest/src/main/kotlin/com/inngest/State.kt | 22 ++++++++++++++----- 3 files changed, 23 insertions(+), 9 deletions(-) diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/LoopFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/LoopFunction.java index cba06cd4..34e0a1df 100644 --- a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/LoopFunction.java +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/LoopFunction.java @@ -21,9 +21,13 @@ public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) @Override public Integer execute(FunctionContext ctx, Step step) { int runningCount = 10; + + int effectivelyFinalVariableForLambda1 = runningCount; + runningCount = step.run("add-num:3", () -> effectivelyFinalVariableForLambda1 + 50, Integer.class); + for (int i = 0; i < 5; i++) { - int effectivelyFinalVariableForLambda = runningCount; - runningCount = step.run("add-ten", () -> effectivelyFinalVariableForLambda + 10, Integer.class); + int effectivelyFinalVariableForLambda2 = runningCount; + runningCount = step.run("add-num", () -> effectivelyFinalVariableForLambda2 + 10, Integer.class); } return runningCount; diff --git a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/LoopFunctionIntegrationTest.java b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/LoopFunctionIntegrationTest.java index 39b40450..9e819939 100644 --- a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/LoopFunctionIntegrationTest.java +++ b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/LoopFunctionIntegrationTest.java @@ -25,6 +25,6 @@ void testStepsInLoopExecuteCorrectly() throws Exception { RunEntry loopRun = devServer.runsByEvent(loopEvent).first(); assertEquals("Completed", loopRun.getStatus()); - assertEquals(60, loopRun.getOutput()); + assertEquals(110, loopRun.getOutput()); } } diff --git a/inngest/src/main/kotlin/com/inngest/State.kt b/inngest/src/main/kotlin/com/inngest/State.kt index 72279430..67e08baa 100644 --- a/inngest/src/main/kotlin/com/inngest/State.kt +++ b/inngest/src/main/kotlin/com/inngest/State.kt @@ -9,10 +9,13 @@ class StateNotFound : Throwable("State not found for id") class State( private val payloadJson: String, ) { - private val stepIdsToSeenCount = mutableMapOf() + private val stepIdsToNextStepNumber = mutableMapOf() + private val stepIds = mutableSetOf() fun getHashFromId(id: String): String { val idToHash: String = findNextAvailableStepId(id) + stepIds.add(idToHash) + val bytes = idToHash.toByteArray(Charsets.UTF_8) val digest = MessageDigest.getInstance("SHA-1") val hashedBytes = digest.digest(bytes) @@ -24,14 +27,21 @@ class State( } private fun findNextAvailableStepId(id: String): String { - if (id !in stepIdsToSeenCount) { - stepIdsToSeenCount[id] = 1 + if (id !in stepIdsToNextStepNumber) { + stepIdsToNextStepNumber[id] = 1 return id } - // use the seen count so far for current step, increment for next time - val stepNumber = stepIdsToSeenCount[id] - stepIdsToSeenCount[id] = stepIdsToSeenCount.getValue(id) + 1 + // start with the seen count so far for current stepId + // but loop over all seen stepIds to make sure a user didn't explicitly define + // a step using the same step number + var stepNumber = stepIdsToNextStepNumber[id] + while ("$id:$stepNumber" in stepIds) { + stepNumber = stepNumber!! + 1 + } + // now we know stepNumber is unused and can be used for the current stepId + // save stepNumber + 1 to the hash for next time + stepIdsToNextStepNumber[id] = stepNumber!! + 1 return "$id:$stepNumber" } From 1fd5308a48f62c6bfa8c522c9f3205236459554b Mon Sep 17 00:00:00 2001 From: Albert Chae Date: Thu, 12 Sep 2024 06:21:01 -0700 Subject: [PATCH 4/4] Handle edge case where user reuses step with stepNumber after it was already allocated in loop --- .../springbootdemo/testfunctions/LoopFunction.java | 9 +++++++++ .../springbootdemo/LoopFunctionIntegrationTest.java | 2 +- inngest/src/main/kotlin/com/inngest/State.kt | 9 ++++----- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/LoopFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/LoopFunction.java index 34e0a1df..526e067c 100644 --- a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/LoopFunction.java +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/LoopFunction.java @@ -22,14 +22,23 @@ public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) public Integer execute(FunctionContext ctx, Step step) { int runningCount = 10; + // explicitly naming a step that the SDK will try to use in the loop shouldn't break the loop int effectivelyFinalVariableForLambda1 = runningCount; runningCount = step.run("add-num:3", () -> effectivelyFinalVariableForLambda1 + 50, Integer.class); for (int i = 0; i < 5; i++) { int effectivelyFinalVariableForLambda2 = runningCount; + // The actual stepIds used will be add-num, add-num:1, add-num:2, add-num:4, add-num:5 runningCount = step.run("add-num", () -> effectivelyFinalVariableForLambda2 + 10, Integer.class); } + // explicitly reusing step names that the SDK used during the loop should both execute + // These will be modified to add-num:4:1 and add-num:4:2 respectively + int effectivelyFinalVariableForLambda3 = runningCount; + runningCount = step.run("add-num:4", () -> effectivelyFinalVariableForLambda3 + 30, Integer.class); + int effectivelyFinalVariableForLambda4 = runningCount; + runningCount = step.run("add-num:4", () -> effectivelyFinalVariableForLambda4 + 30, Integer.class); + return runningCount; } } diff --git a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/LoopFunctionIntegrationTest.java b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/LoopFunctionIntegrationTest.java index 9e819939..40967527 100644 --- a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/LoopFunctionIntegrationTest.java +++ b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/LoopFunctionIntegrationTest.java @@ -25,6 +25,6 @@ void testStepsInLoopExecuteCorrectly() throws Exception { RunEntry loopRun = devServer.runsByEvent(loopEvent).first(); assertEquals("Completed", loopRun.getStatus()); - assertEquals(110, loopRun.getOutput()); + assertEquals(170, loopRun.getOutput()); } } diff --git a/inngest/src/main/kotlin/com/inngest/State.kt b/inngest/src/main/kotlin/com/inngest/State.kt index 67e08baa..fd1d35b3 100644 --- a/inngest/src/main/kotlin/com/inngest/State.kt +++ b/inngest/src/main/kotlin/com/inngest/State.kt @@ -27,21 +27,20 @@ class State( } private fun findNextAvailableStepId(id: String): String { - if (id !in stepIdsToNextStepNumber) { - stepIdsToNextStepNumber[id] = 1 + if (id !in stepIds) { return id } // start with the seen count so far for current stepId // but loop over all seen stepIds to make sure a user didn't explicitly define // a step using the same step number - var stepNumber = stepIdsToNextStepNumber[id] + var stepNumber = stepIdsToNextStepNumber.getOrDefault(id, 1) while ("$id:$stepNumber" in stepIds) { - stepNumber = stepNumber!! + 1 + stepNumber = stepNumber + 1 } // now we know stepNumber is unused and can be used for the current stepId // save stepNumber + 1 to the hash for next time - stepIdsToNextStepNumber[id] = stepNumber!! + 1 + stepIdsToNextStepNumber[id] = stepNumber + 1 return "$id:$stepNumber" }