Skip to content

Commit

Permalink
add support for amazon sqs DeleteMessageBatch action
Browse files Browse the repository at this point in the history
  • Loading branch information
oharaandrew314 committed Jul 22, 2023
1 parent 1fac282 commit 75401b6
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.http4k.connect.amazon.sqs.action

import dev.forkhandles.result4k.Failure
import dev.forkhandles.result4k.Result4k
import dev.forkhandles.result4k.Success
import org.http4k.connect.Http4kConnectAction
import org.http4k.connect.RemoteFailure
import org.http4k.connect.amazon.core.firstChildText
import org.http4k.connect.amazon.core.xmlDoc
import org.http4k.connect.amazon.sqs.SQSAction
import org.http4k.connect.amazon.sqs.model.ReceiptHandle
import org.http4k.connect.amazon.sqs.model.SQSMessageId
import org.http4k.connect.asRemoteFailure
import org.http4k.core.Response
import org.http4k.core.Uri

@Http4kConnectAction
data class DeleteMessageBatch(
val queueUrl: Uri,
val entries: List<DeleteMessageBatchEntry>,
) : SQSAction<List<SQSMessageId>>(
"DeleteMessageBatch",
*entries.withIndex().flatMap { (index, entry) -> entry.toMappings(index + 1) }.toTypedArray(),
"QueueUrl" to queueUrl.toString()
) {
override fun toResult(response: Response): Result4k<List<SQSMessageId>, RemoteFailure> = with(response) {
when {
status.successful -> {
val entries = xmlDoc().getElementsByTagName("DeleteMessageBatchResultEntry")
(0 until entries.length)
.map { entries.item(it) }
.map { SQSMessageId.of(it.firstChildText("Id")!!) }
.let { Success(it) }
}
else -> Failure(asRemoteFailure(this))
}
}
}

typealias DeleteMessageBatchEntry = Pair<SQSMessageId, ReceiptHandle>

private fun DeleteMessageBatchEntry.toMappings(index: Int) = listOf(
"DeleteMessageBatchRequestEntry.$index.Id" to first.value,
"DeleteMessageBatchRequestEntry.$index.ReceiptHandle" to second.value
)
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package org.http4k.connect.amazon.sqs

import com.natpryce.hamkrest.assertion.assertThat
import com.natpryce.hamkrest.equalTo
import com.natpryce.hamkrest.hasSize
import org.http4k.connect.amazon.AwsContract
import org.http4k.connect.amazon.core.model.DataType
import org.http4k.connect.amazon.core.model.Tag
import org.http4k.connect.amazon.sqs.model.MessageAttribute
import org.http4k.connect.amazon.sqs.model.MessageSystemAttribute
import org.http4k.connect.amazon.sqs.model.QueueName
import org.http4k.connect.amazon.sqs.model.ReceiptHandle
import org.http4k.connect.amazon.sqs.model.SQSMessageId
import org.http4k.connect.model.Base64Blob
import org.http4k.connect.successValue
import org.http4k.core.HttpHandler
Expand Down Expand Up @@ -99,4 +102,34 @@ abstract class SQSContract(http: HttpHandler) : AwsContract() {
}

open fun waitABit() {}

@Test
fun `delete batch`() {
val created = sqs.createQueue(queueName, emptyList(), emptyMap()).successValue()
try {
sqs.sendMessage(created.QueueUrl, "foo").successValue()
sqs.sendMessage(created.QueueUrl, "bar").successValue()
sqs.sendMessage(created.QueueUrl, "baz").successValue()

val (message1, message2) = sqs.receiveMessage(queueUrl = created.QueueUrl, maxNumberOfMessages = 2)
.successValue()

// delete batch
val result = sqs.deleteMessageBatch(
queueUrl = created.QueueUrl,
entries = listOf(
message1.messageId to message1.receiptHandle,
message2.messageId to message2.receiptHandle,
SQSMessageId.of("1337") to ReceiptHandle.of("sdckjdsklfjdsf")
)
).successValue()
assertThat(result, equalTo(listOf(message1.messageId, message2.messageId)))

// ensure messages deleted
assertThat(sqs.receiveMessage(created.QueueUrl).successValue(), hasSize(equalTo(1)))

} finally {
sqs.deleteQueue(created.QueueUrl).successValue()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class FakeSQS(
"/" bind POST to routes(
deleteMessage(queues),
deleteQueue(queues),
deleteMessageBatch(queues),
receiveMessage(queues),
createQueue(queues, awsAccount),
getQueueAttributes(queues),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,36 @@ fun deleteMessage(queues: Storage<List<SQSMessage>>) = { r: Request -> r.form("A
?: Response(BAD_REQUEST).body("Queue named $queue not found")
}

fun deleteMessageBatch(queues: Storage<List<SQSMessage>>) = { r: Request -> r.form("Action") == "DeleteMessageBatch" }
.asRouter().bind fn@{ req: Request ->
val queueName = req.form("QueueUrl")!!.queueName()
val queue = queues[queueName] ?: return@fn Response(BAD_REQUEST).body("Queue named $queueName not found")

val messages = (1 until Int.MAX_VALUE)
.asSequence()
.map { index ->
val id = req
.form("DeleteMessageBatchRequestEntry.$index.Id")?.let(SQSMessageId::of)
?: return@map null
val handle = req
.form("DeleteMessageBatchRequestEntry.$index.ReceiptHandle")?.let(ReceiptHandle::of)
?: return@map null

id to handle
}
.takeWhile { it != null }
.filterNotNull()
.mapNotNull { (id, handle) -> queue.find { it.messageId == id && it.receiptHandle == handle } }
.toSet()

queues[queueName] = queue - messages

val result = DeleteMessageBatchResponse(
entries = messages.map { DeleteMessageBatchResultEntry(it.messageId) }
)
Response(OK).with(viewModelLens of result)
}

val viewModelLens by lazy {
Body.viewModel(HandlebarsTemplates().CachingClasspath(), ContentType.APPLICATION_XML).toLens()
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ object DeleteQueueResponse : ViewModel

object DeleteMessageResponse : ViewModel

data class DeleteMessageBatchResultEntry(val id: SQSMessageId)
data class DeleteMessageBatchResponse(val entries: List<DeleteMessageBatchResultEntry>): ViewModel

fun SQSMessage.md5OfBody() = body.md5()

fun String.md5() = BigInteger(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?xml version="1.0"?>
<DeleteMessageBatchResponse xmlns="http://queue.amazonaws.com/doc/2012-11-05/">
<DeleteMessageBatchResult>
{{#each entries}}
<DeleteMessageBatchResultEntry>
<Id>{{id}}</Id>
</DeleteMessageBatchResultEntry>
{{/each}}
</DeleteMessageBatchResult>
<ResponseMetadata>
<RequestId>11111111-1111-1111-1111-111111111111</RequestId>
</ResponseMetadata>
</DeleteMessageBatchResponse>

0 comments on commit 75401b6

Please sign in to comment.