Skip to content

Commit

Permalink
Add queryPage and scanPage to DynamoDb table mapper (#269)
Browse files Browse the repository at this point in the history
* Add queryPage and scanPage operations to DynamoDb table mapper

* changelog

* changelog

* changelog

* fix test fixture

* add additional test coverage
  • Loading branch information
oharaandrew314 authored Jul 15, 2023
1 parent a6fe5d4 commit 5109642
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 19 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
This list is not intended to be all-encompassing - it will document major and breaking API
changes with their rationale when appropriate. Given version `A.B.C.D`, breaking changes are to be expected in version number increments where changes in the `A` or `B` sections:

### v5.1.3.0 (Uncut)
- **http4k-connect-amazon-dynamodb** - Add scanPage and queryPage operations to DynamoDb table mapper. Pagination can now be controlled by the caller. H/T @oharaandrew314
- **http4k-connect-amazon-dynamodb-fake** - [Fix] query and scan will now return the correct LastEvaluatedKey based on the current index. H/T @oharaandrew314

### v5.1.2.0
- **http4k-connect-amazon-eventbridge** - [New module] Adapter and fake implementation.

Expand All @@ -11,7 +15,8 @@ changes with their rationale when appropriate. Given version `A.B.C.D`, breaking

### v5.1.0.0
- **http4k-connect-*** - Upgrade dependencies, including Kotlin to 1.9.0.
- **http4k-connect-amazon-kms-fake** - Vend unique rsa ecdsa keypairs. H/T @oharaandrew314
- **http4k-connect-amazon-kms-fake** - Will now generate unique key pairs for each CMK. H/T @oharaandrew314
- **http4k-connect-amazon-kms-fake** - [Fix] Getting the public key for an ECDSA CMK will now work as expected. H/T @oharaandrew314

### v5.0.1.0
- **http4k-connect-*** - Upgrade dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,30 @@ class DynamoDbIndexMapper<Document: Any, HashKey: Any, SortKey: Any>(
}.map(itemLens)
}

fun scanPage(
filter: String? = null,
names: Map<String, AttributeName>? = null,
values: Map<String, AttributeValue>? = null,
exclusiveStartKey: Pair<HashKey, SortKey?>? = null,
limit: Int? = null
): DynamoDbPage<Document, HashKey, SortKey> {
val page = dynamoDb.scan(
TableName = tableName,
IndexName = schema.indexName,
FilterExpression = filter,
ExpressionAttributeNames = names,
ExpressionAttributeValues = values,
ExclusiveStartKey = exclusiveStartKey?.let { schema.key(exclusiveStartKey.first, exclusiveStartKey.second) },
Limit = limit
).onFailure { it.reason.throwIt() }

return DynamoDbPage(
items = page.items.map(itemLens),
nextHashKey = page.LastEvaluatedKey?.let { schema.hashKeyAttribute[it] },
nextSortKey = page.LastEvaluatedKey?.let { key -> schema.sortKeyAttribute?.let { attr -> attr[key] } }
)
}

fun query(
filter: String? = null,
names: Map<String, AttributeName>? = null,
Expand All @@ -51,4 +75,28 @@ class DynamoDbIndexMapper<Document: Any, HashKey: Any, SortKey: Any>(
scanIndexForward = scanIndexForward
)
}

