Skip to content

Commit

Permalink
Add Azure Service Bus client
Browse files Browse the repository at this point in the history
  • Loading branch information
satabin committed Feb 7, 2024
1 parent c9f089a commit 1066dee
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package de.commercetools.queue.azure.servicebus

import cats.effect.IO
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationAsyncClient
import com.azure.messaging.servicebus.administration.models.CreateQueueOptions
import de.commercetools.queue.QueueAdministration

import java.time.Duration
import scala.concurrent.duration.FiniteDuration

class ServiceBusAdministration(client: ServiceBusAdministrationAsyncClient) extends QueueAdministration {

override def create(name: String, messageTTL: FiniteDuration, lockTTL: FiniteDuration): IO[Unit] =
fromBlockingMono(
client.createQueue(
name,
new CreateQueueOptions()
.setDefaultMessageTimeToLive(Duration.ofMillis(messageTTL.toMillis))
.setLockDuration(Duration.ofMillis(lockTTL.toMillis)))).void

override def delete(name: String): IO[Unit] =
fromBlockingMono(client.deleteQueue(name)).void

override def exists(name: String): IO[Boolean] =
fromBlockingMono(client.getQueueExists(name)).map(_.booleanValue)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package de.commercetools.queue.azure.servicebus

import cats.effect.{IO, Resource}
import com.azure.core.credential.TokenCredential
import com.azure.core.util.ClientOptions
import com.azure.messaging.servicebus.ServiceBusClientBuilder
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClientBuilder
import de.commercetools.queue.{Deserializer, QueueAdministration, QueueClient, QueuePublisher, QueueSubscriber, Serializer}

class ServiceBusClient private (
clientBuilder: ServiceBusClientBuilder,
adminBuilder: ServiceBusAdministrationClientBuilder)
extends QueueClient {

override def administration: QueueAdministration =
new ServiceBusAdministration(adminBuilder.buildAsyncClient())

override def publisher[T: Serializer](name: String): Resource[IO, QueuePublisher[T]] =
for {
sender <- Resource.make(IO(clientBuilder.sender().queueName(name).buildAsyncClient()))(s => IO(s.close()))
} yield new ServiceBusQueuePublisher[T](sender)

override def subscriber[T: Deserializer](name: String): QueueSubscriber[T] =
new ServiceBusQueueSubscriber[T](name, clientBuilder)

}

object ServiceBusClient {

def apply(namespace: String, credentials: TokenCredential, options: Option[ClientOptions] = None)
: Resource[IO, ServiceBusClient] =
for {
clientBuilder <- Resource.eval {
IO {
val base = new ServiceBusClientBuilder().credential(namespace, credentials)
options.fold(base)(base.clientOptions(_))
}
}
adminBuilder <- Resource.eval {
IO {
val base = new ServiceBusAdministrationClientBuilder().credential(namespace, credentials)
options.fold(base)(base.clientOptions(_))
}
}
} yield new ServiceBusClient(clientBuilder, adminBuilder)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package de.commercetools.queue.azure.servicebus

import cats.effect.IO
import com.azure.messaging.servicebus.{ServiceBusReceivedMessage, ServiceBusReceiverAsyncClient}
import de.commercetools.queue.MessageContext

import java.time.Instant

class ServiceBusMessageContext[T](
val payload: T,
val underlying: ServiceBusReceivedMessage,
receiver: ServiceBusReceiverAsyncClient)
extends MessageContext[T] {

override def enqueuedAt: Instant = underlying.getEnqueuedTime().toInstant()

override def metadata: Map[String, String] =
Map.empty

override def ack(): IO[Unit] =
fromBlockingMono(receiver.complete(underlying)).void

override def nack(): IO[Unit] =
fromBlockingMono(receiver.abandon(underlying)).void

override def extendLock(): IO[Unit] =
fromBlockingMono(receiver.renewMessageLock(underlying)).void

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package de.commercetools.queue.azure.servicebus

import cats.effect.IO
import cats.syntax.all._
import com.azure.messaging.servicebus.{ServiceBusMessage, ServiceBusSenderAsyncClient}
import de.commercetools.queue.{QueuePublisher, Serializer}
import fs2.Pipe

import java.time.ZoneOffset
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._

class ServiceBusQueuePublisher[Data](sender: ServiceBusSenderAsyncClient)(implicit serializer: Serializer[Data])
extends QueuePublisher[Data] {

override def publish(message: Data, delay: Option[FiniteDuration]): IO[Unit] = {
val sbMessage = new ServiceBusMessage(serializer.serialize(message))
delay.traverse_(delay =>
IO.realTimeInstant
.map(now => sbMessage.setScheduledEnqueueTime(now.plusMillis(delay.toMillis).atOffset(ZoneOffset.UTC)))) *>
fromBlockingMono(sender.sendMessage(sbMessage)).void
}

override def publish(messages: List[Data], delay: Option[FiniteDuration]): IO[Unit] = {
val sbMessages = messages.map(msg => new ServiceBusMessage(serializer.serialize(msg)))
fromBlockingMono(sender.sendMessages(sbMessages.asJava)).void
}

override def sink(chunkSize: Int): Pipe[IO, Data, Nothing] =
_.chunkN(chunkSize).foreach { chunk =>
publish(chunk.toList, None)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package de.commercetools.queue.azure.servicebus

import cats.effect.{IO, Resource}
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode
import com.azure.messaging.servicebus.{ServiceBusClientBuilder, ServiceBusReceivedMessage}
import de.commercetools.queue.{Deserializer, MessageContext, QueueSubscriber}
import fs2.Stream
import fs2.interop.reactivestreams.fromPublisher

import scala.concurrent.duration.FiniteDuration

class ServiceBusQueueSubscriber[Data](
name: String,
builder: ServiceBusClientBuilder
)(implicit deserializer: Deserializer[Data])
extends QueueSubscriber[Data] {

override def messages(batchSize: Int, waitingTime: FiniteDuration): Stream[IO, MessageContext[Data]] =
Stream
.resource(Resource.make {
IO {
builder
.receiver()
.queueName(name)
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.disableAutoComplete()
.buildAsyncClient()
}
} { r =>
IO(r.close())
})
.flatMap { receiver =>
fromPublisher[IO, ServiceBusReceivedMessage](receiver.receiveMessages(), 1)
.groupWithin(batchSize, waitingTime)
.unchunks
.evalMapChunk { sbMessage =>
deserializer.deserialize(sbMessage.getBody().toString()).map { data =>
new ServiceBusMessageContext(data, sbMessage, receiver)
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package de.commercetools.queue.azure

import cats.effect.IO
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers

package object servicebus {

def fromBlockingMono[T](mono: Mono[T]): IO[T] =
IO.async { cb =>
IO.delay {
mono
.subscribeOn(Schedulers.boundedElastic())
.subscribe(res => cb(Right(res)), t => cb(Left(t)))
}.map(disposable => Some(IO.delay(disposable.dispose())))
}

}

0 comments on commit 1066dee

Please sign in to comment.