Skip to content

Commit

Permalink
Implement throttling configuration for functions
Browse files Browse the repository at this point in the history
A test is included, but it may be challenging to verify if the function
is throttled since it returns a status of Running. If the test fails due
to timing issues, it might not be worth keeping.
  • Loading branch information
KiKoS0 committed Aug 31, 2024
1 parent cee727d commit b0c450e
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.inngest.springbootdemo.testfunctions;

import com.inngest.FunctionContext;
import com.inngest.InngestFunction;
import com.inngest.InngestFunctionConfigBuilder;
import com.inngest.Step;
import org.jetbrains.annotations.NotNull;

import java.time.Duration;

public class ThrottledFunction extends InngestFunction {

@NotNull
@Override
public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) {
return builder
.id("ThrottledFunction")
.name("Throttled Function")
.triggerEvent("test/throttled")
.throttle(1, Duration.ofSeconds(10), 1, "throttled");
}

@Override
public Integer execute(FunctionContext ctx, Step step) {
return step.run("result", () -> 42, Integer.class);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ protected HashMap<String, InngestFunction> functions() {
addInngestFunction(functions, new SendEventFunction());
addInngestFunction(functions, new NonRetriableErrorFunction());
addInngestFunction(functions, new RetriableErrorFunction());
addInngestFunction(functions, new ThrottledFunction());

return functions;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.inngest.springbootdemo;

import com.inngest.Inngest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.springframework.beans.factory.annotation.Autowired;

import static org.junit.jupiter.api.Assertions.assertEquals;

@IntegrationTest
@Execution(ExecutionMode.CONCURRENT)
class ThrottleFunctionIntegrationTest {
@Autowired
private DevServerComponent devServer;

@Autowired
private Inngest client;

@Test
void testThrottledFunctionShouldNotRunConcurrently() throws Exception {
String firstEvent = InngestFunctionTestHelpers.sendEvent(client, "test/throttled").first();
Thread.sleep(500);
String secondEvent = InngestFunctionTestHelpers.sendEvent(client, "test/throttled").first();

Thread.sleep(5000);

// Without throttling, both events would have been completed by now
RunEntry<Object> firstRun = devServer.runsByEvent(firstEvent).first();
RunEntry<Object> secondRun = devServer.runsByEvent(secondEvent).first();
assertEquals("Completed", firstRun.getStatus());
assertEquals("Running", secondRun.getStatus());

Thread.sleep(10000);

RunEntry<Object> secondRunAfterWait = devServer.runsByEvent(secondEvent).first();
assertEquals("Completed", secondRunAfterWait.getStatus());
}
}
2 changes: 2 additions & 0 deletions inngest/src/main/kotlin/com/inngest/Function.kt
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ internal class InternalFunctionConfig
@Json(serializeNull = false)
val concurrency: MutableList<Concurrency>? = null,
@Json(serializeNull = false)
val throttle: Throttle? = null,
@Json(serializeNull = false)
val batchEvents: BatchEvents? = null,
val steps: Map<String, StepConfig>,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class InngestFunctionConfigBuilder {
private var name: String? = null
private var triggers: MutableList<InngestFunctionTrigger> = mutableListOf()
private var concurrency: MutableList<Concurrency>? = null
private var throttle: Throttle? = null
private var batchEvents: BatchEvents? = null

/**
Expand Down Expand Up @@ -119,6 +120,27 @@ class InngestFunctionConfigBuilder {
return this
}

/**
* Configure function throttle limit
*
* @param limit The total number of runs allowed to start within the given period. The limit is applied evenly over the period.
* @param period The period of time for the rate limit. Run starts are evenly spaced through the given period.
* The minimum granularity is 1 second.
* @param burst The number of runs allowed to start in the given window in a single burst.
* A burst > 1 bypasses smoothing for the burst and allows many runs to start at once, if desired. Defaults to 1, which disables bursting.
* @param key An optional expression which returns a throttling key for controlling throttling.
* Every unique key is its own throttle limit. Event data may be used within this expression, eg "event.data.user_id".
*/

@JvmOverloads
fun throttle(
limit: Int,
period: Duration,
burst: Int? = null,
key: String? = null,
): InngestFunctionConfigBuilder = apply { this.throttle = Throttle(limit, period, key, burst) }

private fun buildSteps(serveUrl: String): Map<String, StepConfig> {
val scheme = serveUrl.split("://")[0]
return mapOf(
Expand Down Expand Up @@ -154,6 +176,7 @@ class InngestFunctionConfigBuilder {
name ?: id,
triggers,
concurrency,
throttle,
batchEvents,
steps = buildSteps(serverUrl),
)
Expand Down Expand Up @@ -211,6 +234,18 @@ internal data class Concurrency
val scope: ConcurrencyScope? = null,
)

internal data class Throttle
@JvmOverloads
constructor(
val limit: Int,
@KlaxonDuration
val period: Duration,
@Json(serializeNull = false)
val key: String? = null,
@Json(serializeNull = false)
val burst: Int? = 1,
)

internal data class BatchEvents
@JvmOverloads
constructor(
Expand Down

0 comments on commit b0c450e

Please sign in to comment.