diff --git a/build.sbt b/build.sbt index 1570abc..4fcd7cf 100644 --- a/build.sbt +++ b/build.sbt @@ -166,6 +166,18 @@ lazy val gcpPubSub = crossProject(JVMPlatform) .settings(commonSettings) .settings( name := "fs2-queues-gcp-pubsub", + // TODO: Remove once next version is published + mimaBinaryIssueFilters ++= List( + ProblemFilters.exclude[DirectMissingMethodProblem]( + "com.commercetools.queue.gcp.pubsub.PubSubAdministration.this"), + ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.gcp.pubsub.PubSubClient.unmanaged"), + ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.gcp.pubsub.PubSubClient.this"), + ProblemFilters.exclude[DirectMissingMethodProblem]( + "com.commercetools.queue.gcp.pubsub.PubSubClient.unmanaged$default$5"), + ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.gcp.pubsub.PubSubPublisher.this"), + ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.gcp.pubsub.PubSubPuller.this"), + ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.gcp.pubsub.PubSubSubscriber.this") + ), libraryDependencies ++= List( "com.google.cloud" % "google-cloud-pubsub" % "1.129.3", "com.google.cloud" % "google-cloud-monitoring" % "3.47.0" diff --git a/gcp/pubsub/integration/src/test/scala/com/commercetools/queue/pubsub/PubSubClientSuite.scala b/gcp/pubsub/integration/src/test/scala/com/commercetools/queue/pubsub/PubSubClientSuite.scala index 2df944d..6fbff4a 100644 --- a/gcp/pubsub/integration/src/test/scala/com/commercetools/queue/pubsub/PubSubClientSuite.scala +++ b/gcp/pubsub/integration/src/test/scala/com/commercetools/queue/pubsub/PubSubClientSuite.scala @@ -27,6 +27,6 @@ class PubSubClientSuite extends QueueClientSuite { override val queueUpdateSupported = false override def client: Resource[IO, QueueClient[IO]] = - PubSubClient("test-project", NoCredentialsProvider.create(), endpoint = Some("http://localhost:8042")) + PubSubClient("test-project", NoCredentialsProvider.create(), endpoint = Some("localhost:8042")) } diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubAdministration.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubAdministration.scala index 6897604..f9347a2 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubAdministration.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubAdministration.scala @@ -28,7 +28,6 @@ import com.google.pubsub.v1.{DeleteSubscriptionRequest, DeleteTopicRequest, Expi import scala.concurrent.duration._ private class PubSubAdministration[F[_]]( - useGrpc: Boolean, project: String, channelProvider: TransportChannelProvider, credentials: CredentialsProvider, @@ -38,26 +37,20 @@ private class PubSubAdministration[F[_]]( private val adminClient = Resource.fromAutoCloseable(F.delay { val builder = - if (useGrpc) - TopicAdminSettings.newBuilder() - else - TopicAdminSettings.newHttpJsonBuilder() - builder - .setCredentialsProvider(credentials) - .setTransportChannelProvider(channelProvider) + TopicAdminSettings + .newBuilder() + .setCredentialsProvider(credentials) + .setTransportChannelProvider(channelProvider) endpoint.foreach(builder.setEndpoint(_)) TopicAdminClient.create(builder.build()) }) private val subscriptionClient = Resource.fromAutoCloseable(F.delay { val builder = - if (useGrpc) - SubscriptionAdminSettings.newBuilder() - else - SubscriptionAdminSettings.newHttpJsonBuilder() - builder - .setCredentialsProvider(credentials) - .setTransportChannelProvider(channelProvider) + SubscriptionAdminSettings + .newBuilder() + .setCredentialsProvider(credentials) + .setTransportChannelProvider(channelProvider) endpoint.foreach(builder.setEndpoint(_)) SubscriptionAdminClient.create(builder.build()) }) diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala index 3f25127..e268eee 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala @@ -17,33 +17,32 @@ package com.commercetools.queue.gcp.pubsub import cats.effect.{Async, Resource} -import com.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueStatistics, QueueSubscriber, Serializer, UnsealedQueueClient} +import com.commercetools.queue._ import com.google.api.gax.core.CredentialsProvider -import com.google.api.gax.httpjson.{HttpJsonTransportChannel, ManagedHttpJsonChannel} +import com.google.api.gax.grpc.GrpcTransportChannel import com.google.api.gax.rpc.{FixedTransportChannelProvider, TransportChannelProvider} import com.google.pubsub.v1.{SubscriptionName, TopicName} +import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder private class PubSubClient[F[_]: Async] private ( project: String, channelProvider: TransportChannelProvider, - useGrpc: Boolean, credentials: CredentialsProvider, endpoint: Option[String]) extends UnsealedQueueClient[F] { override def administration: QueueAdministration[F] = - new PubSubAdministration[F](useGrpc, project, channelProvider, credentials, endpoint) + new PubSubAdministration[F](project, channelProvider, credentials, endpoint) override def statistics(name: String): QueueStatistics[F] = new PubSubStatistics(name, SubscriptionName.of(project, s"fs2-queue-$name"), channelProvider, credentials, endpoint) override def publish[T: Serializer](name: String): QueuePublisher[F, T] = - new PubSubPublisher[F, T](name, useGrpc, TopicName.of(project, name), channelProvider, credentials, endpoint) + new PubSubPublisher[F, T](name, TopicName.of(project, name), channelProvider, credentials, endpoint) override def subscribe[T: Deserializer](name: String): QueueSubscriber[F, T] = new PubSubSubscriber[F, T]( name, - useGrpc, SubscriptionName.of(project, s"fs2-queue-$name"), channelProvider, credentials, @@ -53,31 +52,34 @@ private class PubSubClient[F[_]: Async] private ( object PubSubClient { - private def makeDefaultTransportChannel(endpoint: Option[String]): HttpJsonTransportChannel = - HttpJsonTransportChannel.create( - ManagedHttpJsonChannel.newBuilder().setEndpoint(endpoint.getOrElse("https://pubsub.googleapis.com")).build()) + private def makeDefaultTransportChannel(endpoint: Option[String]): GrpcTransportChannel = + GrpcTransportChannel.create( + NettyChannelBuilder + .forTarget(endpoint.getOrElse("https://pubsub.googleapis.com")) + .usePlaintext() + .build() + ) def apply[F[_]]( project: String, credentials: CredentialsProvider, endpoint: Option[String] = None, - mkTransportChannel: Option[String] => HttpJsonTransportChannel = makeDefaultTransportChannel _ + mkTransportChannel: Option[String] => GrpcTransportChannel = makeDefaultTransportChannel )(implicit F: Async[F] ): Resource[F, QueueClient[F]] = Resource .fromAutoCloseable(F.blocking(mkTransportChannel(endpoint))) .map { channel => - new PubSubClient[F](project, FixedTransportChannelProvider.create(channel), false, credentials, endpoint) + new PubSubClient[F](project, FixedTransportChannelProvider.create(channel), credentials, endpoint) } def unmanaged[F[_]]( project: String, credentials: CredentialsProvider, channelProvider: TransportChannelProvider, - useGrpc: Boolean, endpoint: Option[String] = None )(implicit F: Async[F] ): QueueClient[F] = - new PubSubClient[F](project, channelProvider, useGrpc, credentials, endpoint) + new PubSubClient[F](project, channelProvider, credentials, endpoint) } diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPublisher.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPublisher.scala index 33fd570..5cef2b0 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPublisher.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPublisher.scala @@ -20,12 +20,11 @@ import cats.effect.{Async, Resource} import com.commercetools.queue.{QueuePusher, Serializer, UnsealedQueuePublisher} import com.google.api.gax.core.CredentialsProvider import com.google.api.gax.rpc.TransportChannelProvider -import com.google.cloud.pubsub.v1.stub.{GrpcPublisherStub, HttpJsonPublisherStub, PublisherStubSettings} +import com.google.cloud.pubsub.v1.stub.{GrpcPublisherStub, PublisherStubSettings} import com.google.pubsub.v1.TopicName private class PubSubPublisher[F[_], T]( val queueName: String, - useGrpc: Boolean, topicName: TopicName, channelProvider: TransportChannelProvider, credentials: CredentialsProvider, @@ -40,19 +39,12 @@ private class PubSubPublisher[F[_], T]( .fromAutoCloseable { F.blocking { val builder = - if (useGrpc) - PublisherStubSettings.newBuilder() - else - PublisherStubSettings.newHttpJsonBuilder() + PublisherStubSettings.newBuilder() builder .setCredentialsProvider(credentials) .setTransportChannelProvider(channelProvider) endpoint.foreach(builder.setEndpoint(_)) - if (useGrpc) - GrpcPublisherStub.create(builder.build()) - else - HttpJsonPublisherStub.create(builder.build()) - + GrpcPublisherStub.create(builder.build()) } } .map(new PubSubPusher[F, T](queueName, topicName, _)) diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPuller.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPuller.scala index dfdd603..03b9072 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPuller.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPuller.scala @@ -21,7 +21,6 @@ import cats.effect.syntax.concurrent._ import cats.syntax.all._ import com.commercetools.queue.{Deserializer, MessageContext, UnsealedQueuePuller} import com.google.api.gax.grpc.GrpcCallContext -import com.google.api.gax.httpjson.HttpJsonCallContext import com.google.api.gax.retrying.RetrySettings import com.google.api.gax.rpc.{ApiCallContext, DeadlineExceededException} import com.google.cloud.pubsub.v1.stub.SubscriberStub @@ -35,7 +34,6 @@ import scala.jdk.CollectionConverters._ private class PubSubPuller[F[_], T]( val queueName: String, - useGrpc: Boolean, subscriptionName: SubscriptionName, subscriber: SubscriberStub, lockTTLSeconds: Int @@ -45,16 +43,9 @@ private class PubSubPuller[F[_], T]( extends UnsealedQueuePuller[F, T] { private def callContext(waitingTime: FiniteDuration): ApiCallContext = - if (useGrpc) - GrpcCallContext - .createDefault() - .withRetrySettings( - RetrySettings.newBuilder().setLogicalTimeout(Duration.ofMillis(waitingTime.toMillis)).build()) - else - HttpJsonCallContext - .createDefault() - .withRetrySettings( - RetrySettings.newBuilder().setLogicalTimeout(Duration.ofMillis(waitingTime.toMillis)).build()) + GrpcCallContext + .createDefault() + .withRetrySettings(RetrySettings.newBuilder().setLogicalTimeout(Duration.ofMillis(waitingTime.toMillis)).build()) override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[MessageContext[F, T]]] = wrapFuture(F.delay { diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubSubscriber.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubSubscriber.scala index 01b6d11..59c715a 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubSubscriber.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubSubscriber.scala @@ -21,12 +21,11 @@ import cats.syntax.functor._ import com.commercetools.queue.{Deserializer, QueuePuller, UnsealedQueueSubscriber} import com.google.api.gax.core.CredentialsProvider import com.google.api.gax.rpc.TransportChannelProvider -import com.google.cloud.pubsub.v1.stub.{GrpcSubscriberStub, HttpJsonSubscriberStub, SubscriberStubSettings} +import com.google.cloud.pubsub.v1.stub.{GrpcSubscriberStub, SubscriberStubSettings} import com.google.pubsub.v1.{GetSubscriptionRequest, SubscriptionName} private class PubSubSubscriber[F[_], T]( val queueName: String, - useGrpc: Boolean, subscriptionName: SubscriptionName, channelProvider: TransportChannelProvider, credentials: CredentialsProvider, @@ -41,18 +40,12 @@ private class PubSubSubscriber[F[_], T]( .fromAutoCloseable { F.blocking { val builder = - if (useGrpc) - SubscriberStubSettings.newBuilder() - else - SubscriberStubSettings.newHttpJsonBuilder() - builder - .setCredentialsProvider(credentials) - .setTransportChannelProvider(channelProvider) + SubscriberStubSettings + .newBuilder() + .setCredentialsProvider(credentials) + .setTransportChannelProvider(channelProvider) endpoint.foreach(builder.setEndpoint(_)) - if (useGrpc) - GrpcSubscriberStub.create(builder.build()) - else - HttpJsonSubscriberStub.create(builder.build()) + GrpcSubscriberStub.create(builder.build()) } } .evalMap { subscriber => @@ -63,7 +56,7 @@ private class PubSubSubscriber[F[_], T]( sub => (subscriber, sub)) } .map { case (subscriber, subscription) => - new PubSubPuller[F, T](queueName, useGrpc, subscriptionName, subscriber, subscription.getAckDeadlineSeconds()) + new PubSubPuller[F, T](queueName, subscriptionName, subscriber, subscription.getAckDeadlineSeconds()) } }