Skip to content

Commit

Permalink
partitioning test
Browse files Browse the repository at this point in the history
  • Loading branch information
cyberhead-pl committed Sep 27, 2024
1 parent 400303b commit fd79b75
Showing 1 changed file with 140 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package naksha.psql

import naksha.base.Epoch
import naksha.model.SessionOptions
import naksha.model.objects.NakshaCollection
import naksha.model.objects.NakshaFeature
import naksha.model.request.SuccessResponse
import naksha.model.request.Write
import naksha.model.request.WriteRequest
import naksha.psql.base.PgTestBase
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFails
import kotlin.test.assertTrue

class PartitioningTest : PgTestBase() {

@Test
fun createCollectionWithPartitions() {
// given
val numberOfPartitions = 8
val partitionedCollection = NakshaCollection(
id = "feature_partitioned",
partitions = numberOfPartitions
)
val writeOp = Write().createCollection(map = null, collection = partitionedCollection)
val writeRequest = WriteRequest().add(writeOp)

// when
storage.newWriteSession().use { session ->
session.execute(writeRequest)
session.commit()
}

// then
val createdPartitions = queryForTablePartitions(partitionedCollection.id)
assertEquals(numberOfPartitions, createdPartitions.size)
for ((idx, createdPartition) in createdPartitions.withIndex()) {
// "feature_partitioned$p000", "feature_partitioned$p001",...
val expectedPartitionTableName = "\"feature_partitioned\$p${PgUtil.partitionPosix(idx)}\""
assertEquals(expectedPartitionTableName, createdPartition)
}

// also: check history partitioning
val hstTable = "${partitionedCollection.id}${PG_HST}"
val createdHstPartitions = queryForTablePartitions(hstTable)
// first current year partition: like "feature_partitioned$hst$y2025"
assertEquals("\"feature_partitioned\$hst\$y${Epoch().year}\"", createdHstPartitions[0])
// next year
assertEquals("\"feature_partitioned\$hst\$y${Epoch().year+1}\"", createdHstPartitions[1])
for (hstPartition in createdHstPartitions) {
val rawHstName = hstPartition.replace("\"", "")
val createdHstSubPartitions = queryForTablePartitions(rawHstName)
assertEquals(numberOfPartitions, createdHstSubPartitions.size)
for ((idx, createdPartition) in createdHstSubPartitions.withIndex()) {
// "feature_partitioned$hst$y2025$p001", ...
val expectedPartitionTableName = "\"$rawHstName\$p${PgUtil.partitionPosix(idx)}\""
assertEquals(expectedPartitionTableName, createdPartition)
}
}
}

@Test
fun shouldInsertToSpecificPartition() {
// given
val numberOfPartitions = 2
val partitionedCollection = NakshaCollection(
id = "feature_partitioned_insert_check",
partitions = numberOfPartitions
)
val writeOp = Write().createCollection(map = null, collection = partitionedCollection)
val writeRequest = WriteRequest().add(writeOp)
storage.newWriteSession().use { session ->
session.execute(writeRequest)
session.commit()
}

// when
val writeFeatureOp = Write().createFeature(null, partitionedCollection.id, NakshaFeature("f1"))
val writeFeatureRequest = WriteRequest().add(writeFeatureOp)
storage.newWriteSession().use { session ->
val result = session.execute(writeFeatureRequest)
session.commit()

// then
// feature should be successfully stored
assertTrue { result is SuccessResponse }
assertEquals(1, (result as SuccessResponse).tuples.size)
}
}

@Test
fun shouldNotAllowZeroPartitions() {
// given
val numberOfPartitions = 0
val partitionedCollection = NakshaCollection(
id = "zero_partitions",
partitions = numberOfPartitions
)
val writeOp = Write().createCollection(map = null, collection = partitionedCollection)
val writeRequest = WriteRequest().add(writeOp)

// when
storage.newWriteSession().use { session ->
// expect
assertFails("Invalid amount of partitions requested, must be 1 to 256, was: 0") { session.execute(writeRequest) }
}
}

@Test
fun shouldNotAllowMoreThan256Partitions() {
// given
val numberOfPartitions = 257
val partitionedCollection = NakshaCollection(
id = "to_many_partitions",
partitions = numberOfPartitions
)
val writeOp = Write().createCollection(map = null, collection = partitionedCollection)
val writeRequest = WriteRequest().add(writeOp)

// when
storage.newWriteSession().use { session ->
// expect
assertFails("Invalid amount of partitions requested, must be 1 to 256, was: 0") { session.execute(writeRequest) }
}
}

private fun queryForTablePartitions(table: String): List<String> {
val pgConnection = storage.newConnection(SessionOptions.from(null), true)
val cursor = pgConnection.execute(
"SELECT inhrelid::regclass AS partitioned_table FROM pg_inherits WHERE inhparent = $1::regclass order by partitioned_table",
arrayOf(table)
)
val result = mutableListOf<String>()
while (cursor.next()) {
result.add(cursor.column("partitioned_table").toString())
}
return result
}
}

0 comments on commit fd79b75

Please sign in to comment.