Skip to content

Commit

Permalink
Allow for unmanaged channel provider
Browse files Browse the repository at this point in the history
The custom channel provider might be HTTP or GRPC based, it is up to the
user to indicate which is used.
  • Loading branch information
satabin committed May 23, 2024
1 parent 778f8ec commit fb2489a
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ import cats.effect.{Async, Resource}
import com.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueSubscriber, Serializer}
import com.google.api.gax.core.CredentialsProvider
import com.google.api.gax.httpjson.{HttpJsonTransportChannel, ManagedHttpJsonChannel}
import com.google.api.gax.rpc.{FixedTransportChannelProvider, TransportChannel, TransportChannelProvider}
import com.google.api.gax.rpc.{FixedTransportChannelProvider, TransportChannelProvider}
import com.google.pubsub.v1.{SubscriptionName, TopicName}

class PubSubClient[F[_]: Async] private (
project: String,
channelProvider: TransportChannelProvider,
useGrpc: Boolean,
credentials: CredentialsProvider,
endpoint: Option[String])
extends QueueClient[F] {
Expand All @@ -39,6 +40,7 @@ class PubSubClient[F[_]: Async] private (
override def subscribe[T: Deserializer](name: String): QueueSubscriber[F, T] =
new PubSubSubscriber[F, T](
name,
useGrpc,
SubscriptionName.of(project, s"fs2-queue-$name"),
channelProvider,
credentials,
Expand All @@ -48,21 +50,31 @@ class PubSubClient[F[_]: Async] private (

object PubSubClient {

private def makeDefaultTransportChannel(endpoint: Option[String]): TransportChannel =
private def makeDefaultTransportChannel(endpoint: Option[String]): HttpJsonTransportChannel =
HttpJsonTransportChannel.create(
ManagedHttpJsonChannel.newBuilder().setEndpoint(endpoint.getOrElse("https://pubsub.googleapis.com")).build())

def apply[F[_]](
project: String,
credentials: CredentialsProvider,
endpoint: Option[String] = None,
mkTransportChannel: Option[String] => TransportChannel = makeDefaultTransportChannel _
mkTransportChannel: Option[String] => HttpJsonTransportChannel = makeDefaultTransportChannel _
)(implicit F: Async[F]
): Resource[F, PubSubClient[F]] =
Resource
.fromAutoCloseable(F.blocking(mkTransportChannel(endpoint)))
.map { channel =>
new PubSubClient[F](project, FixedTransportChannelProvider.create(channel), credentials, endpoint)
new PubSubClient[F](project, FixedTransportChannelProvider.create(channel), false, credentials, endpoint)
}

def unmanaged[F[_]](
project: String,
credentials: CredentialsProvider,
channelProvider: TransportChannelProvider,
useGrpc: Boolean,
endpoint: Option[String] = None
)(implicit F: Async[F]
): PubSubClient[F] =
new PubSubClient[F](project, channelProvider, useGrpc, credentials, endpoint)

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ import cats.effect.Async
import cats.effect.syntax.concurrent._
import cats.syntax.all._
import com.commercetools.queue.{Deserializer, MessageContext, QueuePuller}
import com.google.api.gax.grpc.GrpcCallContext
import com.google.api.gax.httpjson.HttpJsonCallContext
import com.google.api.gax.retrying.RetrySettings
import com.google.api.gax.rpc.DeadlineExceededException
import com.google.api.gax.rpc.{ApiCallContext, DeadlineExceededException}
import com.google.cloud.pubsub.v1.stub.SubscriberStub
import com.google.pubsub.v1.{ModifyAckDeadlineRequest, PullRequest, ReceivedMessage, SubscriptionName}
import fs2.Chunk
Expand All @@ -34,6 +35,7 @@ import scala.jdk.CollectionConverters._

class PubSubPuller[F[_], T](
val queueName: String,
useGrpc: Boolean,
subscriptionName: SubscriptionName,
subscriber: SubscriberStub,
lockTTLSeconds: Int
Expand All @@ -42,15 +44,23 @@ class PubSubPuller[F[_], T](
deserializer: Deserializer[T])
extends QueuePuller[F, T] {

private def callContext(waitingTime: FiniteDuration): ApiCallContext =
if (useGrpc)
GrpcCallContext
.createDefault()
.withRetrySettings(
RetrySettings.newBuilder().setLogicalTimeout(Duration.ofMillis(waitingTime.toMillis)).build())
else
HttpJsonCallContext
.createDefault()
.withRetrySettings(
RetrySettings.newBuilder().setLogicalTimeout(Duration.ofMillis(waitingTime.toMillis)).build())

override def pullBatch(batchSize: Int, waitingTime: FiniteDuration): F[Chunk[MessageContext[F, T]]] =
wrapFuture(F.delay {
subscriber
.pullCallable()
.withDefaultCallContext(
HttpJsonCallContext
.createDefault()
.withRetrySettings(
RetrySettings.newBuilder().setLogicalTimeout(Duration.ofMillis(waitingTime.toMillis)).build()))
.withDefaultCallContext(callContext(waitingTime))
.futureCall(
PullRequest.newBuilder().setMaxMessages(batchSize).setSubscription(subscriptionName.toString()).build())
}).map(response => Chunk.from(response.getReceivedMessagesList().asScala))
Expand All @@ -59,6 +69,16 @@ class PubSubPuller[F[_], T](
Chunk.empty
}
.flatMap { (msgs: Chunk[ReceivedMessage]) =>
// PubSub does not support delayed messages
// Instead in this case, we set a custom attribute when publishing
// with a delay. The attribute contains the earliest delivery date
// according to the delay.
// If the `Puller` gets a message with this attribute, we check wether
// it is in the future.
// If it is the case, the ack deadline is modified to be the amount of
// remaining seconds until this date.
// This way, the message will not be delivered until this expires.
// The message is ignored (filtered out of the batch)
msgs.traverseFilter[F, ReceivedMessage] { msg =>
val attrs = msg.getMessage().getAttributesMap().asScala
F.realTimeInstant.flatMap { now =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import com.google.pubsub.v1.{GetSubscriptionRequest, SubscriptionName}

class PubSubSubscriber[F[_], T](
val queueName: String,
useGrpc: Boolean,
subscriptionName: SubscriptionName,
channelProvider: TransportChannelProvider,
credentials: CredentialsProvider,
Expand Down Expand Up @@ -56,7 +57,7 @@ class PubSubSubscriber[F[_], T](
sub => (subscriber, sub))
}
.map { case (subscriber, subscription) =>
new PubSubPuller[F, T](queueName, subscriptionName, subscriber, subscription.getAckDeadlineSeconds())
new PubSubPuller[F, T](queueName, useGrpc, subscriptionName, subscriber, subscription.getAckDeadlineSeconds())
}

}

0 comments on commit fb2489a

Please sign in to comment.