diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/ThrottledFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/ThrottledFunction.java new file mode 100644 index 00000000..f21bd957 --- /dev/null +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/ThrottledFunction.java @@ -0,0 +1,28 @@ +package com.inngest.springbootdemo.testfunctions; + +import com.inngest.FunctionContext; +import com.inngest.InngestFunction; +import com.inngest.InngestFunctionConfigBuilder; +import com.inngest.Step; +import org.jetbrains.annotations.NotNull; + +import java.time.Duration; + +public class ThrottledFunction extends InngestFunction { + + @NotNull + @Override + public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) { + return builder + .id("ThrottledFunction") + .name("Throttled Function") + .triggerEvent("test/throttled") + .throttle(1, Duration.ofSeconds(10), 1, "throttled"); + } + + @Override + public Integer execute(FunctionContext ctx, Step step) { + return step.run("result", () -> 42, Integer.class); + } +} + diff --git a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/DemoTestConfiguration.java b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/DemoTestConfiguration.java index 0e576eb9..09db74af 100644 --- a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/DemoTestConfiguration.java +++ b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/DemoTestConfiguration.java @@ -20,6 +20,7 @@ protected HashMap functions() { addInngestFunction(functions, new SendEventFunction()); addInngestFunction(functions, new NonRetriableErrorFunction()); addInngestFunction(functions, new RetriableErrorFunction()); + addInngestFunction(functions, new ThrottledFunction()); return functions; } diff --git a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/ThrottleFunctionIntegrationTest.java b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/ThrottleFunctionIntegrationTest.java new file mode 100644 index 00000000..1bbba557 --- /dev/null +++ b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/ThrottleFunctionIntegrationTest.java @@ -0,0 +1,39 @@ +package com.inngest.springbootdemo; + +import com.inngest.Inngest; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.springframework.beans.factory.annotation.Autowired; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@IntegrationTest +@Execution(ExecutionMode.CONCURRENT) +class ThrottleFunctionIntegrationTest { + @Autowired + private DevServerComponent devServer; + + @Autowired + private Inngest client; + + @Test + void testThrottledFunctionShouldNotRunConcurrently() throws Exception { + String firstEvent = InngestFunctionTestHelpers.sendEvent(client, "test/throttled").first(); + Thread.sleep(500); + String secondEvent = InngestFunctionTestHelpers.sendEvent(client, "test/throttled").first(); + + Thread.sleep(5000); + + // Without throttling, both events would have been completed by now + RunEntry firstRun = devServer.runsByEvent(firstEvent).first(); + RunEntry secondRun = devServer.runsByEvent(secondEvent).first(); + assertEquals("Completed", firstRun.getStatus()); + assertEquals("Running", secondRun.getStatus()); + + Thread.sleep(10000); + + RunEntry secondRunAfterWait = devServer.runsByEvent(secondEvent).first(); + assertEquals("Completed", secondRunAfterWait.getStatus()); + } +} diff --git a/inngest/src/main/kotlin/com/inngest/Function.kt b/inngest/src/main/kotlin/com/inngest/Function.kt index f994eb61..c6353eeb 100644 --- a/inngest/src/main/kotlin/com/inngest/Function.kt +++ b/inngest/src/main/kotlin/com/inngest/Function.kt @@ -76,6 +76,8 @@ internal class InternalFunctionConfig @Json(serializeNull = false) val concurrency: MutableList? = null, @Json(serializeNull = false) + val throttle: Throttle? = null, + @Json(serializeNull = false) val batchEvents: BatchEvents? = null, val steps: Map, ) diff --git a/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt b/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt index e3ab9bab..54300980 100644 --- a/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt +++ b/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt @@ -12,6 +12,7 @@ class InngestFunctionConfigBuilder { private var name: String? = null private var triggers: MutableList = mutableListOf() private var concurrency: MutableList? = null + private var throttle: Throttle? = null private var batchEvents: BatchEvents? = null /** @@ -119,6 +120,27 @@ class InngestFunctionConfigBuilder { return this } + /** + * Configure function throttle limit + * + * @param limit The total number of runs allowed to start within the given period. The limit is applied evenly over the period. + * @param period The period of time for the rate limit. Run starts are evenly spaced through the given period. + * The minimum granularity is 1 second. + * @param burst The number of runs allowed to start in the given window in a single burst. + * A burst > 1 bypasses smoothing for the burst and allows many runs to start at once, if desired. Defaults to 1, which disables bursting. + * @param key An optional expression which returns a throttling key for controlling throttling. + * Every unique key is its own throttle limit. Event data may be used within this expression, eg "event.data.user_id". + + */ + + @JvmOverloads + fun throttle( + limit: Int, + period: Duration, + burst: Int? = null, + key: String? = null, + ): InngestFunctionConfigBuilder = apply { this.throttle = Throttle(limit, period, key, burst) } + private fun buildSteps(serveUrl: String): Map { val scheme = serveUrl.split("://")[0] return mapOf( @@ -154,6 +176,7 @@ class InngestFunctionConfigBuilder { name ?: id, triggers, concurrency, + throttle, batchEvents, steps = buildSteps(serverUrl), ) @@ -211,6 +234,18 @@ internal data class Concurrency val scope: ConcurrencyScope? = null, ) +internal data class Throttle + @JvmOverloads + constructor( + val limit: Int, + @KlaxonDuration + val period: Duration, + @Json(serializeNull = false) + val key: String? = null, + @Json(serializeNull = false) + val burst: Int? = 1, + ) + internal data class BatchEvents @JvmOverloads constructor(