diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala index b4eac2e..252d586 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubClient.scala @@ -20,12 +20,13 @@ import cats.effect.{Async, Resource} import com.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueSubscriber, Serializer} import com.google.api.gax.core.CredentialsProvider import com.google.api.gax.httpjson.{HttpJsonTransportChannel, ManagedHttpJsonChannel} -import com.google.api.gax.rpc.{FixedTransportChannelProvider, TransportChannel, TransportChannelProvider} +import com.google.api.gax.rpc.{FixedTransportChannelProvider, TransportChannelProvider} import com.google.pubsub.v1.{SubscriptionName, TopicName} class PubSubClient[F[_]: Async] private ( project: String, channelProvider: TransportChannelProvider, + useGrpc: Boolean, credentials: CredentialsProvider, endpoint: Option[String]) extends QueueClient[F] { @@ -39,6 +40,7 @@ class PubSubClient[F[_]: Async] private ( override def subscribe[T: Deserializer](name: String): QueueSubscriber[F, T] = new PubSubSubscriber[F, T]( name, + useGrpc, SubscriptionName.of(project, s"fs2-queue-$name"), channelProvider, credentials, @@ -48,7 +50,7 @@ class PubSubClient[F[_]: Async] private ( object PubSubClient { - private def makeDefaultTransportChannel(endpoint: Option[String]): TransportChannel = + private def makeDefaultTransportChannel(endpoint: Option[String]): HttpJsonTransportChannel = HttpJsonTransportChannel.create( ManagedHttpJsonChannel.newBuilder().setEndpoint(endpoint.getOrElse("https://pubsub.googleapis.com")).build()) @@ -56,13 +58,23 @@ object PubSubClient { project: String, credentials: CredentialsProvider, endpoint: Option[String] = None, - mkTransportChannel: Option[String] => TransportChannel = makeDefaultTransportChannel _ + mkTransportChannel: Option[String] => HttpJsonTransportChannel = makeDefaultTransportChannel _ )(implicit F: Async[F] ): Resource[F, PubSubClient[F]] = Resource .fromAutoCloseable(F.blocking(mkTransportChannel(endpoint))) .map { channel => - new PubSubClient[F](project, FixedTransportChannelProvider.create(channel), credentials, endpoint) + new PubSubClient[F](project, FixedTransportChannelProvider.create(channel), false, credentials, endpoint) } + def unmanaged[F[_]]( + project: String, + credentials: CredentialsProvider, + channelProvider: TransportChannelProvider, + useGrpc: Boolean, + endpoint: Option[String] = None + )(implicit F: Async[F] + ): PubSubClient[F] = + new PubSubClient[F](project, channelProvider, useGrpc, credentials, endpoint) + } diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPuller.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPuller.scala index f16c899..c4b035c 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPuller.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubPuller.scala @@ -20,9 +20,10 @@ import cats.effect.Async import cats.effect.syntax.concurrent._ import cats.syntax.all._ import com.commercetools.queue.{Deserializer, MessageContext, QueuePuller} +import com.google.api.gax.grpc.GrpcCallContext import com.google.api.gax.httpjson.HttpJsonCallContext import com.google.api.gax.retrying.RetrySettings -import com.google.api.gax.rpc.DeadlineExceededException +import com.google.api.gax.rpc.{ApiCallContext, DeadlineExceededException} import com.google.cloud.pubsub.v1.stub.SubscriberStub import com.google.pubsub.v1.{ModifyAckDeadlineRequest, PullRequest, ReceivedMessage, SubscriptionName} import fs2.Chunk @@ -34,6 +35,7 @@ import scala.jdk.CollectionConverters._ class PubSubPuller[F[_], T]( val queueName: String, + useGrpc: Boolean, subscriptionName: SubscriptionName, subscriber: SubscriberStub, lockTTLSeconds: Int @@ -42,15 +44,23 @@ class PubSubPuller[F[_], T]( deserializer: Deserializer[T]) extends QueuePuller[F, T] { + private def callContext(waitingTime: FiniteDuration): ApiCallContext = + if (useGrpc) + GrpcCallContext + .createDefault() + .withRetrySettings( + RetrySettings.newBuilder().setLogicalTimeout(Duration.ofMillis(waitingTime.toMillis)).build()) + else + HttpJsonCallContext + .createDefault() + .withRetrySettings( + RetrySettings.newBuilder().setLogicalTimeout(Duration.ofMillis(waitingTime.toMillis)).build()) + override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[MessageContext[F, T]]] = wrapFuture(F.delay { subscriber .pullCallable() - .withDefaultCallContext( - HttpJsonCallContext - .createDefault() - .withRetrySettings( - RetrySettings.newBuilder().setLogicalTimeout(Duration.ofMillis(waitingTime.toMillis)).build())) + .withDefaultCallContext(callContext(waitingTime)) .futureCall( PullRequest.newBuilder().setMaxMessages(batchSize).setSubscription(subscriptionName.toString()).build()) }).map(response => Chunk.from(response.getReceivedMessagesList().asScala)) @@ -59,6 +69,16 @@ class PubSubPuller[F[_], T]( Chunk.empty } .flatMap { (msgs: Chunk[ReceivedMessage]) => + // PubSub does not support delayed messages + // Instead in this case, we set a custom attribute when publishing + // with a delay. The attribute contains the earliest delivery date + // according to the delay. + // If the `Puller` gets a message with this attribute, we check wether + // it is in the future. + // If it is the case, the ack deadline is modified to be the amount of + // remaining seconds until this date. + // This way, the message will not be delivered until this expires. + // The message is ignored (filtered out of the batch) msgs.traverseFilter[F, ReceivedMessage] { msg => val attrs = msg.getMessage().getAttributesMap().asScala F.realTimeInstant.flatMap { now => diff --git a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubSubscriber.scala b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubSubscriber.scala index 6e665c0..e33842c 100644 --- a/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubSubscriber.scala +++ b/gcp/pubsub/src/main/scala/com/commercetools/queue/gcp/pubsub/PubSubSubscriber.scala @@ -26,6 +26,7 @@ import com.google.pubsub.v1.{GetSubscriptionRequest, SubscriptionName} class PubSubSubscriber[F[_], T]( val queueName: String, + useGrpc: Boolean, subscriptionName: SubscriptionName, channelProvider: TransportChannelProvider, credentials: CredentialsProvider, @@ -56,7 +57,7 @@ class PubSubSubscriber[F[_], T]( sub => (subscriber, sub)) } .map { case (subscriber, subscription) => - new PubSubPuller[F, T](queueName, subscriptionName, subscriber, subscription.getAckDeadlineSeconds()) + new PubSubPuller[F, T](queueName, useGrpc, subscriptionName, subscriber, subscription.getAckDeadlineSeconds()) } }