Skip to content

Commit

Permalink
Arrow Fx DSL module
Browse files Browse the repository at this point in the history
  • Loading branch information
nomisRev committed May 30, 2024
1 parent 1fbd2a8 commit 796e30a
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package arrow.scoped

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.withContext

public suspend fun <A> ScopingScope.resource(
action: suspend CoroutineScope.() -> A,
compensation: suspend (A, Throwable?) -> Unit
): A =
withContext(NonCancellable, action)
.also { a -> closing { e -> compensation(a, e) } }
Original file line number Diff line number Diff line change
@@ -1,8 +1,48 @@
package arrow.scoped

/**
* Saga DSL derived from Scope
* The saga design pattern is a way to manage data consistency across microservices in distributed
* transaction scenarios. A [ScopingScope] is useful when you need to manage data in a consistent manner
* across services in distributed transaction scenarios. Or when you need to compose multiple
* [action] with a [compensation] that needs to run in a transaction like style.
*
* For example, let's say that we have the following domain types `Order`, `Payment`.
*
* ```kotlin
* data class Order(val id: UUID, val amount: Long)
* data class Payment(val id: UUID, val orderId: UUID)
* ```
*
* The creation of an `Order` can only remain when a payment has been made. In SQL, you might run
* this inside a transaction, which can automatically roll back the creation of the `Order` when the
* creation of the Payment fails.
*
* When you need to do this across distributed services, or a multiple atomic references, etc. You
* need to manually facilitate the rolling back of the performed actions, or compensating actions.
*
* The [saga] DSL removes all the boilerplate of manually having to facilitate this
* with a convenient suspending DSL.
*
* ```kotlin
* data class Order(val id: UUID, val amount: Long)
* suspend fun createOrder(): Order = Order(UUID.randomUUID(), 100L)
* suspend fun deleteOrder(order: Order): Unit = println("Deleting $order")
*
* data class Payment(val id: UUID, val orderId: UUID)
* suspend fun createPayment(order: Order): Payment = Payment(UUID.randomUUID(), order.id)
* suspend fun deletePayment(payment: Payment): Unit = println("Deleting $payment")
*
* suspend fun Payment.awaitSuccess(): Unit = throw RuntimeException("Payment Failed")
*
* suspend fun main() = scoped {
* val order = saga({ createOrder() }) { deleteOrder(it) }
* val payment = saga { createPayment(order) }, ::deletePayment)
* payment.awaitSuccess()
* }
* ```
*/
// TODO should this be implemented on ScopingScope
// Or should ScopingScope be used to implement a separate SagaScope??
public suspend fun <A> ScopingScope.saga(
action: suspend () -> A,
compensation: suspend (A, Throwable?) -> Unit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import arrow.AutoCloseScope
import arrow.throwIfFatal
import arrow.atomic.Atomic
import arrow.atomic.update
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.withContext
import kotlin.coroutines.cancellation.CancellationException

public interface ScopingScope : AutoCloseScope {
Expand All @@ -29,9 +31,11 @@ private class DefaultScopingScope : ScopingScope {
private val closeables: Atomic<List<suspend (Throwable?) -> Unit>> = Atomic(emptyList())

suspend fun close(error: Throwable?): Nothing? {
return closeables.get().asReversed().fold(error) { acc, function ->
acc.add(runCatching { function(error) }.exceptionOrNull())
}?.let { throw it }
return withContext(NonCancellable) {
closeables.get().asReversed().fold(error) { acc, function ->
acc.add(runCatching { function(error) }.exceptionOrNull())
}?.let { throw it }
}
}

override fun closing(block: suspend (Throwable?) -> Unit): Unit =
Expand Down

0 comments on commit 796e30a

Please sign in to comment.