Skip to content

Commit

Permalink
Merge pull request #17 from commercetools/testkit
Browse files Browse the repository at this point in the history
Add integration tests
  • Loading branch information
satabin authored May 7, 2024
2 parents 55bda1d + 1c16601 commit 71a4b3b
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 3 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ jobs:
if: matrix.java == 'temurin@8' && steps.setup-java-temurin-8.outputs.cache-hit == 'false'
run: sbt +update

- env:
SERVICES: sqs
uses: LocalStack/setup-localstack@main
with:
image-tag: latest

- name: Check that workflows are up to date
run: sbt githubWorkflowCheck

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue.sqs

import cats.effect.{IO, Resource}
import com.commercetools.queue.QueueClient
import com.commercetools.queue.aws.sqs.SQSClient
import com.commercetools.queue.testkit.QueueClientSuite
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider
import software.amazon.awssdk.regions.Region

import java.net.URI

class SqsClientSuite extends QueueClientSuite {

override def client: Resource[IO, QueueClient[IO]] =
SQSClient[IO](
Region.EU_WEST_1,
AnonymousCredentialsProvider.create(),
endpoint = Some(new URI("http://localhost:4566")))

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.commercetools.queue.aws.sqs
import cats.effect.Async
import cats.syntax.functor._
import cats.syntax.monadError._
import cats.syntax.traverse._
import com.commercetools.queue.{QueuePusher, Serializer}
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.{SendMessageBatchRequest, SendMessageBatchRequestEntry, SendMessageRequest}
Expand Down Expand Up @@ -57,11 +58,12 @@ class SQSPusher[F[_], T](
SendMessageBatchRequest
.builder()
.queueUrl(queueUrl)
.entries(messages.map { message =>
.entries(messages.mapWithIndex { (message, idx) =>
SendMessageBatchRequestEntry
.builder()
.messageBody(serializer.serialize(message))
.delaySeconds(delaySeconds)
.id(idx.toString())
.build()
}.asJava)
.build())
Expand Down
33 changes: 31 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ val Scala213 = "2.13.12"
ThisBuild / crossScalaVersions := Seq(Scala213, "3.3.3")
ThisBuild / scalaVersion := Scala213

lazy val root = tlCrossRootProject.aggregate(core, azureServiceBus, awsSQS, circe, otel4s, unidocs)
lazy val root = tlCrossRootProject.aggregate(core, azureServiceBus, awsSQS, awsSqsIt, circe, otel4s, unidocs)

ThisBuild / tlSitePublishBranch := Some("main")

Expand All @@ -42,6 +42,29 @@ lazy val core = crossProject(JVMPlatform)
name := "fs2-queues-core"
)

lazy val testkit = crossProject(JVMPlatform)
.crossType(CrossType.Pure)
.in(file("testkit"))
.enablePlugins(NoPublishPlugin)
.settings(commonSettings)
.settings(
name := "fs2-queues-testkit",
libraryDependencies ++= List(
"org.scalameta" %%% "munit" % Versions.munit,
"org.typelevel" %%% "munit-cats-effect-3" % Versions.munitCatsEffect
)
)
.dependsOn(core)

// for sqs integration test, start a localstack with sqs
ThisBuild / githubWorkflowBuildPreamble := List(
WorkflowStep.Use(
UseRef.Public(owner = "LocalStack", repo = "setup-localstack", ref = "main"),
params = Map("image-tag" -> "latest"),
env = Map("SERVICES" -> "sqs")
)
)

lazy val otel4s = crossProject(JVMPlatform)
.crossType(CrossType.Pure)
.in(file("otel4s"))
Expand Down Expand Up @@ -80,7 +103,7 @@ lazy val azureServiceBus = crossProject(JVMPlatform)
"com.azure" % "azure-messaging-servicebus" % "7.15.1"
)
)
.dependsOn(core)
.dependsOn(core, testkit % Test)

lazy val awsSQS = crossProject(JVMPlatform)
.crossType(CrossType.Pure)
Expand All @@ -95,6 +118,12 @@ lazy val awsSQS = crossProject(JVMPlatform)
)
.dependsOn(core)

lazy val awsSqsIt = project
.in(file("aws/sqs/integration"))
.enablePlugins(NoPublishPlugin)
.settings(commonSettings)
.dependsOn(awsSQS.jvm % Test, testkit.jvm % Test)

lazy val docs = project
.in(file("site"))
.enablePlugins(TypelevelSitePlugin)
Expand Down
10 changes: 10 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
version: "3.8"

services:
localstack:
container_name: sqs
image: localstack/localstack
ports:
- "127.0.0.1:4566:4566"
environment:
- SERVICES=sqs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.commercetools.queue.testkit

import cats.effect.std.Random
import cats.effect.{IO, Ref, Resource}
import com.commercetools.queue.QueueClient
import fs2.Stream
import munit.CatsEffectSuite

import scala.concurrent.duration._

/**
* This suite tests that the basic features of a [[com.commercetools.queue.QueueClient QueueClient]] are properly
* implemented for a concrete client.
* This is used in integration tests for the various implemented queue providers.
*/
abstract class QueueClientSuite extends CatsEffectSuite {

/** Provide a way to acquire a queue client for the provider under test. */
def client: Resource[IO, QueueClient[IO]]

val clientFixture = ResourceSuiteLocalFixture("queue-client", client)

override def munitFixtures = List(clientFixture)

val withQueue =
ResourceFixture(
Resource.make(
IO.randomUUID
.map(uuid => s"queue-$uuid")
.flatTap { queueName =>
clientFixture().administration
.create(queueName, 10.minutes, 2.minutes)
})(queueName => clientFixture().administration.delete(queueName)))

withQueue.test("published messages are received by a processor") { queueName =>
for {
random <- Random.scalaUtilRandom[IO]
size <- random.nextLongBounded(30L)
messages = List.range(0L, size).map(_.toString())
received <- Ref[IO].of(List.empty[String])
client = clientFixture()
_ <- Stream
.emits(messages)
.through(client.publish(queueName).sink(batchSize = 10))
.merge(
client
.subscribe(queueName)
.processWithAutoAck(batchSize = 10, waitingTime = 20.seconds)(msg => received.update(msg.payload :: _))
.take(size)
)
.compile
.drain
_ <- assertIO(received.get.map(_.toSet), messages.toSet)
} yield ()
}

}

0 comments on commit 71a4b3b

Please sign in to comment.