Skip to content

Commit

Permalink
Improve exception handling in batch operations
Browse files Browse the repository at this point in the history
  • Loading branch information
hermannm committed Nov 4, 2024
1 parent 7877bd0 commit 1182bbf
Show file tree
Hide file tree
Showing 4 changed files with 324 additions and 131 deletions.
158 changes: 83 additions & 75 deletions src/main/kotlin/no/liflig/documentstore/repository/RepositoryJdbi.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import no.liflig.documentstore.entity.EntityId
import no.liflig.documentstore.entity.Version
import no.liflig.documentstore.entity.Versioned
import no.liflig.documentstore.entity.getEntityIdType
import no.liflig.documentstore.utils.BatchOperationException
import no.liflig.documentstore.utils.currentTimeWithMicrosecondPrecision
import no.liflig.documentstore.utils.executeBatchOperation
import no.liflig.documentstore.utils.isEmpty
Expand Down Expand Up @@ -206,31 +207,37 @@ open class RepositoryJdbi<EntityIdT : EntityId, EntityT : Entity<EntityIdT>>(
val createdAt = currentTimeWithMicrosecondPrecision()
val version = Version.initial()

transactional {
useHandle(jdbi) { handle ->
executeBatchOperation(
handle,
entities,
statement =
"""
INSERT INTO "${tableName}" (id, data, version, created_at, modified_at)
VALUES (:id, :data::jsonb, :version, :createdAt, :modifiedAt)
"""
.trimIndent(),
bindParameters = { batch, entity ->
batch
.bind("id", entity.id)
.bind("data", serializationAdapter.toJson(entity))
.bind("version", version)
.bind("createdAt", createdAt)
.bind("modifiedAt", createdAt)
},
)
try {
transactional {
useHandle(jdbi) { handle ->
executeBatchOperation(
handle,
entities,
statement =
"""
INSERT INTO "${tableName}" (id, data, version, created_at, modified_at)
VALUES (:id, :data::jsonb, :version, :createdAt, :modifiedAt)
"""
.trimIndent(),
bindParameters = { batch, entity ->
batch
.bind("id", entity.id)
.bind("data", serializationAdapter.toJson(entity))
.bind("version", version)
.bind("createdAt", createdAt)
.bind("modifiedAt", createdAt)
},
)
}
}
} catch (e: BatchOperationException) {
@Suppress("UNCHECKED_CAST") // We know the entity is EntityT on this repository
throw mapCreateOrUpdateException(e.cause, e.entity as EntityT)
}

// We wait until here to create the result list, which may be large, to avoid allocating it
// before calling the database. That would keep the list in memory while we are waiting for the
// before calling the database. That would keep the list in memory while we are waiting for
// the
// database, needlessly reducing throughput.
return entities.map { entity ->
Versioned(entity, version, createdAt = createdAt, modifiedAt = createdAt)
Expand All @@ -244,43 +251,48 @@ open class RepositoryJdbi<EntityIdT : EntityId, EntityT : Entity<EntityIdT>>(

val modifiedAt = currentTimeWithMicrosecondPrecision()

transactional {
useHandle(jdbi) { handle ->
executeBatchOperation(
handle,
entities,
statement =
"""
UPDATE "${tableName}"
SET
data = :data::jsonb,
version = :nextVersion,
modified_at = :modifiedAt
WHERE
id = :id AND
version = :previousVersion
"""
.trimIndent(),
bindParameters = { batch, entity ->
val nextVersion = entity.version.next()

batch
.bind("data", serializationAdapter.toJson(entity.item))
.bind("nextVersion", nextVersion)
.bind("modifiedAt", modifiedAt)
.bind("id", entity.item.id)
.bind("previousVersion", entity.version)
},
handleModifiedRowCounts = { counts, batchStartIndex ->
handleModifiedRowCounts(counts, batchStartIndex, entities, operation = "update")
},
)
try {
transactional {
useHandle(jdbi) { handle ->
executeBatchOperation(
handle,
entities,
statement =
"""
UPDATE "${tableName}"
SET
data = :data::jsonb,
version = :nextVersion,
modified_at = :modifiedAt
WHERE
id = :id AND
version = :previousVersion
"""
.trimIndent(),
bindParameters = { batch, entity ->
val nextVersion = entity.version.next()

batch
.bind("data", serializationAdapter.toJson(entity.item))
.bind("nextVersion", nextVersion)
.bind("modifiedAt", modifiedAt)
.bind("id", entity.item.id)
.bind("previousVersion", entity.version)
},
handleModifiedRowCounts = { counts, batch ->
handleModifiedRowCounts(counts, batch, operation = "update")
},
)
}
}
} catch (e: BatchOperationException) {
@Suppress("UNCHECKED_CAST") // We know the entity is EntityT on this repository
throw mapCreateOrUpdateException(e.cause, e.entity as EntityT)
}

// We wait until here to create the result list, which may be large, to avoid allocating it
// before calling the database. That would keep the list in memory while we are waiting for the
// database, needlessly reducing throughput.
// before calling the database. That would keep the list in memory while we are waiting for
// the database, needlessly reducing throughput.
return entities.map { entity ->
entity.copy(modifiedAt = modifiedAt, version = entity.version.next())
}
Expand All @@ -307,8 +319,8 @@ open class RepositoryJdbi<EntityIdT : EntityId, EntityT : Entity<EntityIdT>>(
bindParameters = { batch, entity ->
batch.bind("id", entity.item.id).bind("previousVersion", entity.version)
},
handleModifiedRowCounts = { counts, batchStartIndex ->
handleModifiedRowCounts(counts, batchStartIndex, entities, operation = "delete")
handleModifiedRowCounts = { counts, batch ->
handleModifiedRowCounts(counts, batch, operation = "delete")
},
)
}
Expand Down Expand Up @@ -482,18 +494,21 @@ open class RepositoryJdbi<EntityIdT : EntityId, EntityT : Entity<EntityIdT>>(
}

/**
* Method that you can override to map exceptions thrown in [create] or [update] to your own
* exception type. This is useful to handle e.g. unique constraint violations: instead of letting
* the database throw an opaque `PSQLException` that may be difficult to handle in layers above,
* you can instead check if the given exception is a unique index violation and map it to a more
* useful exception type here.
* Method that you can override to map exceptions thrown in [create] / [update] / [batchCreate] /
* [batchUpdate] to your own exception type. This is useful to handle e.g. unique constraint
* violations: instead of letting the database throw an opaque `PSQLException` that may be
* difficult to handle in layers above, you can instead check if the given exception is a unique
* index violation and map it to a more useful exception type here.
*
* If your implementation receives an exception here that it does not want to map, it should just
* return it as-is.
*
* The entity that was attempted to be created or updated is also provided here, so you can add
* extra context to the mapped exception.
*
* This method is only called by [batchCreate] / [batchUpdate] if the batch operation failed
* because of a single entity (e.g. a unique constraint violation).
*
* Example:
* ```
* override fun mapCreateOrUpdateException(e: Exception, entity: ExampleEntity): Exception {
Expand All @@ -519,24 +534,17 @@ open class RepositoryJdbi<EntityIdT : EntityId, EntityT : Entity<EntityIdT>>(
*/
private fun handleModifiedRowCounts(
modifiedRowCounts: IntArray,
batchStartIndex: Int,
entities: Iterable<Versioned<EntityT>>,
batch: List<Versioned<EntityT>>,
operation: String,
) {
for (count in modifiedRowCounts.withIndex()) {
if (count.value == 0) {
var exceptionMessage =
"Entity was concurrently modified between being retrieved and trying to ${operation} it in batch ${operation} (rolling back batch ${operation})"
// We want to add the entity to the exception message for context, but we can only do this
// if the Iterable is indexable
if (entities is List) {
val entity = entities.getOrNull(batchStartIndex + count.index)
if (entity != null) {
exceptionMessage += " [Entity: ${entity}]"
}
}

throw ConflictRepositoryException(exceptionMessage)
// Should never be null, but we don't want to suppress the ConflictRepositoryException here
// if it is
val conflictedEntity: EntityT? = batch.getOrNull(count.index)?.item
throw ConflictRepositoryException(
"Entity was concurrently modified between being retrieved and trying to ${operation} it in batch ${operation} (rolling back batch ${operation}) [Entity: ${conflictedEntity}]",
)
}
}
}
Expand Down
Loading

0 comments on commit 1182bbf

Please sign in to comment.