Skip to content

Commit

Permalink
Merge branch 'main' into update/google-cloud-monitoring-3.50.0
Browse files Browse the repository at this point in the history
  • Loading branch information
AL333Z authored Sep 23, 2024
2 parents 6e0ef6f + 3e2ea67 commit 6728d07
Show file tree
Hide file tree
Showing 22 changed files with 588 additions and 227 deletions.
12 changes: 12 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ jobs:
runs-on: ${{ matrix.os }}
timeout-minutes: 60
steps:
- name: Install sbt
if: contains(runner.os, 'macos')
run: brew install sbt

- name: Checkout current branch (full)
uses: actions/checkout@v4
with:
Expand Down Expand Up @@ -114,6 +118,10 @@ jobs:
java: [temurin@8]
runs-on: ${{ matrix.os }}
steps:
- name: Install sbt
if: contains(runner.os, 'macos')
run: brew install sbt

- name: Checkout current branch (full)
uses: actions/checkout@v4
with:
Expand Down Expand Up @@ -184,6 +192,10 @@ jobs:
java: [temurin@11]
runs-on: ${{ matrix.os }}
steps:
- name: Install sbt
if: contains(runner.os, 'macos')
run: brew install sbt

- name: Checkout current branch (full)
uses: actions/checkout@v4
with:
Expand Down
2 changes: 1 addition & 1 deletion .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version = "3.7.17"
version = "3.8.3"
runner.dialect = scala213source3

maxColumn = 120
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,43 @@
package com.commercetools.queue.sqs

