Skip to content

Commit

Permalink
Only support grpc for gcp pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
AL333Z committed Sep 20, 2024
1 parent 43c3e4b commit cd86aa0
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 66 deletions.
12 changes: 12 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,18 @@ lazy val gcpPubSub = crossProject(JVMPlatform)
.settings(commonSettings)
.settings(
name := "fs2-queues-gcp-pubsub",
// TODO: Remove once next version is published
mimaBinaryIssueFilters ++= List(
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.commercetools.queue.gcp.pubsub.PubSubAdministration.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.gcp.pubsub.PubSubClient.unmanaged"),
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.gcp.pubsub.PubSubClient.this"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.commercetools.queue.gcp.pubsub.PubSubClient.unmanaged$default$5"),
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.gcp.pubsub.PubSubPublisher.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.gcp.pubsub.PubSubPuller.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.gcp.pubsub.PubSubSubscriber.this")
),
libraryDependencies ++= List(
"com.google.cloud" % "google-cloud-pubsub" % "1.129.3",
"com.google.cloud" % "google-cloud-monitoring" % "3.47.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ class PubSubClientSuite extends QueueClientSuite {
override val queueUpdateSupported = false

override def client: Resource[IO, QueueClient[IO]] =
PubSubClient("test-project", NoCredentialsProvider.create(), endpoint = Some("http://localhost:8042"))
PubSubClient("test-project", NoCredentialsProvider.create(), endpoint = Some("localhost:8042"))

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import com.google.pubsub.v1.{DeleteSubscriptionRequest, DeleteTopicRequest, Expi
import scala.concurrent.duration._

private class PubSubAdministration[F[_]](
useGrpc: Boolean,
project: String,
channelProvider: TransportChannelProvider,
credentials: CredentialsProvider,
Expand All @@ -38,26 +37,20 @@ private class PubSubAdministration[F[_]](

private val adminClient = Resource.fromAutoCloseable(F.delay {
val builder =
if (useGrpc)
TopicAdminSettings.newBuilder()
else
TopicAdminSettings.newHttpJsonBuilder()
builder
.setCredentialsProvider(credentials)
.setTransportChannelProvider(channelProvider)
TopicAdminSettings
.newBuilder()
.setCredentialsProvider(credentials)
.setTransportChannelProvider(channelProvider)
endpoint.foreach(builder.setEndpoint(_))
TopicAdminClient.create(builder.build())
})

private val subscriptionClient = Resource.fromAutoCloseable(F.delay {
val builder =
if (useGrpc)
SubscriptionAdminSettings.newBuilder()
else
SubscriptionAdminSettings.newHttpJsonBuilder()
builder
.setCredentialsProvider(credentials)
.setTransportChannelProvider(channelProvider)
SubscriptionAdminSettings
.newBuilder()
.setCredentialsProvider(credentials)
.setTransportChannelProvider(channelProvider)
endpoint.foreach(builder.setEndpoint(_))
SubscriptionAdminClient.create(builder.build())
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,32 @@
package com.commercetools.queue.gcp.pubsub

import cats.effect.{Async, Resource}
import com.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueStatistics, QueueSubscriber, Serializer, UnsealedQueueClient}
import com.commercetools.queue._
import com.google.api.gax.core.CredentialsProvider
import com.google.api.gax.httpjson.{HttpJsonTransportChannel, ManagedHttpJsonChannel}
import com.google.api.gax.grpc.GrpcTransportChannel
import com.google.api.gax.rpc.{FixedTransportChannelProvider, TransportChannelProvider}
import com.google.pubsub.v1.{SubscriptionName, TopicName}
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder

private class PubSubClient[F[_]: Async] private (
project: String,
channelProvider: TransportChannelProvider,
useGrpc: Boolean,
credentials: CredentialsProvider,
endpoint: Option[String])
extends UnsealedQueueClient[F] {

override def administration: QueueAdministration[F] =
new PubSubAdministration[F](useGrpc, project, channelProvider, credentials, endpoint)
new PubSubAdministration[F](project, channelProvider, credentials, endpoint)

override def statistics(name: String): QueueStatistics[F] =
new PubSubStatistics(name, SubscriptionName.of(project, s"fs2-queue-$name"), channelProvider, credentials, endpoint)

override def publish[T: Serializer](name: String): QueuePublisher[F, T] =
new PubSubPublisher[F, T](name, useGrpc, TopicName.of(project, name), channelProvider, credentials, endpoint)
new PubSubPublisher[F, T](name, TopicName.of(project, name), channelProvider, credentials, endpoint)

override def subscribe[T: Deserializer](name: String): QueueSubscriber[F, T] =
new PubSubSubscriber[F, T](
name,
useGrpc,
SubscriptionName.of(project, s"fs2-queue-$name"),
channelProvider,
credentials,
Expand All @@ -53,31 +52,34 @@ private class PubSubClient[F[_]: Async] private (

object PubSubClient {

private def makeDefaultTransportChannel(endpoint: Option[String]): HttpJsonTransportChannel =
HttpJsonTransportChannel.create(
ManagedHttpJsonChannel.newBuilder().setEndpoint(endpoint.getOrElse("https://pubsub.googleapis.com")).build())
private def makeDefaultTransportChannel(endpoint: Option[String]): GrpcTransportChannel =
GrpcTransportChannel.create(
NettyChannelBuilder
.forTarget(endpoint.getOrElse("https://pubsub.googleapis.com"))
.usePlaintext()
.build()
)

def apply[F[_]](
project: String,
credentials: CredentialsProvider,
endpoint: Option[String] = None,
mkTransportChannel: Option[String] => HttpJsonTransportChannel = makeDefaultTransportChannel _
mkTransportChannel: Option[String] => GrpcTransportChannel = makeDefaultTransportChannel
)(implicit F: Async[F]
): Resource[F, QueueClient[F]] =
Resource
.fromAutoCloseable(F.blocking(mkTransportChannel(endpoint)))
.map { channel =>
new PubSubClient[F](project, FixedTransportChannelProvider.create(channel), false, credentials, endpoint)
new PubSubClient[F](project, FixedTransportChannelProvider.create(channel), credentials, endpoint)
}

def unmanaged[F[_]](
project: String,
credentials: CredentialsProvider,
channelProvider: TransportChannelProvider,
useGrpc: Boolean,
endpoint: Option[String] = None
)(implicit F: Async[F]
): QueueClient[F] =
new PubSubClient[F](project, channelProvider, useGrpc, credentials, endpoint)
new PubSubClient[F](project, channelProvider, credentials, endpoint)

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ import cats.effect.{Async, Resource}
import com.commercetools.queue.{QueuePusher, Serializer, UnsealedQueuePublisher}
import com.google.api.gax.core.CredentialsProvider
import com.google.api.gax.rpc.TransportChannelProvider
import com.google.cloud.pubsub.v1.stub.{GrpcPublisherStub, HttpJsonPublisherStub, PublisherStubSettings}
import com.google.cloud.pubsub.v1.stub.{GrpcPublisherStub, PublisherStubSettings}
import com.google.pubsub.v1.TopicName

private class PubSubPublisher[F[_], T](
val queueName: String,
useGrpc: Boolean,
topicName: TopicName,
channelProvider: TransportChannelProvider,
credentials: CredentialsProvider,
Expand All @@ -40,19 +39,12 @@ private class PubSubPublisher[F[_], T](
.fromAutoCloseable {
F.blocking {
val builder =
if (useGrpc)
PublisherStubSettings.newBuilder()
else
PublisherStubSettings.newHttpJsonBuilder()
PublisherStubSettings.newBuilder()
builder
.setCredentialsProvider(credentials)
.setTransportChannelProvider(channelProvider)
endpoint.foreach(builder.setEndpoint(_))
if (useGrpc)
GrpcPublisherStub.create(builder.build())
else
HttpJsonPublisherStub.create(builder.build())

GrpcPublisherStub.create(builder.build())
}
}
.map(new PubSubPusher[F, T](queueName, topicName, _))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import cats.effect.syntax.concurrent._
import cats.syntax.all._
import com.commercetools.queue.{Deserializer, MessageContext, UnsealedQueuePuller}
import com.google.api.gax.grpc.GrpcCallContext
import com.google.api.gax.httpjson.HttpJsonCallContext
import com.google.api.gax.retrying.RetrySettings
import com.google.api.gax.rpc.{ApiCallContext, DeadlineExceededException}
import com.google.cloud.pubsub.v1.stub.SubscriberStub
Expand All @@ -35,7 +34,6 @@ import scala.jdk.CollectionConverters._

private class PubSubPuller[F[_], T](
val queueName: String,
useGrpc: Boolean,
subscriptionName: SubscriptionName,
subscriber: SubscriberStub,
lockTTLSeconds: Int
Expand All @@ -45,16 +43,9 @@ private class PubSubPuller[F[_], T](
extends UnsealedQueuePuller[F, T] {

private def callContext(waitingTime: FiniteDuration): ApiCallContext =
if (useGrpc)
GrpcCallContext
.createDefault()
.withRetrySettings(
RetrySettings.newBuilder().setLogicalTimeout(Duration.ofMillis(waitingTime.toMillis)).build())
else
HttpJsonCallContext
.createDefault()
.withRetrySettings(
RetrySettings.newBuilder().setLogicalTimeout(Duration.ofMillis(waitingTime.toMillis)).build())
GrpcCallContext
.createDefault()
.withRetrySettings(RetrySettings.newBuilder().setLogicalTimeout(Duration.ofMillis(waitingTime.toMillis)).build())

override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[MessageContext[F, T]]] =
wrapFuture(F.delay {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ import cats.syntax.functor._
import com.commercetools.queue.{Deserializer, QueuePuller, UnsealedQueueSubscriber}
import com.google.api.gax.core.CredentialsProvider
import com.google.api.gax.rpc.TransportChannelProvider
import com.google.cloud.pubsub.v1.stub.{GrpcSubscriberStub, HttpJsonSubscriberStub, SubscriberStubSettings}
import com.google.cloud.pubsub.v1.stub.{GrpcSubscriberStub, SubscriberStubSettings}
import com.google.pubsub.v1.{GetSubscriptionRequest, SubscriptionName}

private class PubSubSubscriber[F[_], T](
val queueName: String,
useGrpc: Boolean,
subscriptionName: SubscriptionName,
channelProvider: TransportChannelProvider,
credentials: CredentialsProvider,
Expand All @@ -41,18 +40,12 @@ private class PubSubSubscriber[F[_], T](
.fromAutoCloseable {
F.blocking {
val builder =
if (useGrpc)
SubscriberStubSettings.newBuilder()
else
SubscriberStubSettings.newHttpJsonBuilder()
builder
.setCredentialsProvider(credentials)
.setTransportChannelProvider(channelProvider)
SubscriberStubSettings
.newBuilder()
.setCredentialsProvider(credentials)
.setTransportChannelProvider(channelProvider)
endpoint.foreach(builder.setEndpoint(_))
if (useGrpc)
GrpcSubscriberStub.create(builder.build())
else
HttpJsonSubscriberStub.create(builder.build())
GrpcSubscriberStub.create(builder.build())
}
}
.evalMap { subscriber =>
Expand All @@ -63,7 +56,7 @@ private class PubSubSubscriber[F[_], T](
sub => (subscriber, sub))
}
.map { case (subscriber, subscription) =>
new PubSubPuller[F, T](queueName, useGrpc, subscriptionName, subscriber, subscription.getAckDeadlineSeconds())
new PubSubPuller[F, T](queueName, subscriptionName, subscriber, subscription.getAckDeadlineSeconds())
}

}

0 comments on commit cd86aa0

Please sign in to comment.