fun queryPage(
hashKey: HashKey,
scanIndexForward: Boolean = true,
exclusiveStartKey: SortKey? = null,
limit: Int? = null
): DynamoDbPage<Document, HashKey, SortKey> {
val page = dynamoDb.query(
TableName = tableName,
IndexName = schema.indexName,
KeyConditionExpression = "#key1 = :val1",
ExpressionAttributeNames = mapOf("#key1" to schema.hashKeyAttribute.name),
ExpressionAttributeValues = mapOf(":val1" to schema.hashKeyAttribute.asValue(hashKey)),
ScanIndexForward = scanIndexForward,
ExclusiveStartKey = exclusiveStartKey?.let { schema.key(hashKey, exclusiveStartKey) },
Limit = limit
).onFailure { it.reason.throwIt() }

return DynamoDbPage(
items = page.items.map(itemLens),
nextHashKey = page.LastEvaluatedKey?.let { schema.hashKeyAttribute[it] },
nextSortKey = page.LastEvaluatedKey?.let { key -> schema.sortKeyAttribute?.let { attr -> attr[key] } }
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.http4k.connect.amazon.dynamodb.mapper

data class DynamoDbPage<Document: Any, HashKey: Any, SortKey: Any>(
val items: List<Document>,
val nextHashKey: HashKey?,
val nextSortKey: SortKey?
)
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import org.http4k.connect.amazon.dynamodb.model.TableDescription
data class DynamoTable(val table: TableDescription, val items: List<Item> = emptyList(), val maxPageSize: Int = 1_000) {
fun retrieve(key: Key) = items.firstOrNull { it.matches(key) }

fun withItem(item: Item) = retrieve(item.key(table))
fun withItem(item: Item) = retrieve(item.key(table.KeySchema!!))
.let { existing -> if (existing != null) withoutItem(existing) else this }
.let { it.copy(items = it.items + item) }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,7 @@ fun TableDescription.keySchema(indexName: IndexName? = null): List<KeySchema>? {
?.KeySchema
}

fun Item.key(table: TableDescription): Key {
val schema = table.keySchema() ?: error("No key defined!")

fun Item.key(schema: List<KeySchema>): Key {
return schema.mapNotNull { key ->
val value = this[key.AttributeName]
if (value == null) null else key.AttributeName to value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import org.http4k.connect.storage.Storage

fun AmazonJsonFake.query(tables: Storage<DynamoTable>) = route<Query> { query ->
val table = tables[query.TableName.value] ?: return@route null
val schema = table.table.keySchema(query.IndexName)

val comparator = table.table.keySchema(query.IndexName)
.comparator(query.ScanIndexForward ?: true)
val comparator = schema.comparator(query.ScanIndexForward ?: true)

val matches = table.items
.asSequence()
Expand All @@ -33,8 +33,8 @@ fun AmazonJsonFake.query(tables: Storage<DynamoTable>) = route<Query> { query ->
QueryResponse(
Count = page.size,
Items = page.map { it.asItemResult() },
LastEvaluatedKey = if (page.size < matches.size) {
page.lastOrNull()?.key(table.table)
LastEvaluatedKey = if (page.size < matches.size && schema != null) {
page.lastOrNull()?.key(schema)
} else null
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import org.http4k.connect.storage.Storage

fun AmazonJsonFake.scan(tables: Storage<DynamoTable>) = route<Scan> { scan ->
val table = tables[scan.TableName.value] ?: return@route null
val schema = table.table.keySchema(scan.IndexName)

val comparator = table.table.keySchema(scan.IndexName).comparator(true)
val comparator = schema.comparator(true)

val matches = table.items
.asSequence()
Expand All @@ -29,8 +30,8 @@ fun AmazonJsonFake.scan(tables: Storage<DynamoTable>) = route<Scan> { scan ->
ScanResponse(
Count = page.size,
Items = page.map { it.asItemResult() },
LastEvaluatedKey = if (page.size < matches.size) {
page.lastOrNull()?.key(table.table)
LastEvaluatedKey = if (page.size < matches.size && schema != null) {
page.lastOrNull()?.key(schema)
} else null
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,32 @@ abstract class DynamoDbQueryContract: DynamoDbSource {
assertThat(result.LastEvaluatedKey, absent())
}

@Test
fun `query by index - with limit`() {
dynamo.putItem(table, hash1Val1)
dynamo.putItem(table, hash1Val2)
dynamo.putItem(table, hash2Val1)

val result = dynamo.query(
TableName = table,
IndexName = numbersIndex,
KeyConditionExpression = "$attrN = :val1",
ExpressionAttributeValues = mapOf(":val1" to attrN.asValue(1)),
ScanIndexForward = false,
Limit = 1
).successValue()

assertThat(result.Count, equalTo(1))
assertThat(result.items, equalTo(listOf(
hash2Val1,
)))
// ensure next key matches current index
assertThat(result.LastEvaluatedKey, equalTo(mapOf(
attrN.name to hash2Val1[attrN.name]!!,
attrS.name to hash2Val1[attrS.name]!!
)))
}

@Test
fun `query with max results for page`() {
val numItems = 2_000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,17 @@ internal val bandit = Cat(
name = "Bandit",
born = LocalDate.of(2018, 7, 1)
)

internal val kratos = Cat(
ownerId = owner1,
id = UUID.fromString("bf191fc3-b660-477b-8b9f-fc4a856f20af"),
name = "Kratos",
born = LocalDate.of(2022, 9, 4)
)

internal val athena = Cat(
ownerId = owner1,
id = UUID.fromString("fb153ca2-e241-41df-9fe2-8858c6f6d1b0"),
name = "Athena",
born = LocalDate.of(2022, 9, 4)
)
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class DynamoDbTableMapperTest {

init {
tableMapper.createTable(byOwner, byDob)
tableMapper += listOf(toggles, smokie, bandit)
tableMapper += listOf(toggles, smokie, bandit, kratos, athena)
}

private fun table() = storage["cats"]!!
Expand Down Expand Up @@ -70,7 +70,7 @@ class DynamoDbTableMapperTest {
fun `scan table`() {
assertThat(
tableMapper.primaryIndex().scan().toSet(),
equalTo(setOf(toggles, smokie, bandit))
equalTo(setOf(toggles, smokie, bandit, kratos, athena))
)
}

Expand Down Expand Up @@ -104,35 +104,35 @@ class DynamoDbTableMapperTest {
fun `delete item`() {
tableMapper -= toggles

assertThat(table().items, hasSize(equalTo(2)))
assertThat(table().items, hasSize(equalTo(4)))
}

@Test
fun `delete missing item`() {
tableMapper.delete(UUID.randomUUID())

assertThat(table().items, hasSize(equalTo(3)))
assertThat(table().items, hasSize(equalTo(5)))
}

@Test
fun `delete batch`() {
tableMapper -= listOf(smokie, bandit)

assertThat(table().items, hasSize(equalTo(1)))
assertThat(table().items, hasSize(equalTo(3)))
}

@Test
fun `delete batch by ids`() {
tableMapper.batchDelete(smokie.id, bandit.id)

assertThat(table().items, hasSize(equalTo(1)))
assertThat(table().items, hasSize(equalTo(3)))
}

@Test
fun `delete batch by keys`() {
tableMapper.batchDelete(listOf(smokie.id to null, bandit.id to null))

assertThat(table().items, hasSize(equalTo(1)))
assertThat(table().items, hasSize(equalTo(3)))
}

@Test
Expand All @@ -151,6 +151,57 @@ class DynamoDbTableMapperTest {
assertThat(results, equalTo(listOf(smokie, bandit)))
}

@Test
fun `query page`() {
// page 1 of 2
assertThat(tableMapper.index(byOwner).queryPage(owner1, limit = 2), equalTo(DynamoDbPage(
items = listOf(athena, kratos),
nextHashKey = owner1,
nextSortKey = kratos.name
)))

// page 2 of 2
assertThat(tableMapper.index(byOwner).queryPage(owner1, limit = 2, exclusiveStartKey = kratos.name), equalTo(DynamoDbPage(
items = listOf(toggles),
nextHashKey = null,
nextSortKey = null
)))
}

@Test
fun `scan page - secondary index`() {
// page 1 of 1
assertThat(tableMapper.index(byOwner).scanPage(limit = 3), equalTo(DynamoDbPage(
items = listOf(bandit, smokie, athena),
nextHashKey = owner1,
nextSortKey = athena.name
)))

// page 2 of 2
assertThat(tableMapper.index(byOwner).scanPage(limit = 3, exclusiveStartKey = owner1 to athena.name), equalTo(DynamoDbPage(
items = listOf(kratos, toggles),
nextHashKey = null,
nextSortKey = null
)))
}

@Test
fun `scan page - primary index with no sort key`() {
// page 1 of 1
assertThat(tableMapper.primaryIndex().scanPage(limit = 3), equalTo(DynamoDbPage(
items = listOf(smokie, kratos, bandit),
nextHashKey = bandit.id,
nextSortKey = null
)))

// page 2 of 2
assertThat(tableMapper.primaryIndex().scanPage(limit = 3, exclusiveStartKey = bandit.id to null), equalTo(DynamoDbPage(
items = listOf(toggles, athena),
nextHashKey = null,
nextSortKey = null
)))
}

@Test
fun `get empty batch`() {
val batchGetResult = tableMapper.batchGet(emptyList()).toList()
Expand Down

0 comments on commit 5109642

Please sign in to comment.