import cats.effect.{IO, Resource}
import cats.syntax.all._
import com.commercetools.queue.QueueClient
import com.commercetools.queue.aws.sqs.SQSClient
import com.commercetools.queue.testkit.QueueClientSuite
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider
import software.amazon.awssdk.auth.credentials.{AnonymousCredentialsProvider, AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.regions.Region

import java.net.URI
import scala.jdk.CollectionConverters.CollectionHasAsScala

class SqsClientSuite extends QueueClientSuite {

private def config =
booleanOrDefault("AWS_SQS_USE_EMULATOR", default = true).ifM(
ifTrue =
IO.pure((Region.EU_WEST_1, AnonymousCredentialsProvider.create(), Some(new URI("http://localhost:4566")))),
ifFalse = for {
awsRegion <- string("AWS_SQS_REGION")
region <- Region
.regions()
.asScala
.find(_.id == awsRegion)
.liftTo[IO](new IllegalArgumentException(s"Cannot find any suitable AWS region from $awsRegion value!"))
credentials <- (string("AWS_SQS_ACCESS_KEY"), string("AWS_SQS_ACCESS_SECRET")).mapN((accessKey, accessSecret) =>
StaticCredentialsProvider.create(
AwsBasicCredentials.create(accessKey, accessSecret)
))
} yield (region, credentials, None)
)

override def client: Resource[IO, QueueClient[IO]] =
SQSClient[IO](
Region.EU_WEST_1,
AnonymousCredentialsProvider.create(),
endpoint = Some(new URI("http://localhost:4566")))
config.toResource.flatMap { case (region, credentials, endpoint) =>
SQSClient[IO](
region,
credentials,
endpoint = endpoint
)
}

}
9 changes: 9 additions & 0 deletions azure/service-bus/integration/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# How to run tests

Tests are using [AzureCliCredential](https://learn.microsoft.com/en-us/java/api/com.azure.identity.azureclicredential?view=azure-java-stable).
Make sure to be assigned with the [Azure Service Bus Data Owner](https://learn.microsoft.com/en-us/azure/role-based-access-control/built-in-roles/integration#azure-service-bus-data-owner) role.

Steps:
- `az login`
- `export AZURE_SERVICEBUS_HOSTNAME=<your-servicebus-hostname>`
- `sbt "project azureServiceBusIt" test`
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue.servicebus

import cats.effect.{IO, Resource}
import com.azure.identity.AzureCliCredentialBuilder
import com.commercetools.queue.QueueClient
import com.commercetools.queue.azure.servicebus.ServiceBusClient
import com.commercetools.queue.testkit.QueueClientSuite

class ServiceBusClientSuite extends QueueClientSuite {

private def config = string("AZURE_SERVICEBUS_HOSTNAME")
override val inFlightMessagesStatsSupported: Boolean = false // not supported

override def client: Resource[IO, QueueClient[IO]] =
config.toResource.flatMap { namespace =>
ServiceBusClient[IO](
namespace = namespace,
credentials = new AzureCliCredentialBuilder().build()
)
}
}
34 changes: 29 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ val commonSettings = List(
libraryDependencies ++= Seq(
"org.scalameta" %%% "munit" % Versions.munit % Test,
"org.typelevel" %%% "munit-cats-effect-3" % Versions.munitCatsEffect % Test,
"org.typelevel" %%% "cats-effect-testkit" % "3.5.3" % Test
"org.typelevel" %%% "cats-effect-testkit" % "3.5.4" % Test
),
scalacOptions += (scalaVersion.value match {
case Scala213 => "-Wunused"
Expand Down Expand Up @@ -84,7 +84,8 @@ lazy val testkit = crossProject(JVMPlatform)
name := "fs2-queues-testkit",
libraryDependencies ++= List(
"org.scalameta" %%% "munit" % Versions.munit,
"org.typelevel" %%% "munit-cats-effect-3" % Versions.munitCatsEffect
"org.typelevel" %%% "munit-cats-effect-3" % Versions.munitCatsEffect,
"org.slf4j" % "slf4j-simple" % "2.0.16"
)
)
.dependsOn(core)
Expand Down Expand Up @@ -137,19 +138,30 @@ lazy val azureServiceBus = crossProject(JVMPlatform)
.settings(
name := "fs2-queues-azure-service-bus",
libraryDependencies ++= List(
"com.azure" % "azure-messaging-servicebus" % "7.17.0"
"com.azure" % "azure-messaging-servicebus" % "7.17.3"
)
)
.dependsOn(core, testkit % Test)

lazy val azureServiceBusIt = project
.in(file("azure/service-bus/integration"))
.enablePlugins(NoPublishPlugin)
.settings(commonSettings)
.settings(
libraryDependencies ++= List(
"com.azure" % "azure-identity" % "1.11.1"
)
)
.dependsOn(azureServiceBus.jvm % Test, testkit.jvm % Test)

lazy val awsSQS = crossProject(JVMPlatform)
.crossType(CrossType.Pure)
.in(file("aws/sqs"))
.settings(commonSettings)
.settings(
name := "fs2-queues-aws-sqs",
libraryDependencies ++= List(
"software.amazon.awssdk" % "sqs" % "2.25.50"
"software.amazon.awssdk" % "sqs" % "2.25.70"
)
)
.dependsOn(core)
Expand All @@ -166,8 +178,20 @@ lazy val gcpPubSub = crossProject(JVMPlatform)
.settings(commonSettings)
.settings(
name := "fs2-queues-gcp-pubsub",
// TODO: Remove once next version is published
mimaBinaryIssueFilters ++= List(
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.commercetools.queue.gcp.pubsub.PubSubAdministration.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.gcp.pubsub.PubSubClient.unmanaged"),
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.gcp.pubsub.PubSubClient.this"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.commercetools.queue.gcp.pubsub.PubSubClient.unmanaged$default$5"),
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.gcp.pubsub.PubSubPublisher.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.gcp.pubsub.PubSubPuller.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.gcp.pubsub.PubSubSubscriber.this")
),
libraryDependencies ++= List(
"com.google.cloud" % "google-cloud-pubsub" % "1.129.3",
"com.google.cloud" % "google-cloud-pubsub" % "1.129.7",
"com.google.cloud" % "google-cloud-monitoring" % "3.50.0"
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ sealed abstract class QueuePublisher[F[_], T](implicit F: MonadCancel[F, Throwab
def queueName: String

/**
* Returns a way to bush messages into the queue.
* Returns a way to push messages into the queue.
* This is a low-level construct, mainly aiming at integrating existing
* code bases that require to push explicitly.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,27 @@ import com.google.api.gax.core.NoCredentialsProvider

class PubSubClientSuite extends QueueClientSuite {

override val queueUpdateSupported = false
private def isEmulatorDefault = true
private def isEmulatorEnvVar = "GCP_PUBSUB_USE_EMULATOR"

override val queueUpdateSupported: Boolean = false // not supported
override val inFlightMessagesStatsSupported: Boolean = false // not supported
override val delayedMessagesStatsSupported: Boolean = false // not supported
override val messagesStatsSupported: Boolean = // // not supported in the emulator
!sys.env.get(isEmulatorEnvVar).map(_.toBoolean).getOrElse(isEmulatorDefault)

private def config =
booleanOrDefault(isEmulatorEnvVar, default = isEmulatorDefault).ifM(
ifTrue = IO.pure(("test-project", NoCredentialsProvider.create(), Some("localhost:8042"))),
ifFalse = for {
project <- string("GCP_PUBSUB_PROJECT")
credentials = NoCredentialsProvider.create() // TODO
} yield (project, credentials, None)
)

override def client: Resource[IO, QueueClient[IO]] =
PubSubClient("test-project", NoCredentialsProvider.create(), endpoint = Some("http://localhost:8042"))
config.toResource.flatMap { case (project, credentials, endpoint) =>
PubSubClient(project, credentials, endpoint = endpoint)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import com.google.pubsub.v1.{DeleteSubscriptionRequest, DeleteTopicRequest, Expi
import scala.concurrent.duration._

private class PubSubAdministration[F[_]](
useGrpc: Boolean,
project: String,
channelProvider: TransportChannelProvider,
credentials: CredentialsProvider,
Expand All @@ -38,26 +37,20 @@ private class PubSubAdministration[F[_]](

private val adminClient = Resource.fromAutoCloseable(F.delay {
val builder =
if (useGrpc)
TopicAdminSettings.newBuilder()
else
TopicAdminSettings.newHttpJsonBuilder()
builder
.setCredentialsProvider(credentials)
.setTransportChannelProvider(channelProvider)
TopicAdminSettings
.newBuilder()
.setCredentialsProvider(credentials)
.setTransportChannelProvider(channelProvider)
endpoint.foreach(builder.setEndpoint(_))
TopicAdminClient.create(builder.build())
})

private val subscriptionClient = Resource.fromAutoCloseable(F.delay {
val builder =
if (useGrpc)
SubscriptionAdminSettings.newBuilder()
else
SubscriptionAdminSettings.newHttpJsonBuilder()
builder
.setCredentialsProvider(credentials)
.setTransportChannelProvider(channelProvider)
SubscriptionAdminSettings
.newBuilder()
.setCredentialsProvider(credentials)
.setTransportChannelProvider(channelProvider)
endpoint.foreach(builder.setEndpoint(_))
SubscriptionAdminClient.create(builder.build())
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,32 @@
package com.commercetools.queue.gcp.pubsub

import cats.effect.{Async, Resource}
import com.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueStatistics, QueueSubscriber, Serializer, UnsealedQueueClient}
import com.commercetools.queue._
import com.google.api.gax.core.CredentialsProvider
import com.google.api.gax.httpjson.{HttpJsonTransportChannel, ManagedHttpJsonChannel}
import com.google.api.gax.grpc.GrpcTransportChannel
import com.google.api.gax.rpc.{FixedTransportChannelProvider, TransportChannelProvider}
import com.google.pubsub.v1.{SubscriptionName, TopicName}
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder

private class PubSubClient[F[_]: Async] private (
project: String,
channelProvider: TransportChannelProvider,
useGrpc: Boolean,
credentials: CredentialsProvider,
endpoint: Option[String])
extends UnsealedQueueClient[F] {

override def administration: QueueAdministration[F] =
new PubSubAdministration[F](useGrpc, project, channelProvider, credentials, endpoint)
new PubSubAdministration[F](project, channelProvider, credentials, endpoint)

override def statistics(name: String): QueueStatistics[F] =
new PubSubStatistics(name, SubscriptionName.of(project, s"fs2-queue-$name"), channelProvider, credentials, endpoint)

override def publish[T: Serializer](name: String): QueuePublisher[F, T] =
new PubSubPublisher[F, T](name, useGrpc, TopicName.of(project, name), channelProvider, credentials, endpoint)
new PubSubPublisher[F, T](name, TopicName.of(project, name), channelProvider, credentials, endpoint)

override def subscribe[T: Deserializer](name: String): QueueSubscriber[F, T] =
new PubSubSubscriber[F, T](
name,
useGrpc,
SubscriptionName.of(project, s"fs2-queue-$name"),
channelProvider,
credentials,
Expand All @@ -53,31 +52,34 @@ private class PubSubClient[F[_]: Async] private (

object PubSubClient {

private def makeDefaultTransportChannel(endpoint: Option[String]): HttpJsonTransportChannel =
HttpJsonTransportChannel.create(
ManagedHttpJsonChannel.newBuilder().setEndpoint(endpoint.getOrElse("https://pubsub.googleapis.com")).build())
private def makeDefaultTransportChannel(endpoint: Option[String]): GrpcTransportChannel =
GrpcTransportChannel.create(
NettyChannelBuilder
.forTarget(endpoint.getOrElse("https://pubsub.googleapis.com"))
.usePlaintext()
.build()
)

def apply[F[_]](
project: String,
credentials: CredentialsProvider,
endpoint: Option[String] = None,
mkTransportChannel: Option[String] => HttpJsonTransportChannel = makeDefaultTransportChannel _
mkTransportChannel: Option[String] => GrpcTransportChannel = makeDefaultTransportChannel
)(implicit F: Async[F]
): Resource[F, QueueClient[F]] =
Resource
.fromAutoCloseable(F.blocking(mkTransportChannel(endpoint)))
.map { channel =>
new PubSubClient[F](project, FixedTransportChannelProvider.create(channel), false, credentials, endpoint)
new PubSubClient[F](project, FixedTransportChannelProvider.create(channel), credentials, endpoint)
}

def unmanaged[F[_]](
project: String,
credentials: CredentialsProvider,
channelProvider: TransportChannelProvider,
useGrpc: Boolean,
endpoint: Option[String] = None
)(implicit F: Async[F]
): QueueClient[F] =
new PubSubClient[F](project, channelProvider, useGrpc, credentials, endpoint)
new PubSubClient[F](project, channelProvider, credentials, endpoint)

}
Loading

0 comments on commit 6728d07

Please sign in to comment.