Skip to content

Commit

Permalink
Refactor HttpClient to its own class (#29)
Browse files Browse the repository at this point in the history
* Refactor HttpClient to its own class

Which makes its headers fully configurable by the Inngest client.
It should also be able to pool network connections correctly given
that it's the same instance. According to:

https://square.github.io/okhttp/contribute/concurrency/#connection-pool

* Add the framework header to client

* Use Inngest client headers in controller

* Move framework option from Inngest's client API

It's now set to the `CommHandler` similarly to the other SDKs.

* Make `HttpClient` internal
  • Loading branch information
KiKoS0 authored Feb 25, 2024
1 parent a9b635b commit c6eccd4
Show file tree
Hide file tree
Showing 12 changed files with 143 additions and 88 deletions.
70 changes: 11 additions & 59 deletions inngest-core/src/main/kotlin/com/inngest/Comm.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,6 @@ package com.inngest

import com.beust.klaxon.Json
import com.beust.klaxon.Klaxon
import okhttp3.MediaType.Companion.toMediaType
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.RequestBody.Companion.toRequestBody
import java.io.IOException

data class ExecutionRequestPayload(
val ctx: ExecutionContext,
Expand Down Expand Up @@ -45,18 +40,12 @@ data class CommError(
val __serialized: Boolean = true,
)

val jsonMediaType = "application/json".toMediaType()

class CommHandler(val functions: Map<String, InngestFunction>, val client: Inngest? = null) {
private fun getHeaders(): Map<String, String> {
return mapOf(
"Content-Type" to "application/json",
// TODO - Get this from the build
"x-inngest-sdk" to "inngest-kt:${Version.getVersion()}",
// TODO - Pull this from options
"x-inngest-framework" to "ktor",
)
}
class CommHandler(
val functions: Map<String, InngestFunction>,
val client: Inngest,
private val framework: SupportedFrameworkName,
) {
val headers = Environment.inngestHeaders(framework).plus(client.headers)

fun callFunction(
functionId: String,
Expand All @@ -78,11 +67,7 @@ class CommHandler(val functions: Map<String, InngestFunction>, val client: Innge
attempt = payload.ctx.attempt,
)

val result =
function.call(
ctx = ctx,
requestBody,
)
val result = function.call(ctx = ctx, client = client, requestBody)
var body: Any? = null
if (result.statusCode == ResultStatusCode.StepComplete || result is StepOptions) {
body = listOf(result)
Expand All @@ -93,7 +78,7 @@ class CommHandler(val functions: Map<String, InngestFunction>, val client: Innge
return CommResponse(
body = Klaxon().toJsonString(body),
statusCode = result.statusCode,
headers = getHeaders(),
headers = headers,
)
} catch (e: Exception) {
val err =
Expand All @@ -105,50 +90,17 @@ class CommHandler(val functions: Map<String, InngestFunction>, val client: Innge
return CommResponse(
body = Klaxon().toJsonString(err),
statusCode = ResultStatusCode.Error,
headers = getHeaders(),
headers = headers,
)
}
}

fun getFunctionConfigs(): List<FunctionConfig> {
private fun getFunctionConfigs(): List<FunctionConfig> {
val configs: MutableList<FunctionConfig> = mutableListOf()
functions.forEach { entry -> configs.add(entry.value.getConfig()) }
return configs
}

companion object Client {
inline fun <reified T> sendEvent(payload: Any): T? {
val eventKey = "test"
return send("http://localhost:8288/e/$eventKey", payload)
}

inline fun <reified T> send(
url: String,
payload: Any,
): T? {
val jsonRequestBody = Klaxon().toJsonString(payload)
val requestBody = jsonRequestBody.toRequestBody(jsonMediaType)

val client = OkHttpClient()

// TODO - Add missing headers
val request =
Request.Builder()
.url(url)
.post(requestBody)
.build()

client.newCall(request).execute().use { response ->
// TODO: Handle error case
if (!response.isSuccessful) throw IOException("Unexpected code $response")
if (Unit::class.java.isAssignableFrom(T::class.java)) {
return Unit as T
}
return Klaxon().parse<T>(response.body!!.charStream())
}
}
}

fun register(): String {
// TODO - This should detect the dev server or use base url
val registrationUrl = "http://localhost:8288/fn/register"
Expand All @@ -162,7 +114,7 @@ class CommHandler(val functions: Map<String, InngestFunction>, val client: Innge
functions = getFunctionConfigs(),
)

send<Unit>(registrationUrl, requestPayload)
client.send<Unit>(registrationUrl, requestPayload)

// TODO - Add headers to output
val body: Map<String, Any?> = mapOf()
Expand Down
13 changes: 13 additions & 0 deletions inngest-core/src/main/kotlin/com/inngest/Environment.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.inngest

object Environment {
fun inngestHeaders(framework: SupportedFrameworkName? = null): RequestHeaders {
val sdk = "inngest-kt:${Version.getVersion()}"
return mapOf(
InngestHeaderKey.ContentType.value to "application/json",
InngestHeaderKey.Sdk.value to sdk,
InngestHeaderKey.UserAgent.value to sdk,
InngestHeaderKey.Framework.value to (framework?.value),
).filterValues { (it is String) }.entries.associate { (k, v) -> k to v!! }
}
}
3 changes: 2 additions & 1 deletion inngest-core/src/main/kotlin/com/inngest/Function.kt
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,11 @@ open class InngestFunction(

fun call(
ctx: FunctionContext,
client: Inngest,
requestBody: String,
): StepOp {
val state = State(requestBody)
val step = Step(state)
val step = Step(state, client)

// DEBUG
println(state)
Expand Down
45 changes: 45 additions & 0 deletions inngest-core/src/main/kotlin/com/inngest/HttpClient.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.inngest

import com.beust.klaxon.Klaxon
import okhttp3.Headers
import okhttp3.MediaType.Companion.toMediaType
import okhttp3.OkHttpClient
import okhttp3.RequestBody.Companion.toRequestBody
import okhttp3.Response

typealias RequestHeaders = Map<String, String>

data class RequestConfig(val headers: RequestHeaders? = null)

val jsonMediaType = "application/json".toMediaType()

internal class HttpClient(private val clientConfig: RequestConfig) {
private val client = OkHttpClient()

fun <T> send(
request: okhttp3.Request,
handler: (Response) -> T,
) = this.client.newCall(request).execute().use(handler)

fun build(
url: String,
payload: Any,
config: RequestConfig? = null,
): okhttp3.Request {
val jsonRequestBody = Klaxon().toJsonString(payload)
val body = jsonRequestBody.toRequestBody(jsonMediaType)

return okhttp3.Request.Builder()
.url(url)
.post(body)
.headers(toOkHttpHeaders(clientConfig.headers))
.apply { config?.headers?.forEach { (k, v) -> addHeader(k, v) } }
.build()
}
}

fun toOkHttpHeaders(requestHeaders: RequestHeaders?): Headers {
val builder = Headers.Builder()
requestHeaders?.forEach { (k, v) -> builder.add(k, v) }
return builder.build()
}
38 changes: 26 additions & 12 deletions inngest-core/src/main/kotlin/com/inngest/Inngest.kt
Original file line number Diff line number Diff line change
@@ -1,18 +1,32 @@
package com.inngest

import com.beust.klaxon.Klaxon
import java.io.IOException

class Inngest(val appId: String) {
// TODO - Fetch INNGEST_EVENT_KEY env variable on instantiation

// fun send(event: Event): EventAPIResponse {
// val requestBody = Klaxon().toJsonString(event)
// val client = HttpClient.newBuilder().build()
// val request =
// HttpRequest.newBuilder()
// .uri(URI.create("http://localhost:8288/e/"))
// .POST(HttpRequest.BodyPublishers.ofString(requestBody))
// .build()
// val response = client.send(request, HttpResponse.BodyHandlers.ofString())
// val body = Klaxon().parse<EventAPIResponse>(response)
// return body;
// }
val headers: RequestHeaders = Environment.inngestHeaders()
internal val httpClient = HttpClient(RequestConfig(headers))

internal inline fun <reified T> send(
url: String,
payload: Any,
): T? {
val request = httpClient.build(url, payload)

return httpClient.send(request) lambda@{ response ->
// TODO: Handle error case
if (!response.isSuccessful) throw IOException("Unexpected code $response")
if (Unit::class.java.isAssignableFrom(T::class.java)) {
return@lambda Unit as T
}
return@lambda Klaxon().parse<T>(response.body!!.charStream())
}
}

internal inline fun <reified T> sendEvent(payload: Any): T? {
val eventKey = "test"
return send("http://localhost:8288/e/$eventKey", payload)
}
}
16 changes: 16 additions & 0 deletions inngest-core/src/main/kotlin/com/inngest/InngestHeaderKey.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.inngest

enum class InngestHeaderKey(val value: String) {
ContentType("content-type"),
UserAgent("user-agent"),
Sdk("x-inngest-sdk"),
Framework("x-inngest-framework"),
Environment("x-inngest-env"),
Platform("x-inngest-platform"),
NoRetry("x-inngest-no-retry"),
RequestVersion("x-inngest-req-version"),
RetryAfter("retry-after"),
ServerKind("x-inngest-server-kind"),
ExpectedServerKind("x-inngest-expected-server-kind"),
Signature("x-inngest-signature"),
}
4 changes: 2 additions & 2 deletions inngest-core/src/main/kotlin/com/inngest/Step.kt
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class StepInterruptWaitForEventException(
// TODO: Add name, stack, etc. if poss
class StepError(message: String) : Exception(message)

class Step(val state: State) {
class Step(val state: State, val client: Inngest) {
/**
* Run a function
*
Expand Down Expand Up @@ -130,7 +130,7 @@ class Step(val state: State) {
}
return
} catch (e: StateNotFound) {
val response = CommHandler.sendEvent<SendEventsResponse>(events)
val response = client.sendEvent<SendEventsResponse>(events)
throw StepInterruptSendEventException(id, hashedId, response!!.ids)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.inngest

enum class SupportedFrameworkName(val value: String) {
SpringBoot("springboot"),
Ktor("ktor"),
}
7 changes: 6 additions & 1 deletion inngest-core/src/main/kotlin/com/inngest/ktor/Route.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@ fun Route.serve(
fnList: List<InngestFunction>,
) {
val fnMap = fnList.associateBy { it.id() }
val comm = CommHandler(functions = fnMap, client)
val comm =
CommHandler(
functions = fnMap,
client = client,
framework = SupportedFrameworkName.Ktor,
)

route(path) {
get("") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,22 @@
import com.inngest.CommHandler;
import com.inngest.Inngest;
import com.inngest.InngestFunction;
import com.inngest.SupportedFrameworkName;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;

import java.util.HashMap;

public abstract class InngestConfiguration {
private final SupportedFrameworkName frameworkName = SupportedFrameworkName.SpringBoot;

protected abstract HashMap<String, InngestFunction> functions();

@Bean
protected abstract Inngest inngestClient();


@Bean
protected CommHandler commHandler(@Autowired Inngest inngestClient) {
return new CommHandler(functions(), inngestClient);
return new CommHandler(functions(), inngestClient, frameworkName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,34 +13,33 @@ public abstract class InngestController {
@Autowired
CommHandler commHandler;

private static final HttpHeaders commonHeaders = new HttpHeaders();

static {
String inngestSdk = "inngest-kt:v0.0.1";
commonHeaders.add("x-inngest-sdk", inngestSdk);
private HttpHeaders getHeaders() {
HttpHeaders headers = new HttpHeaders();
commHandler.getHeaders().forEach(headers::add);
return headers;
}

@GetMapping(produces = MediaType.APPLICATION_JSON_VALUE)
@GetMapping()
public ResponseEntity<String> index() {
String response = commHandler.introspect();
return ResponseEntity.ok().headers(commonHeaders).body(response);
return ResponseEntity.ok().headers(getHeaders()).body(response);
}

@PutMapping(produces = MediaType.APPLICATION_JSON_VALUE)
@PutMapping()
public ResponseEntity<String> put() {
String response = commHandler.register();
return ResponseEntity.ok().headers(commonHeaders).body(response);
return ResponseEntity.ok().headers(getHeaders()).body(response);
}

@PostMapping(produces = MediaType.APPLICATION_JSON_VALUE)
@PostMapping()
public ResponseEntity<String> handleRequest(
@RequestParam(name = "fnId") String functionId,
@RequestBody String body
) {
try {
CommResponse response = commHandler.callFunction(functionId, body);

return ResponseEntity.status(response.getStatusCode().getCode()).headers(commonHeaders)
return ResponseEntity.status(response.getStatusCode().getCode()).headers(getHeaders())
.body(response.getBody());
} catch (Exception e) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.inngest.springbootdemo;

import com.inngest.InngestHeaderKey;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest;
Expand All @@ -21,6 +22,7 @@ public void shouldReturnSyncPayload() throws Exception {
mockMvc.perform(get("/api/inngest"))
.andExpect(status().isOk())
.andExpect(content().contentType("application/json"))
.andExpect(header().string(InngestHeaderKey.Framework.getValue(), "springboot"))
.andExpect(jsonPath("$.appName").value("my-app"))
.andExpect(jsonPath("$.sdk").value("kotlin"));
}
Expand Down

0 comments on commit c6eccd4

Please sign in to comment.