Skip to content

Commit

Permalink
Only grpc for pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
AL333Z committed Sep 20, 2024
1 parent f2efc4e commit 378a548
Show file tree
Hide file tree
Showing 12 changed files with 74 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import com.commercetools.queue.testkit.QueueClientSuite
class ServiceBusClientSuite extends QueueClientSuite {

private def config = string("AZURE_SERVICEBUS_HOSTNAME")
override val inFlightMessagesStatsSupported: Boolean = false
override val inFlightMessagesStatsSupported: Boolean = false // not supported

override def client: Resource[IO, QueueClient[IO]] =
config.toResource.flatMap { namespace =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,18 @@ import com.google.api.gax.core.NoCredentialsProvider

class PubSubClientSuite extends QueueClientSuite {

override val queueUpdateSupported = false
override val inFlightMessagesStatsSupported: Boolean = false
override val delayedMessagesStatsSupported: Boolean = false
private def isEmulatorDefault = true
private def isEmulatorEnvVar = "GCP_PUBSUB_USE_EMULATOR"

override val queueUpdateSupported: Boolean = false // not supported
override val inFlightMessagesStatsSupported: Boolean = false // not supported
override val delayedMessagesStatsSupported: Boolean = false // not supported
override val messagesStatsSupported: Boolean = // // not supported in the emulator
!sys.env.get(isEmulatorEnvVar).map(_.toBoolean).getOrElse(isEmulatorDefault)

private def config =
booleanOrDefault("GCP_PUBSUB_USE_EMULATOR", default = true).ifM(
ifTrue = IO.pure(("test-project", NoCredentialsProvider.create(), Some("http://localhost:8042"))),
booleanOrDefault(isEmulatorEnvVar, default = isEmulatorDefault).ifM(
ifTrue = IO.pure(("test-project", NoCredentialsProvider.create(), Some("localhost:8042"))),
ifFalse = for {
project <- string("GCP_PUBSUB_PROJECT")
credentials = NoCredentialsProvider.create() // TODO
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import com.google.pubsub.v1.{DeleteSubscriptionRequest, DeleteTopicRequest, Expi
import scala.concurrent.duration._

private class PubSubAdministration[F[_]](
useGrpc: Boolean,
project: String,
channelProvider: TransportChannelProvider,
credentials: CredentialsProvider,
Expand All @@ -38,27 +37,21 @@ private class PubSubAdministration[F[_]](

private val adminClient = Resource.fromAutoCloseable(F.delay {
val builder =
if (useGrpc)
TopicAdminSettings.newBuilder()
else
TopicAdminSettings.newHttpJsonBuilder()
TopicAdminSettings.newBuilder()
builder
.setCredentialsProvider(credentials)
.setTransportChannelProvider(channelProvider)
endpoint.foreach(builder.setEndpoint(_))
endpoint.foreach(builder.setEndpoint)
TopicAdminClient.create(builder.build())
})

private val subscriptionClient = Resource.fromAutoCloseable(F.delay {
val builder =
if (useGrpc)
SubscriptionAdminSettings.newBuilder()
else
SubscriptionAdminSettings.newHttpJsonBuilder()
builder
.setCredentialsProvider(credentials)
.setTransportChannelProvider(channelProvider)
endpoint.foreach(builder.setEndpoint(_))
SubscriptionAdminSettings
.newBuilder()
.setCredentialsProvider(credentials)
.setTransportChannelProvider(channelProvider)
endpoint.foreach(builder.setEndpoint)
SubscriptionAdminClient.create(builder.build())
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,32 @@
package com.commercetools.queue.gcp.pubsub

import cats.effect.{Async, Resource}
import com.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueStatistics, QueueSubscriber, Serializer, UnsealedQueueClient}
import com.commercetools.queue._
import com.google.api.gax.core.CredentialsProvider
import com.google.api.gax.httpjson.{HttpJsonTransportChannel, ManagedHttpJsonChannel}
import com.google.api.gax.grpc.GrpcTransportChannel
import com.google.api.gax.rpc.{FixedTransportChannelProvider, TransportChannelProvider}
import com.google.pubsub.v1.{SubscriptionName, TopicName}
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder

private class PubSubClient[F[_]: Async] private (
project: String,
channelProvider: TransportChannelProvider,
useGrpc: Boolean,
credentials: CredentialsProvider,
endpoint: Option[String])
extends UnsealedQueueClient[F] {

override def administration: QueueAdministration[F] =
new PubSubAdministration[F](useGrpc, project, channelProvider, credentials, endpoint)
new PubSubAdministration[F](project, channelProvider, credentials, endpoint)

override def statistics(name: String): QueueStatistics[F] =
new PubSubStatistics(name, SubscriptionName.of(project, s"fs2-queue-$name"), channelProvider, credentials, endpoint)

override def publish[T: Serializer](name: String): QueuePublisher[F, T] =
new PubSubPublisher[F, T](name, useGrpc, TopicName.of(project, name), channelProvider, credentials, endpoint)
new PubSubPublisher[F, T](name, TopicName.of(project, name), channelProvider, credentials, endpoint)

override def subscribe[T: Deserializer](name: String): QueueSubscriber[F, T] =
new PubSubSubscriber[F, T](
name,
useGrpc,
SubscriptionName.of(project, s"fs2-queue-$name"),
channelProvider,
credentials,
Expand All @@ -53,31 +52,34 @@ private class PubSubClient[F[_]: Async] private (

object PubSubClient {

private def makeDefaultTransportChannel(endpoint: Option[String]): HttpJsonTransportChannel =
HttpJsonTransportChannel.create(
ManagedHttpJsonChannel.newBuilder().setEndpoint(endpoint.getOrElse("https://pubsub.googleapis.com")).build())
private def makeDefaultTransportChannel(endpoint: Option[String]): GrpcTransportChannel =
GrpcTransportChannel.create(
NettyChannelBuilder
.forTarget(endpoint.getOrElse("https://pubsub.googleapis.com"))
.usePlaintext()
.build()
)

def apply[F[_]](
project: String,
credentials: CredentialsProvider,
endpoint: Option[String] = None,
mkTransportChannel: Option[String] => HttpJsonTransportChannel = makeDefaultTransportChannel _
mkTransportChannel: Option[String] => GrpcTransportChannel = makeDefaultTransportChannel
)(implicit F: Async[F]
): Resource[F, QueueClient[F]] =
Resource
.fromAutoCloseable(F.blocking(mkTransportChannel(endpoint)))
.map { channel =>
new PubSubClient[F](project, FixedTransportChannelProvider.create(channel), false, credentials, endpoint)
new PubSubClient[F](project, FixedTransportChannelProvider.create(channel), credentials, endpoint)
}

def unmanaged[F[_]](
project: String,
credentials: CredentialsProvider,
channelProvider: TransportChannelProvider,
useGrpc: Boolean,
endpoint: Option[String] = None
)(implicit F: Async[F]
): QueueClient[F] =
new PubSubClient[F](project, channelProvider, useGrpc, credentials, endpoint)
new PubSubClient[F](project, channelProvider, credentials, endpoint)

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ import cats.effect.{Async, Resource}
import com.commercetools.queue.{QueuePusher, Serializer, UnsealedQueuePublisher}
import com.google.api.gax.core.CredentialsProvider
import com.google.api.gax.rpc.TransportChannelProvider
import com.google.cloud.pubsub.v1.stub.{GrpcPublisherStub, HttpJsonPublisherStub, PublisherStubSettings}
import com.google.cloud.pubsub.v1.stub.{GrpcPublisherStub, PublisherStubSettings}
import com.google.pubsub.v1.TopicName

private class PubSubPublisher[F[_], T](
val queueName: String,
useGrpc: Boolean,
topicName: TopicName,
channelProvider: TransportChannelProvider,
credentials: CredentialsProvider,
Expand All @@ -40,19 +39,12 @@ private class PubSubPublisher[F[_], T](
.fromAutoCloseable {
F.blocking {
val builder =
if (useGrpc)
PublisherStubSettings.newBuilder()
else
PublisherStubSettings.newHttpJsonBuilder()
builder
.setCredentialsProvider(credentials)
.setTransportChannelProvider(channelProvider)
endpoint.foreach(builder.setEndpoint(_))
if (useGrpc)
GrpcPublisherStub.create(builder.build())
else
HttpJsonPublisherStub.create(builder.build())

PublisherStubSettings
.newBuilder()
.setCredentialsProvider(credentials)
.setTransportChannelProvider(channelProvider)
endpoint.foreach(builder.setEndpoint)
GrpcPublisherStub.create(builder.build())
}
}
.map(new PubSubPusher[F, T](queueName, topicName, _))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import cats.effect.syntax.concurrent._
import cats.syntax.all._
import com.commercetools.queue.{Deserializer, MessageContext, UnsealedQueuePuller}
import com.google.api.gax.grpc.GrpcCallContext
import com.google.api.gax.httpjson.HttpJsonCallContext
import com.google.api.gax.retrying.RetrySettings
import com.google.api.gax.rpc.{ApiCallContext, DeadlineExceededException}
import com.google.cloud.pubsub.v1.stub.SubscriberStub
Expand All @@ -35,7 +34,6 @@ import scala.jdk.CollectionConverters._

private class PubSubPuller[F[_], T](
val queueName: String,
useGrpc: Boolean,
subscriptionName: SubscriptionName,
subscriber: SubscriberStub,
lockTTLSeconds: Int
Expand All @@ -45,16 +43,9 @@ private class PubSubPuller[F[_], T](
extends UnsealedQueuePuller[F, T] {

private def callContext(waitingTime: FiniteDuration): ApiCallContext =
if (useGrpc)
GrpcCallContext
.createDefault()
.withRetrySettings(
RetrySettings.newBuilder().setLogicalTimeout(Duration.ofMillis(waitingTime.toMillis)).build())
else
HttpJsonCallContext
.createDefault()
.withRetrySettings(
RetrySettings.newBuilder().setLogicalTimeout(Duration.ofMillis(waitingTime.toMillis)).build())
GrpcCallContext
.createDefault()
.withRetrySettings(RetrySettings.newBuilder().setLogicalTimeout(Duration.ofMillis(waitingTime.toMillis)).build())

override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[MessageContext[F, T]]] =
wrapFuture(F.delay {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ private class PubSubStatistics[F[_]](
.newBuilder()
.setCredentialsProvider(credentials)
.setTransportChannelProvider(channelProvider)
endpoint.foreach(builder.setEndpoint(_))
endpoint.foreach(builder.setEndpoint)
GrpcMetricServiceStub.create(builder.build())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ import cats.syntax.functor._
import com.commercetools.queue.{Deserializer, QueuePuller, UnsealedQueueSubscriber}
import com.google.api.gax.core.CredentialsProvider
import com.google.api.gax.rpc.TransportChannelProvider
import com.google.cloud.pubsub.v1.stub.{GrpcSubscriberStub, HttpJsonSubscriberStub, SubscriberStubSettings}
import com.google.cloud.pubsub.v1.stub.{GrpcSubscriberStub, SubscriberStubSettings}
import com.google.pubsub.v1.{GetSubscriptionRequest, SubscriptionName}

private class PubSubSubscriber[F[_], T](
val queueName: String,
useGrpc: Boolean,
subscriptionName: SubscriptionName,
channelProvider: TransportChannelProvider,
credentials: CredentialsProvider,
Expand All @@ -40,19 +39,12 @@ private class PubSubSubscriber[F[_], T](
Resource
.fromAutoCloseable {
F.blocking {
val builder =
if (useGrpc)
SubscriberStubSettings.newBuilder()
else
SubscriberStubSettings.newHttpJsonBuilder()
builder
val builder = SubscriberStubSettings
.newBuilder()
.setCredentialsProvider(credentials)
.setTransportChannelProvider(channelProvider)
endpoint.foreach(builder.setEndpoint(_))
if (useGrpc)
GrpcSubscriberStub.create(builder.build())
else
HttpJsonSubscriberStub.create(builder.build())
endpoint.foreach(builder.setEndpoint)
GrpcSubscriberStub.create(builder.build())
}
}
.evalMap { subscriber =>
Expand All @@ -63,7 +55,7 @@ private class PubSubSubscriber[F[_], T](
sub => (subscriber, sub))
}
.map { case (subscriber, subscription) =>
new PubSubPuller[F, T](queueName, useGrpc, subscriptionName, subscriber, subscription.getAckDeadlineSeconds())
new PubSubPuller[F, T](queueName, subscriptionName, subscriber, subscription.getAckDeadlineSeconds())
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ abstract class QueueClientSuite

/** Override these if the given provider is not supporting these features */
val queueUpdateSupported: Boolean = true
val messagesStatsSupported: Boolean = true
val inFlightMessagesStatsSupported: Boolean = true
val delayedMessagesStatsSupported: Boolean = true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import scala.concurrent.duration.DurationInt
trait QueuePublisherSuite extends CatsEffectSuite { self: QueueClientSuite =>

withQueue.test("sink publishes all the messages") { queueName =>
assume(messagesStatsSupported)
val client = clientFixture()
for {
msgs <- randomMessages(30)
Expand All @@ -27,7 +28,7 @@ trait QueuePublisherSuite extends CatsEffectSuite { self: QueueClientSuite =>
}

withQueue.test("sink publishes all the messages with a delay") { queueName =>
assume(delayedMessagesStatsSupported, "The test environment does not support delayed messages stats")
assume(messagesStatsSupported && delayedMessagesStatsSupported)
val client = clientFixture()
for {
msgs <- randomMessages(30)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import scala.concurrent.duration.DurationInt
trait QueueStatisticsSuite extends CatsEffectSuite { self: QueueClientSuite =>

withQueue.test("stats should report queued messages") { queueName =>
assume(messagesStatsSupported)
for {
messages <- randomMessages(30)
client = clientFixture()
Expand All @@ -39,7 +40,7 @@ trait QueueStatisticsSuite extends CatsEffectSuite { self: QueueClientSuite =>
}

withQueue.test("stats should report inflight messages") { queueName =>
assume(inFlightMessagesStatsSupported, "The test environment does not support in-flight messages stats")
assume(messagesStatsSupported && inFlightMessagesStatsSupported)
for {
messages <- randomMessages(30)
client = clientFixture()
Expand All @@ -59,7 +60,7 @@ trait QueueStatisticsSuite extends CatsEffectSuite { self: QueueClientSuite =>
}

withQueue.test("stats should report delayed messages") { queueName =>
assume(delayedMessagesStatsSupported, "The test environment does not support delayed messages stats")
assume(messagesStatsSupported && delayedMessagesStatsSupported)
for {
messages <- randomMessages(30)
client = clientFixture()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,13 @@ trait QueueSubscriberSuite extends CatsEffectSuite { self: QueueClientSuite =>
else true
}
}.void)
remaining <- client.statistics(queueName).fetcher.use(_.fetch).map(_.messages)
_ = assertEquals(remaining, 0, "not all the messages got acked")
_ <-
if (messagesStatsSupported)
assertIO(
client.statistics(queueName).fetcher.use(_.fetch).map(_.messages),
0,
"not all the messages got acked")
else IO.unit
} yield ()
}

Expand All @@ -137,9 +142,17 @@ trait QueueSubscriberSuite extends CatsEffectSuite { self: QueueClientSuite =>
toBeNacked <- toBeNackedRef.get
_ = assertEquals(toBeAcked, Set("0", "2", "4", "6", "8"))
_ = assertEquals(toBeNacked, Set("1", "3", "5", "7", "9"))
stats <- client.statistics(queueName).fetcher.use(_.fetch)
// it may take a while to move the inflight messages back
_ = assert(stats.messages + stats.inflight.getOrElse(0) == 5, "not all the expected messages got nacked")
_ <-
if (messagesStatsSupported)
assertIOBoolean(
client
.statistics(queueName)
.fetcher
.use(_.fetch)
.map(stats => stats.messages + stats.inflight.getOrElse(0) == 5),
"not all the expected messages got nacked"
)
else IO.unit
} yield ()
}

Expand Down

0 comments on commit 378a548

Please sign in to comment.