Skip to content

Commit

Permalink
Merge pull request #70 from fraktalio/feature/metadata
Browse files Browse the repository at this point in the history
`Metadata` flavour of Fmodel API
  • Loading branch information
idugalic authored Nov 26, 2023
2 parents e36011d + d6a9fda commit 16c8798
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 34 deletions.
3 changes: 2 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ configurations {

repositories {
mavenCentral()
maven("https://s01.oss.sonatype.org/content/repositories/snapshots/")
}

extra["testcontainersVersion"] = "1.19.3"
extra["fmodelVersion"] = "3.5.0"
extra["fmodelVersion"] = "3.5.1-SNAPSHOT"
extra["kotlinxSerializationJson"] = "1.6.1"
extra["kotlinxCollectionsImmutable"] = "0.3.6"
extra["kotlinLogging"] = "3.0.5"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,19 @@ class AggregateEventRepositoryImpl(
* Fetching the current state as a series/flow of Events
*/
override fun Command?.fetchEvents(): Flow<Pair<Event, UUID?>> =
fetchEventsAndMetaData().map { Pair(it.first, it.second) }

/**
* Fetching the current state as a series/flow of Events and Metadata.
* This method is implemented by Default in the EventRepository interface, but we need/want to override it here.
*/
override fun Command?.fetchEventsAndMetaData(): Flow<Triple<Event, UUID?, Map<String, Any>>> =
when (this) {
is Command -> getEvents(deciderId()).map { it.toEventWithId() }
is Command -> getEvents(deciderId()).map { it.toEventWithIdAndMetaData() }
.map { Triple(it.first, it.second, mapOf("commandId" to it.third as Any)) }

null -> emptyFlow()
}
.onCompletion {
when (it) {
null -> logger.debug { "fetching the aggregate events by command ${this@fetchEvents} completed with success" }
else -> logger.warn { "fetching the aggregate events by command ${this@fetchEvents} completed with exception $it" }
}
}
.flowOn(dbDispatcher)

/**
Expand All @@ -117,54 +120,71 @@ class AggregateEventRepositoryImpl(
*
* `latestVersionProvider` is used to fetch the latest version of the event stream, per need
*/
override fun Flow<Event?>.save(latestVersionProvider: (Event?) -> UUID?): Flow<Pair<Event, UUID>> = flow {
override fun Flow<Event?>.save(latestVersionProvider: (Event?) -> UUID?): Flow<Pair<Event, UUID>> =
saveWithMetaData(latestVersionProvider, emptyMap()).map { Pair(it.first, it.second) }

/**
* Storing the new state as a series/flow of Events and Metadata
* This method is implemented by Default in the EventRepository interface, but we need/want to override it here.
*
* `latestVersionProvider` is used to fetch the latest version of the event stream, per need
*/
override fun Flow<Event?>.saveWithMetaData(
latestVersionProvider: (Event?) -> UUID?,
metaData: Map<String, Any>
): Flow<Triple<Event, UUID, Map<String, Any>>> = flow {
val previousIds: MutableMap<String, UUID?> = emptyMap<String, UUID?>().toMutableMap()
emitAll(
filterNotNull()
.map {
previousIds.computeIfAbsent(it.deciderId()) { _ -> latestVersionProvider(it) }
val eventId = UUID.randomUUID()
val eventEntity = it.toEventEntity(eventId, previousIds[it.deciderId()])
val eventEntity =
it.toEventEntity(eventId, previousIds[it.deciderId()], metaData["commandId"] as UUID)
previousIds[it.deciderId()] = eventId
eventEntity
}
.appendAll()
.map { it.toEventWithId() }
.map { it.toEventWithIdAndMetaData() }
.map { Triple(it.first, it.second, mapOf("commandId" to it.third as Any)) }
)
}
.onCompletion {
when (it) {
null -> logger.debug { "saving new events completed successfully" }
else -> logger.warn { "saving new events completed with exception $it" }
}
}
.flowOn(dbDispatcher)

/**
* Storing the new state as a series/flow of Events
*
* `latestVersion` is used to provide you with the latest known version of the state/stream
*/
override fun Flow<Event?>.save(latestVersion: UUID?): Flow<Pair<Event, UUID>> = flow {
override fun Flow<Event?>.save(latestVersion: UUID?): Flow<Pair<Event, UUID>> =
saveWithMetaData(latestVersion, emptyMap()).map { Pair(it.first, it.second) }

/**
* Storing the new state as a series/flow of Events with metadata
* This method is implemented by Default in the EventRepository interface, but we need/want to override it here.
*
*
* `latestVersion` is used to provide you with the latest known version of the state/stream
* `metaData` is used to provide you with the metadata
*/
override fun Flow<Event?>.saveWithMetaData(
latestVersion: UUID?,
metaData: Map<String, Any>
): Flow<Triple<Event, UUID, Map<String, Any>>> = flow {
var previousId: UUID? = latestVersion
emitAll(
filterNotNull()
.map {
val eventId = UUID.randomUUID()
val eventEntity = it.toEventEntity(eventId, previousId)
val eventEntity = it.toEventEntity(eventId, previousId, metaData["commandId"] as UUID)
previousId = eventId
eventEntity
}
.appendAll()
.map { it.toEventWithId() }
.map { it.toEventWithIdAndMetaData() }
.map { Triple(it.first, it.second, mapOf("commandId" to it.third as Any)) }
)
}
.onCompletion {
when (it) {
null -> logger.debug { "saving new events completed successfully" }
else -> logger.warn { "saving new events completed with exception $it" }
}
}
.flowOn(dbDispatcher)
}

Expand All @@ -175,7 +195,7 @@ internal data class EventEntity(
val event: String,
val data: ByteArray,
val eventId: UUID,
val commandId: UUID?,
val commandId: UUID,
val previousId: UUID?,
val final: Boolean,
val createdAt: OffsetDateTime? = null,
Expand All @@ -198,7 +218,10 @@ internal val eventMapper: (Row, RowMetadata) -> EventEntity = { row, _ ->
}

internal fun EventEntity.toEventWithId() = Pair<Event, UUID>(Json.decodeFromString(data.decodeToString()), eventId)
internal fun Event.toEventEntity(eventId: UUID, previousId: UUID?, commandId: UUID? = null) = EventEntity(
internal fun EventEntity.toEventWithIdAndMetaData() =
Triple<Event, UUID, UUID?>(Json.decodeFromString(data.decodeToString()), eventId, commandId)

internal fun Event.toEventEntity(eventId: UUID, previousId: UUID?, commandId: UUID) = EventEntity(
decider(),
deciderId(),
event(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ class AggregateRestCommandController(private val aggregate: Aggregate) {

@OptIn(ExperimentalCoroutinesApi::class)
private fun handle(command: Command): Flow<Event?> =
aggregate.handleOptimistically(command).map { it.first }
aggregate.handleOptimistically(command, mapOf("commandId" to UUID.randomUUID())).map { it.first }


@PostMapping("restaurants")
fun createRestaurant(@RequestBody command: CreateRestaurantCommand): Flow<Event?> =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@ package com.fraktalio.example.fmodelspringdemo.adapter.web.rsocket
import com.fraktalio.example.fmodelspringdemo.application.Aggregate
import com.fraktalio.example.fmodelspringdemo.domain.Command
import com.fraktalio.example.fmodelspringdemo.domain.Event
import com.fraktalio.fmodel.application.handleOptimistically
import com.fraktalio.fmodel.application.handleOptimisticallyWithMetaData
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import org.springframework.messaging.handler.annotation.MessageMapping
import org.springframework.messaging.handler.annotation.Payload
import org.springframework.stereotype.Controller
import java.util.*

@Controller
class AggregateRsocketCommandController(private val aggregate: Aggregate) {
@OptIn(ExperimentalCoroutinesApi::class)
@MessageMapping("commands")
fun handleCommand(@Payload commands: Flow<Command>): Flow<Event?> =
aggregate.handleOptimistically(commands).map { it.first }
aggregate.handleOptimisticallyWithMetaData(commands.map { Pair(it, mapOf("commandId" to UUID.randomUUID())) })
.map { it.first }
}
2 changes: 1 addition & 1 deletion src/main/resources/sql/event_sourcing.sql
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ CREATE TABLE IF NOT EXISTS events
-- event data in JSON format
"data" JSONB NOT NULL,
-- command ID causing this event
"command_id" UUID NULL,
"command_id" UUID NOT NULL,
-- previous event uuid; null for first event; null does not trigger UNIQUE constraint; we defined a function `check_first_event_for_decider`
"previous_id" UUID UNIQUE,
-- indicator if the event stream for the `decider_id` is final
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.fraktalio.example.fmodelspringdemo.application

import com.fraktalio.example.fmodelspringdemo.domain.*
import com.fraktalio.fmodel.application.publishOptimisticallyTo
import com.fraktalio.fmodel.application.publishOptimisticallyWithMetaDataTo
import kotlinx.collections.immutable.toImmutableList
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
Expand All @@ -16,6 +16,7 @@ import org.springframework.boot.test.context.SpringBootTest
import org.springframework.test.context.TestConstructor
import org.springframework.test.context.TestConstructor.AutowireMode.ALL
import java.math.BigDecimal
import java.util.*


@SpringBootTest
Expand Down Expand Up @@ -47,7 +48,8 @@ class AggregateTest(private val aggregate: Aggregate) {

val events =
flowOf(createRestaurantCommand, changeRestaurantMenuCommand, placeOrderCommand, markOrderAsPreparedCommand)
.publishOptimisticallyTo(aggregate)
.map { Pair(it, mapOf("commandId" to UUID.randomUUID())) }
.publishOptimisticallyWithMetaDataTo(aggregate)
.map { it.first }
.toList()

Expand Down

0 comments on commit 16c8798

Please sign in to comment.