Skip to content

Commit

Permalink
upsert + bulk
Browse files Browse the repository at this point in the history
  • Loading branch information
cyberhead-pl committed Sep 17, 2024
1 parent 0f592b0 commit a531b45
Show file tree
Hide file tree
Showing 11 changed files with 451 additions and 208 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package naksha.psql

import naksha.base.JsEnum
import naksha.model.quoteId
import naksha.model.request.query.TupleColumn
import kotlin.js.JsExport
import kotlin.js.JsStatic
Expand Down Expand Up @@ -76,6 +77,8 @@ class PgColumn : JsEnum() {
}
private set

    /**
     * Returns this column's [name] passed through [quoteId], i.e. the name
     * quoted as an SQL identifier, safe to splice into generated SQL text.
     */
    fun quoted() = quoteId(this.name)

companion object PgColumnCompanion {
/**
* Returns the columns instance for the given name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@ import naksha.model.objects.NakshaCollection
import naksha.model.objects.NakshaFeature
import naksha.model.request.*
import naksha.psql.*
import naksha.psql.executors.write.DeleteFeature
import naksha.psql.executors.write.InsertFeature
import naksha.psql.executors.write.UpdateFeature
import naksha.psql.executors.write.WriteExecutor
import naksha.psql.executors.write.*
import kotlin.jvm.JvmField

// TODO: We need to fix NakshaBulkLoaderPlan to make this faster again !
Expand Down Expand Up @@ -185,6 +182,8 @@ class PgWriter(
val tuples = this.tuples
val tupleNumbers = this.tupleNumbers

val previousMetadataProvider = ExisingMetadataProvider(session, orderedWrites)

// First, process collections, no performance need here for now.
for (write in orderedWrites) {
if (write == null) continue
Expand All @@ -201,12 +200,18 @@ class PgWriter(
)
}
} else {
val collection = collectionOf(write)
when (write.op) {
WriteOp.CREATE -> InsertFeature(this, writeExecutor).execute(collectionOf(write), write)
WriteOp.UPSERT -> returnTuple(write, upsertFeature(collectionOf(write), write))
WriteOp.UPDATE -> UpdateFeature(this).execute(collectionOf(write), write)
WriteOp.DELETE -> DeleteFeature(this).execute(collectionOf(write), write)
WriteOp.PURGE -> returnTuple(write, purgeFeature(collectionOf(write), write))
WriteOp.CREATE -> InsertFeature(this, writeExecutor).execute(collection, write)
WriteOp.UPSERT ->
if (write.id == null || previousMetadataProvider.get(collection.head.name, write.id!!) == null) {
InsertFeature(this, writeExecutor).execute(collection, write)
} else {
UpdateFeature(this, previousMetadataProvider, writeExecutor).execute(collection, write)
}
WriteOp.UPDATE -> UpdateFeature(this, previousMetadataProvider, writeExecutor).execute(collection, write)
WriteOp.DELETE -> DeleteFeature(this, previousMetadataProvider, writeExecutor).execute(collection, write)
WriteOp.PURGE -> TODO()
else -> throw NakshaException(
UNSUPPORTED_OPERATION,
"Unknown write-operation: '${write.op}'"
Expand Down Expand Up @@ -393,12 +398,4 @@ class PgWriter(
).close()
return tuple
}

internal fun upsertFeature(collection: PgCollection, write: WriteExt): Tuple {
TODO("Implement me")
}

internal fun purgeFeature(collection: PgCollection, write: WriteExt): Tuple {
TODO("Implement me")
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package naksha.psql.executors.write

import naksha.model.*
import naksha.model.Naksha.NakshaCompanion.quoteIdent
import naksha.model.Tuple
import naksha.model.objects.NakshaFeature
import naksha.psql.PgCollection
import naksha.psql.PgColumn
Expand All @@ -11,17 +11,21 @@ import naksha.psql.executors.write.WriteFeatureUtils.allColumnValues

class BulkWriteExecutor(
val session: PgSession,
): WriteExecutor {
) : WriteExecutor {

private var deleteFromDel: MutableMap<PgCollection, PgPlan> = mutableMapOf()
private var insertToHead: MutableMap<PgCollection, PgPlan> = mutableMapOf()
private var updateHead: MutableMap<PgCollection, PgPlan> = mutableMapOf()
private var copyHeadToHst: MutableMap<PgCollection, PgPlan> = mutableMapOf()
private var copyHeadToDel: MutableMap<PgCollection, PgPlan> = mutableMapOf()

override fun removeFeatureFromDel(collection: PgCollection, featureId: String) {
if (!deleteFromDel.containsKey(collection)) {
collection.deleted?.let { delTable ->
val quotedDelTable = quoteIdent(delTable.name)
val quotedIdColumn = quoteIdent(PgColumn.id.name)
val plan = session.usePgConnection().prepare("DELETE FROM $quotedDelTable WHERE $quotedIdColumn=$1", arrayOf(PgColumn.id.type.text))
val plan = session.usePgConnection()
.prepare("DELETE FROM $quotedDelTable WHERE $quotedIdColumn=$1", arrayOf(PgColumn.id.type.text))
deleteFromDel[collection] = plan
}
}
Expand All @@ -46,14 +50,130 @@ class BulkWriteExecutor(
insertToHead[collection]!!.addBatch(allColumnValues(tuple = tuple, feature = feature, txn = session.transaction().txn))
}


// hst table
// txn_next txn uid flags
// CREATED/UPDATED new (1) old unchanged from head unchanged from head
// DELETED new (1) new (1) new with deleted action bits
// (1) denotes the same value, taken from current txn / version of current PgSession
/**
* Persist the feature entry in HEAD table into the destination table (HST or DEL).
* @param tupleNumber if intend to insert a tombstone DELETED state, provide this tuple number
* of the tombstone state, with new uid and current session txn
* @param flags the new flags. If intend to insert a tombstone DELETED state,
* provide the old flags but with action DELETED.
*/
override fun copyHeadToHst(
collection: PgCollection,
tupleNumber: TupleNumber?,
flags: Flags?,
featureId: String
) {
if (!copyHeadToHst.containsKey(collection)) {
copyHeadToHst[collection] = createCopyPlan(collection.head.quotedName, collection.history!!.quotedName)
}
copyHeadToHst[collection]!!.addBatch(
args = arrayOf(
session.transaction().txn,
tupleNumber?.version?.txn,
tupleNumber?.uid,
flags,
featureId
)
)
}

override fun copyHeadToDel(collection: PgCollection, tupleNumber: TupleNumber?, flags: Flags?, featureId: String) {
if (!copyHeadToDel.containsKey(collection)) {
copyHeadToDel[collection] = createCopyPlan(collection.head.quotedName, collection.deleted!!.quotedName)
}
copyHeadToDel[collection]!!.addBatch(
args = arrayOf(
session.transaction().txn,
tupleNumber?.version?.txn,
tupleNumber?.uid,
flags,
featureId
)
)
}

    /**
     * Queue an UPDATE of the feature's row in the collection's HEAD table.
     *
     * A single UPDATE statement is prepared lazily per [collection] (setting
     * every writable column positionally as $1..$N, with the feature id as the
     * trailing $N+1 WHERE parameter) and then reused; the actual execution is
     * deferred until [finish] runs the batch.
     *
     * @param tuple the new tuple state to write
     * @param feature the feature being updated; its id selects the HEAD row
     * @param newVersion the version whose txn is written as the row's txn
     * @param previousMetadata metadata of the replaced state — supplies ptxn,
     *        puid and the change counter (incremented by one here)
     */
    override fun updateFeatureInHead(
        collection: PgCollection,
        tuple: Tuple,
        feature: NakshaFeature,
        newVersion: Version,
        previousMetadata: Metadata
    ) {
        if (!updateHead.containsKey(collection)) {
            // Build "col1=$1,col2=$2,..." for every writable column.
            // NOTE(review): column names are NOT quoted here, while the WHERE
            // clause below uses PgColumn.id.quoted() — confirm quoting is not
            // needed for the SET list.
            val columnEqualsVariable = PgColumn.allWritableColumns.mapIndexed { index, pgColumn ->
                "${pgColumn.name}=\$${index + 1}"
            }.joinToString(separator = ",")
            val quotedHeadTable = collection.head.quotedName

            val conn = session.usePgConnection()
            // NOTE(review): the SQL has allWritableColumns.size + 1 placeholders
            // (the last one is the id), but the type array passed to prepare()
            // only has allWritableColumns.size entries — verify prepare()
            // tolerates a missing type for the trailing parameter.
            val plan = conn.prepare(
                sql = """ UPDATE $quotedHeadTable
                SET $columnEqualsVariable
                WHERE ${PgColumn.id.quoted()}=$${PgColumn.allWritableColumns.size + 1}
            """.trimIndent(),
                PgColumn.allWritableColumns.map { it.type.text }.toTypedArray()
            )
            updateHead[collection] = plan
        }
        // Batch the column values in the same order as the SET list, followed
        // by the feature id for the WHERE parameter.
        updateHead[collection]!!.addBatch(
            args = allColumnValues(
                tuple = tuple,
                feature = feature,
                txn = newVersion.txn,
                prevTxn = previousMetadata.version.txn,
                prevUid = previousMetadata.uid,
                changeCount = previousMetadata.changeCount + 1
            ).plus(feature.id)
        )
    }

override fun finish() {
deleteFromDel.forEach {
it.value.executeBatch()
it.value.close()
it.value.use { stmt -> stmt.executeBatch() }
}
copyHeadToHst.forEach {
it.value.use { stmt -> stmt.executeBatch() }
}
copyHeadToDel.forEach {
it.value.use { stmt -> stmt.executeBatch() }
}
updateHead.forEach {
it.value.use { stmt -> stmt.executeBatch() }
}
insertToHead.forEach {
it.value.executeBatch()
it.value.close()
it.value.use { stmt -> stmt.executeBatch() }
}

}

    /**
     * Prepares an INSERT..SELECT statement that copies a feature's row from the
     * HEAD table into [dstTableName] (HST or DEL), selected by id ($5).
     *
     * Column order contract: the INSERT column list starts with the four
     * overridable columns (txn_next, txn, uid, flags) followed by all remaining
     * writable columns; the SELECT list produces values in exactly that order.
     *
     * Parameter semantics:
     * - $1: new txn_next (always overridden with the session txn by callers)
     * - $2..$4: txn / uid / flags overrides; COALESCE falls back to the HEAD
     *   row's current value when the batch argument is null
     * - $5: the feature id selecting the HEAD row to copy
     */
    private fun createCopyPlan(headTableName: String, dstTableName: String): PgPlan {

        val columnsToOverride = mutableListOf(PgColumn.txn_next, PgColumn.txn, PgColumn.uid, PgColumn.flags)
        val columnsToCopy = PgColumn.allWritableColumns.minus(columnsToOverride.toSet())
        val columns = mutableListOf<PgColumn>()
        columns.addAll(columnsToOverride)
        columns.addAll(columnsToCopy)

        // NOTE(review): joinToString relies on PgColumn.toString() rendering
        // the plain column name (and ${PgColumn.txn} etc. below likewise) —
        // confirm, and confirm that unquoted identifiers are acceptable here,
        // since the WHERE clause uses PgColumn.id.quoted().
        val columnNames = columns.joinToString(separator = ",")
        val copyColumnNames = columnsToCopy.joinToString(separator = ",")

        return session.usePgConnection().prepare(
            sql = """
                INSERT INTO $dstTableName($columnNames)
                SELECT $1,
                COALESCE($2, ${PgColumn.txn}),
                COALESCE($3, ${PgColumn.uid}),
                COALESCE($4, ${PgColumn.flags}),
                $copyColumnNames FROM $headTableName
                WHERE ${PgColumn.id.quoted()} = $5
            """.trimIndent(),
            columns.map { it.type.text }.toTypedArray()
        )
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import naksha.model.*
import naksha.model.request.ReadFeatures
import naksha.model.request.SuccessResponse
import naksha.psql.PgCollection
import naksha.psql.PgColumn
import naksha.psql.PgUtil.PgUtilCompanion.quoteIdent
import naksha.psql.executors.PgReader
import naksha.psql.executors.PgWriter
Expand All @@ -12,8 +13,10 @@ import naksha.psql.executors.write.WriteFeatureUtils.newFeatureTupleNumber
import naksha.psql.executors.write.WriteFeatureUtils.resolveFlags

class DeleteFeature(
writer: PgWriter
) : UpdateFeature(writer) {
writer: PgWriter,
exisingMetadataProvider: ExisingMetadataProvider,
writeExecutor: WriteExecutor
) : UpdateFeature(writer, exisingMetadataProvider, writeExecutor) {
override fun execute(collection: PgCollection, write: WriteExt): TupleNumber {
val featureId = write.featureId ?: throw NakshaException(NakshaError.ILLEGAL_ARGUMENT, "No feature ID provided")

Expand All @@ -29,15 +32,10 @@ class DeleteFeature(
// If hst table enabled
collection.history?.let { hstTable ->
// copy head state into hst with txn_next === txn
insertHeadToTable(
destinationTable = hstTable,
headTable = collection.head,
featureId = featureId
)
writeExecutor.copyHeadToHst(collection = collection, featureId = featureId)
// also copy head state into hst with txn_next === txn and action DELETED as a tombstone state
insertHeadToTable(
destinationTable = hstTable,
headTable = collection.head,
writeExecutor.copyHeadToHst(
collection = collection,
tupleNumber = tupleNumber,
flags = flags,
featureId = featureId,
Expand All @@ -46,9 +44,8 @@ class DeleteFeature(

// If del table enabled, copy head state into del, with action DELETED and txn_next === txn as a tombstone state
collection.deleted?.let { delTable ->
insertHeadToTable(
destinationTable = delTable,
headTable = collection.head,
writeExecutor.copyHeadToDel(
collection = collection,
tupleNumber = tupleNumber,
flags = flags,
featureId = featureId,
Expand All @@ -66,7 +63,7 @@ class DeleteFeature(
val quotedHeadTable = quoteIdent(headTable.name)
session.usePgConnection()
.execute(
sql = "DELETE FROM $quotedHeadTable WHERE $quotedIdColumn=$1",
sql = "DELETE FROM $quotedHeadTable WHERE ${PgColumn.id.quoted()}=$1",
args = arrayOf(featureId)
).close()
}
Expand Down
Loading

0 comments on commit a531b45

Please sign in to comment.