From a531b45574e6453964d90114b32fa5dc30575694 Mon Sep 17 00:00:00 2001 From: Pawel Mazurek <52866094+cyberhead-pl@users.noreply.github.com> Date: Tue, 17 Sep 2024 08:34:29 +0200 Subject: [PATCH] upsert + bulk --- .../commonMain/kotlin/naksha/psql/PgColumn.kt | 3 + .../kotlin/naksha/psql/executors/PgWriter.kt | 31 ++-- .../psql/executors/write/BulkWriteExecutor.kt | 134 +++++++++++++- .../psql/executors/write/DeleteFeature.kt | 25 ++- .../write/ExisingMetadataProvider.kt | 93 ++++++++++ .../psql/executors/write/InsertFeature.kt | 2 +- .../executors/write/InstantWriteExecutor.kt | 102 ++++++++++- .../psql/executors/write/UpdateFeature.kt | 173 ++---------------- .../psql/executors/write/WriteExecutor.kt | 15 +- .../kotlin/naksha/psql/UpdateFeatureTest.kt | 6 +- .../kotlin/naksha/psql/UpsertFeatureTest.kt | 75 ++++++++ 11 files changed, 451 insertions(+), 208 deletions(-) create mode 100644 here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/executors/write/ExisingMetadataProvider.kt create mode 100644 here-naksha-lib-psql/src/commonTest/kotlin/naksha/psql/UpsertFeatureTest.kt diff --git a/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/PgColumn.kt b/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/PgColumn.kt index 84fa1267a..023fb4ece 100644 --- a/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/PgColumn.kt +++ b/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/PgColumn.kt @@ -3,6 +3,7 @@ package naksha.psql import naksha.base.JsEnum +import naksha.model.quoteId import naksha.model.request.query.TupleColumn import kotlin.js.JsExport import kotlin.js.JsStatic @@ -76,6 +77,8 @@ class PgColumn : JsEnum() { } private set + fun quoted() = quoteId(this.name) + companion object PgColumnCompanion { /** * Returns the columns instance for the given name, diff --git a/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/executors/PgWriter.kt b/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/executors/PgWriter.kt index cff25667e..8419f55dd 100644 --- a/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/executors/PgWriter.kt +++ b/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/executors/PgWriter.kt @@ -15,10 +15,7 @@ import naksha.model.objects.NakshaCollection import naksha.model.objects.NakshaFeature import naksha.model.request.* import naksha.psql.* -import naksha.psql.executors.write.DeleteFeature -import naksha.psql.executors.write.InsertFeature -import naksha.psql.executors.write.UpdateFeature -import naksha.psql.executors.write.WriteExecutor +import naksha.psql.executors.write.* import kotlin.jvm.JvmField // TODO: We need to fix NakshaBulkLoaderPlan to make this faster again ! @@ -185,6 +182,8 @@ class PgWriter( val tuples = this.tuples val tupleNumbers = this.tupleNumbers + val previousMetadataProvider = ExisingMetadataProvider(session, orderedWrites) + // First, process collections, no performance need here for now. for (write in orderedWrites) { if (write == null) continue @@ -201,12 +200,18 @@ class PgWriter( ) } } else { + val collection = collectionOf(write) when (write.op) { - WriteOp.CREATE -> InsertFeature(this, writeExecutor).execute(collectionOf(write), write) - WriteOp.UPSERT -> returnTuple(write, upsertFeature(collectionOf(write), write)) - WriteOp.UPDATE -> UpdateFeature(this).execute(collectionOf(write), write) - WriteOp.DELETE -> DeleteFeature(this).execute(collectionOf(write), write) - WriteOp.PURGE -> returnTuple(write, purgeFeature(collectionOf(write), write)) + WriteOp.CREATE -> InsertFeature(this, writeExecutor).execute(collection, write) + WriteOp.UPSERT -> + if (write.id == null || previousMetadataProvider.get(collection.head.name, write.id!!) == null) { + InsertFeature(this, writeExecutor).execute(collection, write) + } else { + UpdateFeature(this, previousMetadataProvider, writeExecutor).execute(collection, write) + } + WriteOp.UPDATE -> UpdateFeature(this, previousMetadataProvider, writeExecutor).execute(collection, write) + WriteOp.DELETE -> DeleteFeature(this, previousMetadataProvider, writeExecutor).execute(collection, write) + WriteOp.PURGE -> TODO() else -> throw NakshaException( UNSUPPORTED_OPERATION, "Unknown write-operation: '${write.op}'" @@ -393,12 +398,4 @@ class PgWriter( ).close() return tuple } - - internal fun upsertFeature(collection: PgCollection, write: WriteExt): Tuple { - TODO("Implement me") - } - - internal fun purgeFeature(collection: PgCollection, write: WriteExt): Tuple { - TODO("Implement me") - } } \ No newline at end of file diff --git a/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/executors/write/BulkWriteExecutor.kt b/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/executors/write/BulkWriteExecutor.kt index aa3c7fbec..025f1e446 100644 --- a/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/executors/write/BulkWriteExecutor.kt +++ b/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/executors/write/BulkWriteExecutor.kt @@ -1,7 +1,7 @@ package naksha.psql.executors.write +import naksha.model.* import naksha.model.Naksha.NakshaCompanion.quoteIdent -import naksha.model.Tuple import naksha.model.objects.NakshaFeature import naksha.psql.PgCollection import naksha.psql.PgColumn @@ -11,17 +11,21 @@ import naksha.psql.executors.write.WriteFeatureUtils.allColumnValues class BulkWriteExecutor( val session: PgSession, -): WriteExecutor { +) : WriteExecutor { private var deleteFromDel: MutableMap = mutableMapOf() private var insertToHead: MutableMap = mutableMapOf() + private var updateHead: MutableMap = mutableMapOf() + private var copyHeadToHst: MutableMap = mutableMapOf() + private var copyHeadToDel: MutableMap = mutableMapOf() override fun removeFeatureFromDel(collection: PgCollection, featureId: String) { if (!deleteFromDel.containsKey(collection)) { collection.deleted?.let { delTable -> val quotedDelTable = quoteIdent(delTable.name) val quotedIdColumn = quoteIdent(PgColumn.id.name) - val plan = session.usePgConnection().prepare("DELETE FROM $quotedDelTable WHERE $quotedIdColumn=$1", arrayOf(PgColumn.id.type.text)) + val plan = session.usePgConnection() + .prepare("DELETE FROM $quotedDelTable WHERE $quotedIdColumn=$1", arrayOf(PgColumn.id.type.text)) deleteFromDel[collection] = plan } } @@ -46,14 +50,130 @@ class BulkWriteExecutor( insertToHead[collection]!!.addBatch(allColumnValues(tuple = tuple, feature = feature, txn = session.transaction().txn)) } + + // hst table + // txn_next txn uid flags + // CREATED/UPDATED new (1) old unchanged from head unchanged from head + // DELETED new (1) new (1) new with deleted action bits + // (1) denotes the same value, taken from current txn / version of current PgSession + /** + * Persist the feature entry in HEAD table into the destination table (HST or DEL). + * @param tupleNumber if intend to insert a tombstone DELETED state, provide this tuple number + * of the tombstone state, with new uid and current session txn + * @param flags the new flags. If intend to insert a tombstone DELETED state, + * provide the old flags but with action DELETED. + */ + override fun copyHeadToHst( + collection: PgCollection, + tupleNumber: TupleNumber?, + flags: Flags?, + featureId: String + ) { + if (!copyHeadToHst.containsKey(collection)) { + copyHeadToHst[collection] = createCopyPlan(collection.head.quotedName, collection.history!!.quotedName) + } + copyHeadToHst[collection]!!.addBatch( + args = arrayOf( + session.transaction().txn, + tupleNumber?.version?.txn, + tupleNumber?.uid, + flags, + featureId + ) + ) + } + + override fun copyHeadToDel(collection: PgCollection, tupleNumber: TupleNumber?, flags: Flags?, featureId: String) { + if (!copyHeadToDel.containsKey(collection)) { + copyHeadToDel[collection] = createCopyPlan(collection.head.quotedName, collection.deleted!!.quotedName) + } + copyHeadToDel[collection]!!.addBatch( + args = arrayOf( + session.transaction().txn, + tupleNumber?.version?.txn, + tupleNumber?.uid, + flags, + featureId + ) + ) + } + + override fun updateFeatureInHead( + collection: PgCollection, + tuple: Tuple, + feature: NakshaFeature, + newVersion: Version, + previousMetadata: Metadata + ) { + if (!updateHead.containsKey(collection)) { + val columnEqualsVariable = PgColumn.allWritableColumns.mapIndexed { index, pgColumn -> + "${pgColumn.name}=\$${index + 1}" + }.joinToString(separator = ",") + val quotedHeadTable = collection.head.quotedName + + val conn = session.usePgConnection() + val plan = conn.prepare( + sql = """ UPDATE $quotedHeadTable + SET $columnEqualsVariable + WHERE ${PgColumn.id.quoted()}=$${PgColumn.allWritableColumns.size + 1} + """.trimIndent(), + PgColumn.allWritableColumns.map { it.type.text }.toTypedArray() + ) + updateHead[collection] = plan + } + updateHead[collection]!!.addBatch( + args = allColumnValues( + tuple = tuple, + feature = feature, + txn = newVersion.txn, + prevTxn = previousMetadata.version.txn, + prevUid = previousMetadata.uid, + changeCount = previousMetadata.changeCount + 1 + ).plus(feature.id) + ) + } + override fun finish() { deleteFromDel.forEach { - it.value.executeBatch() - it.value.close() + it.value.use { stmt -> stmt.executeBatch() } + } + copyHeadToHst.forEach { + it.value.use { stmt -> stmt.executeBatch() } + } + copyHeadToDel.forEach { + it.value.use { stmt -> stmt.executeBatch() } + } + updateHead.forEach { + it.value.use { stmt -> stmt.executeBatch() } } insertToHead.forEach { - it.value.executeBatch() - it.value.close() + it.value.use { stmt -> stmt.executeBatch() } } + + } + + private fun createCopyPlan(headTableName: String, dstTableName: String): PgPlan { + + val columnsToOverride = mutableListOf(PgColumn.txn_next, PgColumn.txn, PgColumn.uid, PgColumn.flags) + val columnsToCopy = PgColumn.allWritableColumns.minus(columnsToOverride.toSet()) + val columns = mutableListOf() + columns.addAll(columnsToOverride) + columns.addAll(columnsToCopy) + + val columnNames = columns.joinToString(separator = ",") + val copyColumnNames = columnsToCopy.joinToString(separator = ",") + + return session.usePgConnection().prepare( + sql = """ + INSERT INTO $dstTableName($columnNames) + SELECT $1, + COALESCE($2, ${PgColumn.txn}), + COALESCE($3, ${PgColumn.uid}), + COALESCE($4, ${PgColumn.flags}), + $copyColumnNames FROM $headTableName + WHERE ${PgColumn.id.quoted()} = $5 + """.trimIndent(), + columns.map { it.type.text }.toTypedArray() + ) } } \ No newline at end of file diff --git a/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/executors/write/DeleteFeature.kt b/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/executors/write/DeleteFeature.kt index 807e0e9d4..445989312 100644 --- a/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/executors/write/DeleteFeature.kt +++ b/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/executors/write/DeleteFeature.kt @@ -4,6 +4,7 @@ import naksha.model.* import naksha.model.request.ReadFeatures import naksha.model.request.SuccessResponse import naksha.psql.PgCollection +import naksha.psql.PgColumn import naksha.psql.PgUtil.PgUtilCompanion.quoteIdent import naksha.psql.executors.PgReader import naksha.psql.executors.PgWriter @@ -12,8 +13,10 @@ import naksha.psql.executors.write.WriteFeatureUtils.newFeatureTupleNumber import naksha.psql.executors.write.WriteFeatureUtils.resolveFlags class DeleteFeature( - writer: PgWriter -) : UpdateFeature(writer) { + writer: PgWriter, + exisingMetadataProvider: ExisingMetadataProvider, + writeExecutor: WriteExecutor +) : UpdateFeature(writer, exisingMetadataProvider, writeExecutor) { override fun execute(collection: PgCollection, write: WriteExt): TupleNumber { val featureId = write.featureId ?: throw NakshaException(NakshaError.ILLEGAL_ARGUMENT, "No feature ID provided") @@ -29,15 +32,10 @@ class DeleteFeature( // If hst table enabled collection.history?.let { hstTable -> // copy head state into hst with txn_next === txn - insertHeadToTable( - destinationTable = hstTable, - headTable = collection.head, - featureId = featureId - ) + writeExecutor.copyHeadToHst(collection = collection, featureId = featureId) // also copy head state into hst with txn_next === txn and action DELETED as a tombstone state - insertHeadToTable( - destinationTable = hstTable, - headTable = collection.head, + writeExecutor.copyHeadToHst( + collection = collection, tupleNumber = tupleNumber, flags = flags, featureId = featureId, @@ -46,9 +44,8 @@ class DeleteFeature( // If del table enabled, copy head state into del, with action DELETED and txn_next === txn as a tombstone state collection.deleted?.let { delTable -> - insertHeadToTable( - destinationTable = delTable, - headTable = collection.head, + writeExecutor.copyHeadToDel( + collection = collection, tupleNumber = tupleNumber, flags = flags, featureId = featureId, @@ -66,7 +63,7 @@ class DeleteFeature( val quotedHeadTable = quoteIdent(headTable.name) session.usePgConnection() .execute( - sql = "DELETE FROM $quotedHeadTable WHERE $quotedIdColumn=$1", + sql = "DELETE FROM $quotedHeadTable WHERE ${PgColumn.id.quoted()}=$1", args = arrayOf(featureId) ).close() } diff --git a/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/executors/write/ExisingMetadataProvider.kt b/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/executors/write/ExisingMetadataProvider.kt new file mode 100644 index 000000000..ff9a8b88b --- /dev/null +++ b/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/executors/write/ExisingMetadataProvider.kt @@ -0,0 +1,93 @@ +package naksha.psql.executors.write + +import naksha.base.Int64 +import naksha.base.ListProxy +import naksha.model.Metadata +import naksha.model.TupleNumber +import naksha.model.Version +import naksha.model.request.WriteOp.WriteOp_C.DELETE +import naksha.model.request.WriteOp.WriteOp_C.UPDATE +import naksha.model.request.WriteOp.WriteOp_C.UPSERT +import naksha.psql.PgCollection +import naksha.psql.PgColumn +import naksha.psql.PgSession +import naksha.psql.PgUtil.PgUtilCompanion.quoteIdent +import naksha.psql.executors.WriteExt +import naksha.psql.executors.write.PgCursorUtil.collectAndClose + +class ExisingMetadataProvider( + private val session: PgSession, + writes: ListProxy +) { + // cache for metadata values + private val cache: MutableMap> = mutableMapOf() + + init { + val featuresPerCollection = writes + .filterNotNull() + .filter { it.op == UPDATE || it.op == UPSERT || it.op == DELETE } + .filter { it.id != null } + .groupBy { it.collectionId } + for (entry in featuresPerCollection.entries) { + val idsToFetch = entry.value.map { it.id!! }.toSet() + val results = fetchCurrentMeta(entry.key, idsToFetch) + + val metaMap = mutableMapOf() + for (result in results) { + metaMap[result.id] = result + } + cache[entry.key] = metaMap + } + } + + fun get(collectionHeadName: String, featureId: String): Metadata? { + return cache[collectionHeadName]?.get(featureId) + } + + private fun fetchCurrentMeta(collectionHeadName: String, featureIds: Set): List { + val quotedHeadName = quoteIdent(collectionHeadName) + val sql = """SELECT ${PgColumn.metaSelect} + FROM $quotedHeadName + WHERE ${PgColumn.id.quoted()} = ANY($1) + """.trimMargin() + val fetchedMetadata = session.usePgConnection() + .execute(sql, arrayOf(featureIds.toTypedArray())) + .collectAndClose { metaFromRow(it) } + return fetchedMetadata + } + + private fun metaFromRow(row: PgCursorUtil.ReadOnlyRow): Metadata { + val tupleNumber: TupleNumber = TupleNumber.fromByteArray(row[PgColumn.tuple_number]) + val updatedAt: Int64 = row.column(PgColumn.updated_at) as? Int64 ?: Int64(0) + return Metadata( + storeNumber = tupleNumber.storeNumber, + updatedAt = updatedAt, + createdAt = row.column(PgColumn.created_at) as? Int64 ?: updatedAt, + authorTs = row[PgColumn.author_ts], + nextVersion = maybeVersion(row.column(PgColumn.txn_next)), + version = tupleNumber.version, + prevVersion = maybeVersion(row.column(PgColumn.ptxn)), + uid = tupleNumber.uid, + puid = row.column(PgColumn.puid) as? Int, + hash = row[PgColumn.hash], + changeCount = row[PgColumn.change_count], + geoGrid = row[PgColumn.geo_grid], + flags = row[PgColumn.flags], + id = row[PgColumn.id], + appId = row[PgColumn.app_id], + author = row.column(PgColumn.author) as? String, + type = row.column(PgColumn.type) as? String, + origin = row.column(PgColumn.origin) as? String + ) + } + + private fun maybeVersion(rawCursorValue: Any?): Version? { + return (rawCursorValue as? Int64)?.let { + if (it.toInt() == 0) { + null + } else { + Version(it) + } + } + } +} \ No newline at end of file diff --git a/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/executors/write/InsertFeature.kt b/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/executors/write/InsertFeature.kt index c5e1ab800..e92ab061e 100644 --- a/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/executors/write/InsertFeature.kt +++ b/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/executors/write/InsertFeature.kt @@ -18,7 +18,7 @@ import kotlin.jvm.JvmField class InsertFeature( @JvmField val writer: PgWriter, - val writeExecutor: WriteExecutor + private val writeExecutor: WriteExecutor ) { val session = writer.session diff --git a/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/executors/write/InstantWriteExecutor.kt b/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/executors/write/InstantWriteExecutor.kt index 997538d0c..51afdc058 100644 --- a/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/executors/write/InstantWriteExecutor.kt +++ b/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/executors/write/InstantWriteExecutor.kt @@ -1,16 +1,15 @@ package naksha.psql.executors.write +import naksha.model.* import naksha.model.Naksha.NakshaCompanion.quoteIdent -import naksha.model.Tuple import naksha.model.objects.NakshaFeature -import naksha.psql.PgCollection -import naksha.psql.PgColumn -import naksha.psql.PgSession +import naksha.psql.* +import naksha.psql.PgUtil.PgUtilCompanion import naksha.psql.executors.write.WriteFeatureUtils.allColumnValues class InstantWriteExecutor( val session: PgSession, -): WriteExecutor { +) : WriteExecutor { override fun removeFeatureFromDel(collection: PgCollection, featureId: String) { collection.deleted?.let { delTable -> @@ -40,7 +39,100 @@ class InstantWriteExecutor( ).close() } + override fun copyHeadToDel(collection: PgCollection, tupleNumber: TupleNumber?, flags: Flags?, featureId: String) { + copyHeadTo(collection.deleted!!, collection.head, tupleNumber, flags, featureId) + } + + override fun copyHeadToHst(collection: PgCollection, tupleNumber: TupleNumber?, flags: Flags?, featureId: String) { + copyHeadTo(collection.history!!, collection.head, tupleNumber, flags, featureId) + } + + override fun updateFeatureInHead( + collection: PgCollection, + tuple: Tuple, + feature: NakshaFeature, + newVersion: Version, + previousMetadata: Metadata + ) { + val conn = session.usePgConnection() + conn.execute( + sql = updateStatement(collection.head.name), + args = allColumnValues( + tuple = tuple, + feature = feature, + txn = newVersion.txn, + prevTxn = previousMetadata.version.txn, + prevUid = previousMetadata.uid, + changeCount = previousMetadata.changeCount + 1 + ).plus(feature.id) + ).close() + } + + private fun updateStatement(headTableName: String): String { + val columnEqualsVariable = PgColumn.allWritableColumns.mapIndexed { index, pgColumn -> + "${pgColumn.name}=\$${index + 1}" + }.joinToString(separator = ",") + val quotedHeadTable = PgUtil.quoteIdent(headTableName) + return """ UPDATE $quotedHeadTable + SET $columnEqualsVariable + WHERE ${PgColumn.id.quoted()}=$${PgColumn.allWritableColumns.size+1} + """.trimIndent() + } + override fun finish() { // nothing to do } + + // hst table + // txn_next txn uid flags + // CREATED/UPDATED new (1) old unchanged from head unchanged from head + // DELETED new (1) new (1) new with deleted action bits + // (1) denotes the same value, taken from current txn / version of current PgSession + /** + * Persist the feature entry in HEAD table into the destination table (HST or DEL). + * @param tupleNumber if intend to insert a tombstone DELETED state, provide this tuple number + * of the tombstone state, with new uid and current session txn + * @param flags the new flags. If intend to insert a tombstone DELETED state, + * provide the old flags but with action DELETED. + */ + private fun copyHeadTo( + destinationTable: PgTable, + headTable: PgTable, + tupleNumber: TupleNumber?, + flags: Flags?, + featureId: String + ) { + val dstTableName = PgUtil.quoteIdent(destinationTable.name) + val headTableName = PgUtil.quoteIdent(headTable.name) + val otherColumns = PgColumn.allWritableColumns + .asSequence() + .filterNot { it == PgColumn.txn_next } + .filterNot { it == PgColumn.txn } + .filterNot { it == PgColumn.uid } + .filterNot { it == PgColumn.flags } + .joinToString(separator = ",") + session.usePgConnection().execute( + sql = """ + INSERT INTO $dstTableName( + ${PgColumn.txn_next.name}, + ${PgColumn.txn.name}, + ${PgColumn.uid.name}, + ${PgColumn.flags.name}, + $otherColumns) + SELECT $1, + COALESCE($2, ${PgColumn.txn}), + COALESCE($3, ${PgColumn.uid}), + COALESCE($4, ${PgColumn.flags}), + $otherColumns FROM $headTableName + WHERE ${PgColumn.id.quoted()} = $5 + """.trimIndent(), + args = arrayOf( + session.transaction().txn, + tupleNumber?.version?.txn, + tupleNumber?.uid, + flags, + featureId + ) + ).close() + } } \ No newline at end of file diff --git a/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/executors/write/UpdateFeature.kt b/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/executors/write/UpdateFeature.kt index c3bf96dc4..d6275d5b5 100644 --- a/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/executors/write/UpdateFeature.kt +++ b/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/executors/write/UpdateFeature.kt @@ -1,6 +1,5 @@ package naksha.psql.executors.write -import naksha.base.Int64 import naksha.model.* import naksha.model.Metadata.Metadata_C.geoGrid import naksha.model.Metadata.Metadata_C.hash @@ -11,15 +10,15 @@ import naksha.psql.PgTable import naksha.psql.PgUtil.PgUtilCompanion.quoteIdent import naksha.psql.executors.PgWriter import naksha.psql.executors.WriteExt -import naksha.psql.executors.write.PgCursorUtil.collectAndClose import naksha.psql.executors.write.WriteFeatureUtils.newFeatureTupleNumber import naksha.psql.executors.write.WriteFeatureUtils.resolveFlags import naksha.psql.executors.write.WriteFeatureUtils.tuple import kotlin.jvm.JvmField -import kotlin.jvm.JvmStatic open class UpdateFeature( - @JvmField val writer: PgWriter + @JvmField val writer: PgWriter, + private val exisingMetadataProvider: ExisingMetadataProvider, + protected val writeExecutor: WriteExecutor ) { val session = writer.session @@ -34,8 +33,10 @@ open class UpdateFeature( require(feature.id == write.featureId) { "Feature id in payload (${feature.id}) and write request (${write.featureId}) are different" } - - val previousMetadata = fetchCurrentMeta(collection, feature.id) + val previousMetadata = exisingMetadataProvider.get(collection.head.name, write.id!!) + require(previousMetadata != null) { + "Trying update feature that not exists in head: ${write.id}" + } require(feature.id == previousMetadata.id) { "Feature id (${feature.id}) differs from previous metadata (${previousMetadata.id})" } @@ -55,132 +56,18 @@ open class UpdateFeature( flags ) - removeFeatureFromDel(collection, feature.id) + writeExecutor.removeFeatureFromDel(collection, feature.id) collection.history?.let { hstTable -> - insertHeadToTable( - destinationTable = hstTable, - headTable = collection.head, + writeExecutor.copyHeadToHst( + collection = collection, featureId = feature.id ) } - updateFeatureInHead(collection, tuple, feature, newVersion, previousMetadata) + writeExecutor.updateFeatureInHead(collection, tuple, feature, newVersion, previousMetadata) return writer.returnTuple(write, tuple) } - protected fun fetchCurrentMeta(collection: PgCollection, featureId: String): Metadata { - val quotedHeadName = quoteIdent(collection.head.name) - val sql = """SELECT ${PgColumn.metaSelect} - FROM $quotedHeadName - WHERE $quotedIdColumn=$1 - """.trimMargin() - val fetchedMetadata = session.usePgConnection() - .execute(sql, arrayOf(featureId)) - .collectAndClose { metaFromRow(it) } - require(fetchedMetadata.size == 1) { - "Expected single metadata, got ${fetchedMetadata.size} instead (query: $sql)" - } - return fetchedMetadata[0] - } - - private fun metaFromRow(row: PgCursorUtil.ReadOnlyRow): Metadata { - val tupleNumber: TupleNumber = TupleNumber.fromByteArray(row[PgColumn.tuple_number]) - val updatedAt: Int64 = row.column(PgColumn.updated_at) as? Int64 ?: Int64(0) - return Metadata( - storeNumber = tupleNumber.storeNumber, - updatedAt = updatedAt, - createdAt = row.column(PgColumn.created_at) as? Int64 ?: updatedAt, - authorTs = row[PgColumn.author_ts], - nextVersion = maybeVersion(row.column(PgColumn.txn_next)), - version = tupleNumber.version, - prevVersion = maybeVersion(row.column(PgColumn.ptxn)), - uid = tupleNumber.uid, - puid = row.column(PgColumn.puid) as? Int, - hash = row[PgColumn.hash], - changeCount = row[PgColumn.change_count], - geoGrid = row[PgColumn.geo_grid], - flags = row[PgColumn.flags], - id = row[PgColumn.id], - appId = row[PgColumn.app_id], - author = row.column(PgColumn.author) as? String, - type = row.column(PgColumn.type) as? String, - origin = row.column(PgColumn.origin) as? String - ) - } - - private fun maybeVersion(rawCursorValue: Any?): Version? { - return (rawCursorValue as? Int64)?.let { - if (it.toInt() == 0) { - null - } else { - Version(it) - } - } - } - - private fun removeFeatureFromDel(collection: PgCollection, featureId: String) { - collection.deleted?.let { delTable -> - val quotedDelTable = quoteIdent(delTable.name) - session.usePgConnection() - .execute( - sql = "DELETE FROM $quotedDelTable WHERE $quotedIdColumn=$1", - args = arrayOf(featureId) - ).close() - } - } - - // hst table - // txn_next txn uid flags - // CREATED/UPDATED new (1) old unchanged from head unchanged from head - // DELETED new (1) new (1) new with deleted action bits - // (1) denotes the same value, taken from current txn / version of current PgSession - /** - * Persist the feature entry in HEAD table into the destination table (HST or DEL). - * @param tupleNumber if intend to insert a tombstone DELETED state, provide this tuple number - * of the tombstone state, with new uid and current session txn - * @param flags the new flags. If intend to insert a tombstone DELETED state, - * provide the old flags but with action DELETED. - */ - protected fun insertHeadToTable( - destinationTable: PgTable, - headTable: PgTable, - tupleNumber: TupleNumber? = null, - flags: Flags? = null, - featureId: String - ) { - val desTableName = quoteIdent(destinationTable.name) - val headTableName = quoteIdent(headTable.name) - val otherColumns = PgColumn.allWritableColumns - .asSequence() - .filterNot { it == PgColumn.txn_next } - .filterNot { it == PgColumn.txn } - .filterNot { it == PgColumn.uid } - .filterNot { it == PgColumn.flags } - .joinToString(separator = ",") - session.usePgConnection().execute( - sql = """ - INSERT INTO $desTableName( - ${PgColumn.txn_next.name}, - ${PgColumn.txn.name}, - ${PgColumn.uid.name}, - ${PgColumn.flags.name}, - $otherColumns) - SELECT $1, - COALESCE($2, ${PgColumn.txn}), - COALESCE($3, ${PgColumn.uid}), - COALESCE($4, ${PgColumn.flags}), - $otherColumns FROM $headTableName - WHERE $quotedIdColumn = $5 - """.trimIndent(), - args = arrayOf( - session.transaction().txn, - tupleNumber?.version?.txn, - tupleNumber?.uid, - flags, - featureId) - ).close() - } - private fun metadataForNewVersion( previousMetadata: Metadata, newVersion: Version, @@ -198,48 +85,12 @@ open class UpdateFeature( hash = hash(feature, session.options.excludePaths, session.options.excludeFn), changeCount = previousMetadata.changeCount + 1, geoGrid = geoGrid(feature), - flags = flags, + flags = flags.action(Action.UPDATED), appId = session.options.appId, author = session.options.author ?: previousMetadata.author, id = feature.id ) } - private fun updateFeatureInHead( - collection: PgCollection, - tuple: Tuple, - feature: NakshaFeature, - newVersion: Version, - previousMetadata: Metadata - ): Tuple { - val conn = session.usePgConnection() - conn.execute( - sql = updateStatement(collection.head.name), - args = WriteFeatureUtils.allColumnValues( - tuple = tuple, - feature = feature, - txn = newVersion.txn, - prevTxn = previousMetadata.version.txn, - prevUid = previousMetadata.uid, - changeCount = previousMetadata.changeCount + 1 - ).plus(feature.id) - ).close() - return tuple - } - private fun updateStatement(headTableName: String): String { - val columnEqualsVariable = PgColumn.allWritableColumns.mapIndexed { index, pgColumn -> - "${pgColumn.name}=\$${index + 1}" - }.joinToString(separator = ",") - val quotedHeadTable = quoteIdent(headTableName) - return """ UPDATE $quotedHeadTable - SET $columnEqualsVariable - WHERE $quotedIdColumn=$${PgColumn.allWritableColumns.size+1} - """.trimIndent() - } - - companion object { - @JvmStatic - protected val quotedIdColumn: String = quoteIdent(PgColumn.id.name) - } } \ No newline at end of file diff --git a/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/executors/write/WriteExecutor.kt b/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/executors/write/WriteExecutor.kt index 9da7eae7e..987b8ec39 100644 --- a/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/executors/write/WriteExecutor.kt +++ b/here-naksha-lib-psql/src/commonMain/kotlin/naksha/psql/executors/write/WriteExecutor.kt @@ -1,8 +1,9 @@ package naksha.psql.executors.write -import naksha.model.Tuple +import naksha.model.* import naksha.model.objects.NakshaFeature import naksha.psql.PgCollection +import naksha.psql.PgTable interface WriteExecutor { @@ -15,4 +16,16 @@ interface WriteExecutor { ) fun finish() + + fun copyHeadToHst(collection: PgCollection, tupleNumber: TupleNumber? = null, flags: Flags? = null, featureId: String) + + fun copyHeadToDel(collection: PgCollection, tupleNumber: TupleNumber? = null, flags: Flags? = null, featureId: String) + + fun updateFeatureInHead( + collection: PgCollection, + tuple: Tuple, + feature: NakshaFeature, + newVersion: Version, + previousMetadata: Metadata + ) } \ No newline at end of file diff --git a/here-naksha-lib-psql/src/commonTest/kotlin/naksha/psql/UpdateFeatureTest.kt b/here-naksha-lib-psql/src/commonTest/kotlin/naksha/psql/UpdateFeatureTest.kt index 9adb4980b..349d1c7ff 100644 --- a/here-naksha-lib-psql/src/commonTest/kotlin/naksha/psql/UpdateFeatureTest.kt +++ b/here-naksha-lib-psql/src/commonTest/kotlin/naksha/psql/UpdateFeatureTest.kt @@ -9,13 +9,14 @@ import naksha.model.request.Write import naksha.model.request.WriteRequest import naksha.psql.assertions.NakshaFeatureFluidAssertions.Companion.assertThatFeature import naksha.psql.base.PgTestBase +import kotlin.test.BeforeTest import kotlin.test.Test import kotlin.test.assertEquals class UpdateFeatureTest : PgTestBase(NakshaCollection("update_feature_test_c")) { @Test - fun shouldPerformSimpleUpdate() { + fun shouldPerformSimpleUpdateAndUpsert() { // Given: Initial state of feature val initialFeature = NakshaFeature().apply { id = "feature_1" @@ -67,7 +68,8 @@ class UpdateFeatureTest : PgTestBase(NakshaCollection("update_feature_test_c")) retrievedXyz .hasProperty("appId", PgTest.TEST_APP_ID) .hasProperty("author", PgTest.TEST_APP_AUTHOR!!) - .hasProperty("action", Action.CREATED) + .hasProperty("action", Action.UPDATED) + .hasProperty("changeCount", 2) } } } diff --git a/here-naksha-lib-psql/src/commonTest/kotlin/naksha/psql/UpsertFeatureTest.kt b/here-naksha-lib-psql/src/commonTest/kotlin/naksha/psql/UpsertFeatureTest.kt new file mode 100644 index 000000000..fb3c47a09 --- /dev/null +++ b/here-naksha-lib-psql/src/commonTest/kotlin/naksha/psql/UpsertFeatureTest.kt @@ -0,0 +1,75 @@ +package naksha.psql + +import naksha.model.Action +import naksha.model.objects.NakshaCollection +import naksha.model.objects.NakshaFeature +import naksha.model.objects.NakshaProperties +import naksha.model.request.ReadFeatures +import naksha.model.request.Write +import naksha.model.request.WriteRequest +import naksha.psql.assertions.NakshaFeatureFluidAssertions.Companion.assertThatFeature +import naksha.psql.base.PgTestBase +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals + +class UpsertFeatureTest : PgTestBase(NakshaCollection("upsert_feature_test_c")) { + + @Test + fun shouldPerformSimpleUpdateAndUpsert() { + // Given: Initial state of feature + val initialFeature = NakshaFeature().apply { + id = "feature_1" + } + val writeInitialFeature = WriteRequest().add( + Write().upsertFeature(null, collection!!.id, initialFeature) + ) + + val upsertFeaturesReq = WriteRequest().add( + Write().upsertFeature(null, collection.id, initialFeature) + ) + + // When: Writing initial version of feature + executeWrite(writeInitialFeature) + + executeWrite(upsertFeaturesReq) + + // And: Retrieving feature by id + val retrievedFeatures = executeRead(ReadFeatures().apply { + collectionIds += collection.id + featureIds += initialFeature.id + queryHistory = true + }).features.sortedBy { it!!.properties.xyz.version.toLong() } + + // Then + assertThatFeature(retrievedFeatures[0]!!) + .isIdenticalTo( + other = initialFeature, + ignoreProps = true // we ignore properties because we want to examine them later + ) + .hasPropertiesThat { retrievedProperties -> + retrievedProperties + .hasFeatureType(initialFeature.properties.featureType) + .hasXyzThat { retrievedXyz -> + retrievedXyz + .hasProperty("action", Action.CREATED) + .hasProperty("changeCount", 1) + } + } + + assertThatFeature(retrievedFeatures[1]!!) + .isIdenticalTo( + other = initialFeature, + ignoreProps = true // we ignore properties because we want to examine them later + ) + .hasPropertiesThat { retrievedProperties -> + retrievedProperties + .hasFeatureType(initialFeature.properties.featureType) + .hasXyzThat { retrievedXyz -> + retrievedXyz + .hasProperty("action", Action.UPDATED) + .hasProperty("changeCount", 2) + } + } + } +} \ No newline at end of file