Skip to content

Commit

Permalink
Add step.invoke (#53)
Browse files Browse the repository at this point in the history
* WIP invoke

* Improvements

* Add fn

* Update function slug

---------

Co-authored-by: Tony Holdstock-Brown <[email protected]>
  • Loading branch information
djfarrelly and tonyhb authored Jul 17, 2024
1 parent cee9c49 commit 8df627e
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ fun Application.module() {
val inngest = Inngest(appId = "ktor-dev")

routing {
serve("/api/inngest", inngest, listOf(UserSignupFunction(), FollowupFunction()))
serve("/api/inngest", inngest, listOf(ProcessAlbum(), RestoreFromGlacier()))
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.inngest.testserver

import com.inngest.*
import java.time.Duration

@FunctionConfig(id = "ProcessAlbum", name = "ProcessAlbum")
@FunctionEventTrigger(event = "delivery/process.requested")
class ProcessAlbum : InngestFunction() {
override fun execute(
ctx: FunctionContext,
step: Step,
): LinkedHashMap<String, Any> {

// NOTE - App ID is set on the serve level
val res = step.invoke<Map<String, Any>>(
"restore-album",
"ktor-dev",
"RestoreFromGlacier",
mapOf("some-arg" to "awesome"),
null,

)

// throw NonRetriableError("Could not restore")
return linkedMapOf("hello" to true)
}

fun isRestoredFromGlacier(temp: Int): Boolean {
if (temp > 2) {
return true
}
return false;
}

fun restoreFromGlacier(): String {
return "FILES_RESTORED"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.inngest.testserver

import com.inngest.*
import java.time.Duration

@FunctionConfig(id = "RestoreFromGlacier", name = "RestoreFromGlacier")
@FunctionEventTrigger(event = "delivery/restore.requested")
class RestoreFromGlacier : InngestFunction() {
override fun execute(
ctx: FunctionContext,
step: Step,
): LinkedHashMap<String, Any> {

step.run("restore") {
if (!isRestoredFromGlacier(0)) {
restoreFromGlacier()
}
}
var i = 0
while (i < 6) {
val isRestored = step.run(String.format("check-status-%d", i)) {
isRestoredFromGlacier(i)
}
if (isRestored) {
return linkedMapOf("restored" to true)
}
step.sleep(String.format("wait-for-restore-%d", i), Duration.ofSeconds(5))
i++
}

// throw NonRetriableError("Could not restore")
return linkedMapOf("restored" to false)
}

fun isRestoredFromGlacier(temp: Int): Boolean {
if (temp > 2) {
return true
}
return false;
}

fun restoreFromGlacier(): String {
return "FILES_RESTORED"
}
}
2 changes: 1 addition & 1 deletion inngest/src/main/kotlin/com/inngest/Comm.kt
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class CommHandler(

private fun getFunctionConfigs(): List<InternalFunctionConfig> {
val configs: MutableList<InternalFunctionConfig> = mutableListOf()
functions.forEach { entry -> configs.add(entry.value.getFunctionConfig(getServeUrl())) }
functions.forEach { entry -> configs.add(entry.value.getFunctionConfig(getServeUrl(), client)) }
return configs
}

Expand Down
90 changes: 57 additions & 33 deletions inngest/src/main/kotlin/com/inngest/Function.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ internal data class InternalFunctionOptions(
)

internal data class InternalFunctionTrigger
@JvmOverloads
constructor(
@Json(serializeNull = false) val event: String? = null,
@Json(serializeNull = false) val `if`: String? = null,
@Json(serializeNull = false) val cron: String? = null,
)
@JvmOverloads
constructor(
@Json(serializeNull = false) val event: String? = null,
@Json(serializeNull = false) val `if`: String? = null,
@Json(serializeNull = false) val cron: String? = null,
)

// TODO - Add an abstraction layer between the Function call response and the comm handler response
enum class OpCode {
Expand All @@ -25,6 +25,7 @@ enum class OpCode {
StepStateFailed, // TODO
Step,
WaitForEvent,
InvokeFunction,

// FUTURE:
StepNotFound,
Expand Down Expand Up @@ -61,6 +62,14 @@ data class StepOptions(
val opts: Map<String, String>?,
) : StepOp(id, name, op, statusCode)

data class StepOptionsInvoke(
override val id: String,
override val name: String,
override val op: OpCode,
override val statusCode: ResultStatusCode,
val opts: Map<String, Any>?,
) : StepOp(id, name, op, statusCode)

data class StepConfig(
val id: String,
val name: String,
Expand Down Expand Up @@ -154,13 +163,13 @@ internal open class InternalInngestFunction(
op = OpCode.WaitForEvent,
statusCode = ResultStatusCode.StepComplete,
opts =
buildMap {
put("event", e.waitEvent)
put("timeout", e.timeout)
if (e.ifExpression != null) {
put("if", e.ifExpression)
}
},
buildMap {
put("event", e.waitEvent)
put("timeout", e.timeout)
if (e.ifExpression != null) {
put("if", e.ifExpression)
}
},
)
} catch (e: StepInterruptSleepException) {
return StepOptions(
Expand All @@ -170,6 +179,21 @@ internal open class InternalInngestFunction(
op = OpCode.Sleep,
statusCode = ResultStatusCode.StepComplete,
)
} catch (e: StepInterruptInvokeException) {
val functionId = String.format("%s-%s", e.appId, e.fnId)
return StepOptionsInvoke(
id = e.hashedId,
name = e.id,
op = OpCode.InvokeFunction,
statusCode = ResultStatusCode.StepComplete,
opts = buildMap {
put("function_id", functionId)
put("payload", mapOf("data" to e.data))
if (e.timeout != null) {
put("timeout", e.timeout)
}
}
)
} catch (e: StepInterruptException) {
// NOTE - Currently this error could be caught in the user's own function
// that wraps a
Expand All @@ -193,33 +217,33 @@ internal open class InternalInngestFunction(
}
}

fun getFunctionConfig(serveUrl: String): InternalFunctionConfig {
fun getFunctionConfig(serveUrl: String, client: Inngest): InternalFunctionConfig {
// TODO use URL objects instead of strings so we can fetch things like scheme
val scheme = serveUrl.split("://")[0]
return InternalFunctionConfig(
id = config.id,
id = String.format("%s-%s", client.appId, config.id),
name = config.name,
triggers = config.triggers,
steps =
mapOf(
"step" to
StepConfig(
id = "step",
name = "step",
retries =
mapOf(
// TODO - Pull from FunctionOptions
"attempts" to 3,
),
runtime =
hashMapOf(
"type" to scheme,
// TODO - Create correct URL
"url" to
"$serveUrl?fnId=${config.id}&stepId=step",
),
mapOf(
"step" to
StepConfig(
id = "step",
name = "step",
retries =
mapOf(
// TODO - Pull from FunctionOptions
"attempts" to 3,
),
runtime =
hashMapOf(
"type" to scheme,
// TODO - Create correct URL
"url" to
"$serveUrl?fnId=${config.id}&stepId=step",
),
),
),
),
)
}
}
42 changes: 42 additions & 0 deletions inngest/src/main/kotlin/com/inngest/Step.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ class StepInterruptSleepException(id: String, hashedId: String, override val dat
class StepInterruptSendEventException(id: String, hashedId: String, val eventIds: Array<String>) :
StepInterruptException(id, hashedId, eventIds)


class StepInterruptInvokeException(id: String, hashedId: String, val appId: String, val fnId: String, data: kotlin.Any?, val timeout: String?) :
StepInterruptException(id, hashedId, data)

class StepInterruptWaitForEventException(
id: String,
hashedId: String,
Expand Down Expand Up @@ -69,6 +73,44 @@ class Step(val state: State, val client: Inngest) {
throw Exception("step state incorrect type")
}

/**
* Invoke another Inngest function as a step
*
* @param id unique step id for memoization
* @param fn ID of the function to invoke
* @param data the data to pass within `event.data` to the function
* @param timeout an optional timeout for the invoked function. If the invoked function does
* not finish within this time, the invoked function will be marked as failed.
*/
inline fun <reified T> invoke(
id: String,
appId: String,
fnId: String,
data: kotlin.Any?,
timeout: String?,
): T = invoke(id, appId, fnId, data, timeout, T::class.java)

fun <T> invoke(
id: String,
appId: String,
fnId: String,
data: kotlin.Any?,
timeout: String?,
type: Class<T>,
): T {
val hashedId = state.getHashFromId(id)
try {
val stepResult = state.getState(hashedId, type)
if (stepResult != null) {
return stepResult
}
} catch (e: StateNotFound) {
throw StepInterruptInvokeException(id, hashedId, appId, fnId, data, timeout)
}
// TODO - handle invalidly stored step types properly
throw Exception("step state incorrect type")
}

/**
* Sleep for a specific duration
*
Expand Down

0 comments on commit 8df627e

Please sign in to comment.