Skip to content

Commit

Permalink
refactor: Improve Orchestrator API (#91)
Browse files Browse the repository at this point in the history
* feat: Support TypeReference on Codec

* refactor: Type inferencible in OrchestratorChain

* feat: Rollback chaining

* test: Orchestrator Load Test

* fix: Rollback event가 NUll일때, 예외를 던지지않도록 수정한다

* test: Next orchestrator supports java

* docs: version up 0.3.1 to 0.3.2

* refactor: Undostate를 찾을 수 없으면, 트랜잭션에 참여하지 않은것으로 간주한다

* refactor: remove unused import

* refactor: Make param collections immutable.
  • Loading branch information
devxb authored Mar 21, 2024
1 parent 80e8758 commit d6f3432
Show file tree
Hide file tree
Showing 44 changed files with 1,390 additions and 1,385 deletions.
79 changes: 46 additions & 33 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<br>

![version 0.3.1](https://img.shields.io/badge/version-0.3.1-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square)
![version 0.3.2](https://img.shields.io/badge/version-0.3.2-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square)
![redis--stream](https://img.shields.io/badge/-redis--stream-da2020?style=flat-square&logo=Redis&logoColor=white)

Saga pattern 으로 구현된 분산 트랜잭션 프레임워크 입니다.
Expand Down Expand Up @@ -61,45 +61,58 @@ class Application {
#### Orchestrator-example.
> [!TIP]
> Orchestrator 사용시, Transactional Message Pattern이 자동 적용됩니다.
> retry 단위는 Orchestrator의 각 연산(하나의 function) 단위이며, 모든 체인이 성공하거나 rollback이 호출됩니다.
> 메시지 유실에대한 retry 단위는 Orchestrator의 각 연산(하나의 function) 단위이며, 모든 체인이 성공하거나 rollback이 호출됩니다.
```kotlin
// Use Orchestrator
@Service
class OrderService(private val orderOrchestrator: Orchestrator<Order>) {

fun order(orderRequest: Order): OrderResult {
val orchestrateResult = orderOrchestrator.transactionSync(orderRequest)
return orchestrateResult.decodeResult(OrderResult::class)
}
class OrderService(private val orderOrchestrator: Orchestrator<Order, OrderResponse>) {

fun order(orderRequest: Order): OrderResult {
val result = orderOrchestrator.transactionSync(orderRequest)
if (!result.isSuccess) {
result.throwError()
}
return result.decodeResult(OrderResult::class)
}
}

// Register Orchestrator
class OrchestratorConfigurer: AbstractOrchestartorConfigurer() {

@Bean // Generic is first request type
fun orderOrchestartor(): Orchestrator<Order> {
return newOrchestrator()
.startSync { orchestrateRequet ->
// Start Transaction with your bussiness logic
// something like ... "check seller"
}
.joinSync(noRollbackFor = OptimisiticLockingFailureException::class) {
// If an exception set in noRollbackFor is thrown during
// business logic, This block retried until success
}
.join {
// Webflux supports, just return Mono.
}
.commitSync {
// The value returned commit chain can be retrieved as follow code.
// result.decodeResult(Something::class)
}
.rollbackSync {
// Rollback Logic to be executed when orchestrator chain fails.
}
.build()
}
@Configurer
class OrchestratorConfigurer(
private val orchestratorFactory: OrchestratorFactory
) {

@Bean
fun orderOrchestartor(): Orchestrator<Order, OrderResponse> { // <First Request, Last Response>
return orchestratorFactory.create("orderOrchestrator")
.start(
function = { order -> // its order type
// Start Transaction with your bussiness logic
// something like ... "Check valid seller"
return@start user
},
rollback = { order ->
// do rollback logic
}
)
.joinReactive(
function = { user -> // Before operations response type "User" flow here
// Webflux supports, should return Mono type.
},
// Can skip rollback operation, if you want
)
.commit(
function = { request ->
// When an error occurs, all rollbacks are called from the bottom up,
// starting from the location where the error occurred.
throw IllegalArgumentException("Oops! Something went wrong..")
},
rollback = { request ->
...
}
)
}
}
```

Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ kotlin.code.style=official

### Project ###
group=org.rooftop.netx
version=0.3.1
version=0.3.2
compatibility=17

### Sonarcloud ###
Expand Down
2 changes: 1 addition & 1 deletion src/main/kotlin/org/rooftop/netx/api/Codec.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import kotlin.reflect.KClass

interface Codec {

fun <T: Any> encode(data: T): String
fun <T : Any> encode(data: T): String

fun <T : Any> decode(data: String, type: KClass<T>): T
}
2 changes: 2 additions & 0 deletions src/main/kotlin/org/rooftop/netx/api/Exceptions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,5 @@ class FailedAckTransactionException(message: String) : RuntimeException(message)

class ResultTimeoutException(message: String, throwable: Throwable) :
RuntimeException(message, throwable)

class ResultException(message: String) : RuntimeException(message)
4 changes: 2 additions & 2 deletions src/main/kotlin/org/rooftop/netx/api/OrchestrateFunction.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.rooftop.netx.api

fun interface OrchestrateFunction<T> {
fun interface OrchestrateFunction<T : Any, V : Any> {

fun orchestrate(orchestrateRequest: OrchestrateRequest): T
fun orchestrate(request: T): V
}
14 changes: 0 additions & 14 deletions src/main/kotlin/org/rooftop/netx/api/OrchestrateResult.kt

This file was deleted.

10 changes: 5 additions & 5 deletions src/main/kotlin/org/rooftop/netx/api/Orchestrator.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package org.rooftop.netx.api

import reactor.core.publisher.Mono

interface Orchestrator<T : Any> {
interface Orchestrator<T : Any, V : Any> {

fun transaction(request: T): Mono<OrchestrateResult>
fun transaction(request: T): Mono<Result<V>>

fun transaction(timeoutMillis: Long, request: T): Mono<OrchestrateResult>
fun transaction(timeoutMillis: Long, request: T): Mono<Result<V>>

fun transactionSync(request: T): OrchestrateResult
fun transactionSync(request: T): Result<V>

fun transactionSync(timeoutMillis: Long, request: T): OrchestrateResult
fun transactionSync(timeoutMillis: Long, request: T): Result<V>
}
66 changes: 66 additions & 0 deletions src/main/kotlin/org/rooftop/netx/api/Result.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package org.rooftop.netx.api

import kotlin.reflect.KClass

class Result<T : Any> private constructor(
val isSuccess: Boolean,
private val codec: Codec,
private val result: String?,
private val error: Error? = null,
) {

fun decodeResult(type: Class<T>): T = decodeResult(type.kotlin)

fun decodeResult(type: KClass<T>): T = result?.let {
codec.decode(it, type)
} ?: throw ResultException("Cannot decode result cause Result is fail state")

fun throwError() = error?.throwError(codec)
?: throw ResultException("Cannot throw error cause Result is success state")

override fun toString(): String {
return "Result(isSuccess=$isSuccess, codec=$codec, result=$result, error=$error)"
}

private class Error(
private val error: String,
private val type: KClass<Throwable>
) {

fun throwError(codec: Codec) {
throw codec.decode(error, type)
}

override fun toString(): String {
return "Error(error='$error', type=$type)"
}
}

internal companion object {

fun <T : Any> success(
codec: Codec,
result: String,
): Result<T> {
return Result(
true,
codec,
result,
null,
)
}

fun <T : Any> fail(
codec: Codec,
error: String,
type: KClass<Throwable>,
): Result<T> {
return Result(
false,
codec,
null,
Error(error, type),
)
}
}
}
6 changes: 6 additions & 0 deletions src/main/kotlin/org/rooftop/netx/api/RollbackFunction.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.rooftop.netx.api

fun interface RollbackFunction<T : Any, V : Any?> {

fun rollback(request: T): V
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ class TransactionRollbackEvent(
nodeName: String,
group: String,
event: String?,
val cause: String?,
val cause: String,
private val undo: String,
private val codec: Codec,
) : TransactionEvent(transactionId, nodeName, group, event, codec) {
Expand Down
117 changes: 117 additions & 0 deletions src/main/kotlin/org/rooftop/netx/engine/AbstractOrchestrateListener.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package org.rooftop.netx.engine

import org.rooftop.netx.api.Codec
import org.rooftop.netx.api.TransactionEvent
import org.rooftop.netx.api.TransactionManager
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import kotlin.reflect.KClass

internal abstract class AbstractOrchestrateListener<T : Any, V : Any> internal constructor(
private val orchestratorId: String,
val orchestrateSequence: Int,
private val codec: Codec,
private val transactionManager: TransactionManager,
private val requestHolder: RequestHolder,
private val resultHolder: ResultHolder,
) {

var isFirst: Boolean = true
var isLast: Boolean = true
var isRollbackable: Boolean = false
var beforeRollbackOrchestrateSequence: Int = 0

private var nextOrchestrateListener: AbstractOrchestrateListener<V, Any>? = null
private var nextRollbackOrchestrateListener: AbstractOrchestrateListener<V, Any>? = null
private var castableType: KClass<out T>? = null

@Suppress("UNCHECKED_CAST")
internal fun setNextOrchestrateListener(nextOrchestrateListener: AbstractOrchestrateListener<out Any, out Any>) {
this.nextOrchestrateListener =
nextOrchestrateListener as AbstractOrchestrateListener<V, Any>
}

@Suppress("UNCHECKED_CAST")
internal fun setNextRollbackOrchestrateListener(nextRollbackOrchestrateListener: AbstractOrchestrateListener<out Any, out Any>) {
this.nextRollbackOrchestrateListener =
nextRollbackOrchestrateListener as AbstractOrchestrateListener<V, Any>
}

internal fun setCastableType(type: KClass<out T>) {
castableType = type
}

internal fun Mono<V>.setNextCastableType(): Mono<V> {
return this.doOnNext {
nextOrchestrateListener?.castableType = it::class
nextRollbackOrchestrateListener?.castableType = it::class
}
}

protected fun Mono<*>.getHeldRequest(transactionEvent: TransactionEvent): Mono<T> {
return this.flatMap {
requestHolder.getRequest(
"${transactionEvent.transactionId}:$orchestrateSequence",
getCastableType()
)
}
}

protected fun Mono<T>.holdRequestIfRollbackable(transactionEvent: TransactionEvent): Mono<T> {
return this.flatMap { request ->
if (!isRollbackable) {
Mono.just(request)
}
requestHolder.setRequest(
"${transactionEvent.transactionId}:$orchestrateSequence",
request
)
}
}

protected fun getCastableType(): KClass<out T> {
return castableType
?: throw NullPointerException("OrchestratorId \"$orchestratorId\", OrchestrateSequence \"$orchestrateSequence\"'s CastableType was null")
}

protected fun cast(data: String): T {
return castableType?.let {
codec.decode(data, it)
} ?: throw NullPointerException("Cannot cast \"$data\" cause, castableType is null")
}

protected fun TransactionEvent.toOrchestrateEvent(): Mono<OrchestrateEvent> =
Mono.just(this.decodeEvent(OrchestrateEvent::class))

protected fun <S> Mono<S>.onErrorRollback(
transactionId: String,
orchestrateEvent: OrchestrateEvent,
): Mono<S> = this.onErrorResume {
holdFailResult(transactionId, it)
rollback(transactionId, it, orchestrateEvent)
Mono.empty()
}

private fun holdFailResult(transactionId: String, throwable: Throwable) {
resultHolder.setFailResult(transactionId, throwable)
.subscribeOn(Schedulers.parallel()).subscribe()
}

private fun rollback(
transactionId: String,
throwable: Throwable,
orchestrateEvent: OrchestrateEvent,
) {
transactionManager.rollback(
transactionId = transactionId,
cause = throwable.message ?: throwable.localizedMessage,
event = orchestrateEvent
).subscribeOn(Schedulers.parallel()).subscribe()
}

override fun toString(): String {
return "AbstractOrchestrateListener(orchestratorId='$orchestratorId', orchestrateSequence=$orchestrateSequence, codec=$codec, transactionManager=$transactionManager, requestHolder=$requestHolder, isFirst=$isFirst, isLast=$isLast, isRollbackable=$isRollbackable, beforeRollbackOrchestrateSequence=$beforeRollbackOrchestrateSequence, nextOrchestrateListener=$nextOrchestrateListener, nextRollbackOrchestrateListener=$nextRollbackOrchestrateListener, castableType=$castableType)"
}


}
Loading

0 comments on commit d6f3432

Please sign in to comment.