Skip to content

Commit

Permalink
More extensive specs
Browse files Browse the repository at this point in the history
  • Loading branch information
AL333Z committed Sep 20, 2024
1 parent 039cf89 commit f2efc4e
Show file tree
Hide file tree
Showing 11 changed files with 495 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,43 @@
package com.commercetools.queue.sqs

import cats.effect.{IO, Resource}
import cats.syntax.all._
import com.commercetools.queue.QueueClient
import com.commercetools.queue.aws.sqs.SQSClient
import com.commercetools.queue.testkit.QueueClientSuite
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider
import software.amazon.awssdk.auth.credentials.{AnonymousCredentialsProvider, AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.regions.Region

import java.net.URI
import scala.jdk.CollectionConverters.CollectionHasAsScala

class SqsClientSuite extends QueueClientSuite {

private def config =
booleanOrDefault("AWS_SQS_USE_EMULATOR", default = true).ifM(
ifTrue =
IO.pure((Region.EU_WEST_1, AnonymousCredentialsProvider.create(), Some(new URI("http://localhost:4566")))),
ifFalse = for {
awsRegion <- string("AWS_SQS_REGION")
region <- Region
.regions()
.asScala
.find(_.id == awsRegion)
.liftTo[IO](new IllegalArgumentException(s"Cannot find any suitable AWS region from $awsRegion value!"))
credentials <- (string("AWS_SQS_ACCESS_KEY"), string("AWS_SQS_ACCESS_SECRET")).mapN((accessKey, accessSecret) =>
StaticCredentialsProvider.create(
AwsBasicCredentials.create(accessKey, accessSecret)
))
} yield (region, credentials, None)
)

override def client: Resource[IO, QueueClient[IO]] =
SQSClient[IO](
Region.EU_WEST_1,
AnonymousCredentialsProvider.create(),
endpoint = Some(new URI("http://localhost:4566")))
config.toResource.flatMap { case (region, credentials, endpoint) =>
SQSClient[IO](
region,
credentials,
endpoint = endpoint
)
}

}
9 changes: 9 additions & 0 deletions azure/service-bus/integration/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# How to run tests

Tests are using [AzureCliCredential](https://learn.microsoft.com/en-us/java/api/com.azure.identity.azureclicredential?view=azure-java-stable).
Make sure to be assigned with the [Azure Service Bus Data Owner](https://learn.microsoft.com/en-us/azure/role-based-access-control/built-in-roles/integration#azure-service-bus-data-owner) role.

Steps:
- `az login`
- `export AZURE_SERVICEBUS_HOSTNAME=<your-servicebus-hostname>`
- `sbt "project azureServiceBusIt" test`
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue.servicebus

import cats.effect.{IO, Resource}
import com.azure.identity.AzureCliCredentialBuilder
import com.commercetools.queue.QueueClient
import com.commercetools.queue.azure.servicebus.ServiceBusClient
import com.commercetools.queue.testkit.QueueClientSuite

class ServiceBusClientSuite extends QueueClientSuite {

private def config = string("AZURE_SERVICEBUS_HOSTNAME")
override val inFlightMessagesStatsSupported: Boolean = false

override def client: Resource[IO, QueueClient[IO]] =
config.toResource.flatMap { namespace =>
ServiceBusClient[IO](
namespace = namespace,
credentials = new AzureCliCredentialBuilder().build()
)
}
}
14 changes: 13 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ lazy val testkit = crossProject(JVMPlatform)
name := "fs2-queues-testkit",
libraryDependencies ++= List(
"org.scalameta" %%% "munit" % Versions.munit,
"org.typelevel" %%% "munit-cats-effect-3" % Versions.munitCatsEffect
"org.typelevel" %%% "munit-cats-effect-3" % Versions.munitCatsEffect,
"org.slf4j" % "slf4j-simple" % "2.0.16"
)
)
.dependsOn(core)
Expand Down Expand Up @@ -138,6 +139,17 @@ lazy val azureServiceBus = crossProject(JVMPlatform)
)
.dependsOn(core, testkit % Test)

lazy val azureServiceBusIt = project
.in(file("azure/service-bus/integration"))
.enablePlugins(NoPublishPlugin)
.settings(commonSettings)
.settings(
libraryDependencies ++= List(
"com.azure" % "azure-identity" % "1.11.1"
)
)
.dependsOn(azureServiceBus.jvm % Test, testkit.jvm % Test)

lazy val awsSQS = crossProject(JVMPlatform)
.crossType(CrossType.Pure)
.in(file("aws/sqs"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package com.commercetools.queue
import cats.effect.{MonadCancel, Resource}
import fs2.Stream

import scala.concurrent.duration.FiniteDuration

/**
* The interface to publish to a queue.
*/
Expand All @@ -28,7 +30,7 @@ sealed abstract class QueuePublisher[F[_], T](implicit F: MonadCancel[F, Throwab
def queueName: String

/**
* Returns a way to bush messages into the queue.
* Returns a way to push messages into the queue.
* This is a low-level construct, mainly aiming at integrating existing
* code bases that require to push explicitly.
*
Expand All @@ -41,10 +43,11 @@ sealed abstract class QueuePublisher[F[_], T](implicit F: MonadCancel[F, Throwab
* produced data to the queue. The messages are published in batches, according
* to the `batchSize` parameter.
*/
def sink(batchSize: Int = 10)(upstream: Stream[F, (T, Map[String, String])]): Stream[F, Nothing] =
def sink(batchSize: Int = 10, delay: Option[FiniteDuration] = None)(upstream: Stream[F, (T, Map[String, String])])
: Stream[F, Nothing] =
Stream.resource(pusher).flatMap { pusher =>
upstream.chunkN(batchSize).foreach { chunk =>
pusher.push(chunk.toList, None)
pusher.push(chunk.toList, delay)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,21 @@ import com.google.api.gax.core.NoCredentialsProvider
class PubSubClientSuite extends QueueClientSuite {

override val queueUpdateSupported = false
override val inFlightMessagesStatsSupported: Boolean = false
override val delayedMessagesStatsSupported: Boolean = false

private def config =
booleanOrDefault("GCP_PUBSUB_USE_EMULATOR", default = true).ifM(
ifTrue = IO.pure(("test-project", NoCredentialsProvider.create(), Some("http://localhost:8042"))),
ifFalse = for {
project <- string("GCP_PUBSUB_PROJECT")
credentials = NoCredentialsProvider.create() // TODO
} yield (project, credentials, None)
)

override def client: Resource[IO, QueueClient[IO]] =
PubSubClient("test-project", NoCredentialsProvider.create(), endpoint = Some("http://localhost:8042"))
config.toResource.flatMap { case (project, credentials, endpoint) =>
PubSubClient(project, credentials, endpoint = endpoint)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.commercetools.queue.testkit

import com.commercetools.queue.QueueConfiguration
import munit.CatsEffectSuite

import scala.concurrent.duration._

/**
* This suite tests that the features of a [[com.commercetools.queue.QueueAdministration QueueAdministration]] are properly
* implemented for a concrete client.
*/
trait QueueAdministrationSuite extends CatsEffectSuite { self: QueueClientSuite =>

withQueue.test("existing queue should be indicated as such") { queueName =>
val client = clientFixture()
assertIO(client.administration.exists(queueName), true)
}

test("non existing queue should be indicated as such") {
val client = clientFixture()
assertIO(client.administration.exists("not-existing"), false)
}

withQueue.test("get configuration") { queueName =>
val admin = clientFixture().administration
assertIO(admin.configuration(queueName), QueueConfiguration(originalMessageTTL, originalLockTTL))
}

withQueue.test("configuration can be updated") { queueName =>
assume(queueUpdateSupported, "The test environment does not support queue update")
val admin = clientFixture().administration
for {
_ <- admin.update(queueName, Some(originalMessageTTL + 1.minute), Some(originalLockTTL + 10.seconds))
_ <- assertIO(
admin.configuration(queueName),
QueueConfiguration(originalMessageTTL + 1.minute, originalLockTTL + 10.seconds))
} yield ()
}

}
Loading

0 comments on commit f2efc4e

Please sign in to comment.