diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Binding.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Binding.scala index 12e41791..9b938065 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Binding.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Binding.scala @@ -16,19 +16,19 @@ package dev.profunktor.fs2rabbit.algebra -import cats.effect.Sync +import cats.effect.{Blocker, ContextShift, Sync} import cats.syntax.functor._ import dev.profunktor.fs2rabbit.arguments._ import dev.profunktor.fs2rabbit.model._ object Binding { - def make[F[_]: Sync]: Binding[F] = + def make[F[_]: Sync: ContextShift](blocker: Blocker): Binding[F] = new Binding[F] { override def bindQueue(channel: AMQPChannel, queueName: QueueName, exchangeName: ExchangeName, routingKey: RoutingKey): F[Unit] = - Sync[F].delay { + blocker.delay { channel.value.queueBind( queueName.value, exchangeName.value, @@ -41,7 +41,7 @@ object Binding { exchangeName: ExchangeName, routingKey: RoutingKey, args: QueueBindingArgs): F[Unit] = - Sync[F].delay { + blocker.delay { channel.value.queueBind( queueName.value, exchangeName.value, @@ -55,7 +55,7 @@ object Binding { exchangeName: ExchangeName, routingKey: RoutingKey, args: QueueBindingArgs): F[Unit] = - Sync[F].delay { + blocker.delay { channel.value.queueBindNoWait( queueName.value, exchangeName.value, @@ -83,7 +83,7 @@ object Binding { exchangeName: ExchangeName, routingKey: RoutingKey, args: QueueUnbindArgs): F[Unit] = - Sync[F].delay { + blocker.delay { channel.value.queueUnbind( queueName.value, exchangeName.value, @@ -97,7 +97,7 @@ object Binding { source: ExchangeName, routingKey: RoutingKey, args: ExchangeBindingArgs): F[Unit] = - Sync[F].delay { + blocker.delay { channel.value.exchangeBind( destination.value, source.value, @@ -111,7 +111,7 @@ object Binding { source: ExchangeName, routingKey: RoutingKey, args: ExchangeBindingArgs): F[Unit] = - Sync[F].delay { + blocker.delay { channel.value.exchangeBindNoWait( destination.value, source.value, @@ -125,7 +125,7 @@ object Binding { source: ExchangeName, routingKey: RoutingKey, args: ExchangeUnbindArgs): F[Unit] = - Sync[F].delay { + blocker.delay { channel.value.exchangeUnbind( destination.value, source.value, diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Consume.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Consume.scala index 798c8b37..8612f40c 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Consume.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Consume.scala @@ -17,7 +17,7 @@ package dev.profunktor.fs2rabbit.algebra import cats.effect.syntax.effect._ -import cats.effect.{Effect, Sync} +import cats.effect.{Blocker, ContextShift, Effect, Sync} import cats.syntax.flatMap._ import cats.syntax.functor._ import cats.{Applicative, Functor} @@ -28,7 +28,7 @@ import dev.profunktor.fs2rabbit.model._ import scala.util.{Failure, Success, Try} object Consume { - def make[F[_]: Effect]: Consume[F] = + def make[F[_]: Effect: ContextShift](blocker: Blocker): Consume[F] = new Consume[F] { private[fs2rabbit] def defaultConsumer[A]( channel: AMQPChannel, @@ -108,21 +108,21 @@ object Consume { } } - override def basicAck(channel: AMQPChannel, tag: DeliveryTag, multiple: Boolean): F[Unit] = Sync[F].delay { + override def basicAck(channel: AMQPChannel, tag: DeliveryTag, multiple: Boolean): F[Unit] = blocker.delay { channel.value.basicAck(tag.value, multiple) } override def basicNack(channel: AMQPChannel, tag: DeliveryTag, multiple: Boolean, requeue: Boolean): F[Unit] = - Sync[F].delay { + blocker.delay { channel.value.basicNack(tag.value, multiple, requeue) } - override def basicReject(channel: AMQPChannel, tag: DeliveryTag, requeue: Boolean): F[Unit] = Sync[F].delay { + override def basicReject(channel: AMQPChannel, tag: DeliveryTag, requeue: Boolean): F[Unit] = blocker.delay { channel.value.basicReject(tag.value, requeue) } override def basicQos(channel: AMQPChannel, basicQos: BasicQos): F[Unit] = - Sync[F].delay { + blocker.delay { channel.value.basicQos( basicQos.prefetchSize, basicQos.prefetchCount, @@ -141,7 +141,7 @@ object Consume { )(internals: AMQPInternals[F]): F[ConsumerTag] = for { dc <- defaultConsumer(channel, internals) - rs <- Sync[F].delay( + rs <- blocker.delay( channel.value.basicConsume( queueName.value, autoAck, @@ -155,7 +155,7 @@ object Consume { } yield ConsumerTag(rs) override def basicCancel(channel: AMQPChannel, consumerTag: ConsumerTag): F[Unit] = - Sync[F].delay { + blocker.delay { channel.value.basicCancel(consumerTag.value) } } diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Declaration.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Declaration.scala index 17a2087b..99c5674e 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Declaration.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Declaration.scala @@ -16,7 +16,7 @@ package dev.profunktor.fs2rabbit.algebra -import cats.effect.Sync +import cats.effect.{Blocker, ContextShift, Sync} import cats.syntax.functor._ import dev.profunktor.fs2rabbit.arguments._ import dev.profunktor.fs2rabbit.config.declaration.{DeclarationExchangeConfig, DeclarationQueueConfig} @@ -24,9 +24,9 @@ import dev.profunktor.fs2rabbit.effects.BoolValue.syntax._ import dev.profunktor.fs2rabbit.model.{AMQPChannel, ExchangeName, QueueName} object Declaration { - def make[F[_]: Sync]: Declaration[F] = new Declaration[F] { + def make[F[_]: Sync: ContextShift](blocker: Blocker): Declaration[F] = new Declaration[F] { override def declareExchange(channel: AMQPChannel, config: DeclarationExchangeConfig): F[Unit] = - Sync[F].delay { + blocker.delay { channel.value.exchangeDeclare( config.exchangeName.value, config.exchangeType.toString.toLowerCase, @@ -41,7 +41,7 @@ object Declaration { channel: AMQPChannel, config: DeclarationExchangeConfig ): F[Unit] = - Sync[F].delay { + blocker.delay { channel.value.exchangeDeclareNoWait( config.exchangeName.value, config.exchangeType.toString.toLowerCase, @@ -53,17 +53,17 @@ object Declaration { }.void override def declareExchangePassive(channel: AMQPChannel, exchangeName: ExchangeName): F[Unit] = - Sync[F].delay { + blocker.delay { channel.value.exchangeDeclarePassive(exchangeName.value) }.void override def declareQueue(channel: AMQPChannel): F[QueueName] = - Sync[F].delay { + blocker.delay { QueueName(channel.value.queueDeclare().getQueue) } override def declareQueue(channel: AMQPChannel, config: DeclarationQueueConfig): F[Unit] = - Sync[F].delay { + blocker.delay { channel.value.queueDeclare( config.queueName.value, config.durable.isTrue, @@ -74,7 +74,7 @@ object Declaration { }.void override def declareQueueNoWait(channel: AMQPChannel, config: DeclarationQueueConfig): F[Unit] = - Sync[F].delay { + blocker.delay { channel.value.queueDeclareNoWait( config.queueName.value, config.durable.isTrue, @@ -85,7 +85,7 @@ object Declaration { }.void override def declareQueuePassive(channel: AMQPChannel, queueName: QueueName): F[Unit] = - Sync[F].delay { + blocker.delay { channel.value.queueDeclarePassive(queueName.value) }.void } diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Deletion.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Deletion.scala index e065775d..aa74cdbd 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Deletion.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Deletion.scala @@ -16,7 +16,7 @@ package dev.profunktor.fs2rabbit.algebra -import cats.effect.Sync +import cats.effect.{Blocker, ContextShift, Sync} import cats.syntax.functor._ import dev.profunktor.fs2rabbit.config.deletion import dev.profunktor.fs2rabbit.config.deletion.{DeletionExchangeConfig, DeletionQueueConfig} @@ -24,9 +24,9 @@ import dev.profunktor.fs2rabbit.effects.BoolValue.syntax._ import dev.profunktor.fs2rabbit.model.AMQPChannel object Deletion { - def make[F[_]: Sync]: Deletion[F] = new Deletion[F] { + def make[F[_]: Sync: ContextShift](blocker: Blocker): Deletion[F] = new Deletion[F] { override def deleteQueue(channel: AMQPChannel, config: DeletionQueueConfig): F[Unit] = - Sync[F].delay { + blocker.delay { channel.value.queueDelete( config.queueName.value, config.ifUnused.isTrue, @@ -35,7 +35,7 @@ object Deletion { }.void override def deleteQueueNoWait(channel: AMQPChannel, config: DeletionQueueConfig): F[Unit] = - Sync[F].delay { + blocker.delay { channel.value.queueDeleteNoWait( config.queueName.value, config.ifUnused.isTrue, @@ -47,7 +47,7 @@ object Deletion { channel: AMQPChannel, config: deletion.DeletionExchangeConfig ): F[Unit] = - Sync[F].delay { + blocker.delay { channel.value.exchangeDelete(config.exchangeName.value, config.ifUnused.isTrue) }.void @@ -55,7 +55,7 @@ object Deletion { channel: AMQPChannel, config: deletion.DeletionExchangeConfig ): F[Unit] = - Sync[F].delay { + blocker.delay { channel.value.exchangeDeleteNoWait( config.exchangeName.value, config.ifUnused.isTrue diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/RabbitClient.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/RabbitClient.scala index 86bfe3ea..713e1794 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/RabbitClient.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/interpreter/RabbitClient.scala @@ -45,16 +45,16 @@ object RabbitClient { val internalQ = new LiveInternalQueue[F](config.internalQueueSize.getOrElse(500)) val connection = ConnectionResource.make(config, sslContext, saslConfig, metricsCollector) - val consumingProgram = AckConsumingProgram.make[F](config, internalQ) + val consumingProgram = AckConsumingProgram.make[F](config, internalQ, blocker) val publishingProgram = PublishingProgram.make[F](blocker) (connection, consumingProgram, publishingProgram).mapN { case (conn, consuming, publish) => - val consumeClient = Consume.make[F] + val consumeClient = Consume.make[F](blocker) val publishClient = Publish.make[F](blocker) - val bindingClient = Binding.make[F] - val declarationClient = Declaration.make[F] - val deletionClient = Deletion.make[F] + val bindingClient = Binding.make[F](blocker) + val declarationClient = Declaration.make[F](blocker) + val deletionClient = Deletion.make[F](blocker) new RabbitClient[F]( conn, diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/program/AckConsumingProgram.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/program/AckConsumingProgram.scala index 32868b4e..7f540afd 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/program/AckConsumingProgram.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/program/AckConsumingProgram.scala @@ -27,8 +27,10 @@ import dev.profunktor.fs2rabbit.model._ import fs2.Stream object AckConsumingProgram { - def make[F[_]: Effect](configuration: Fs2RabbitConfig, internalQueue: InternalQueue[F]): F[AckConsumingProgram[F]] = - (AckingProgram.make(configuration), ConsumingProgram.make(internalQueue)).mapN { + def make[F[_]: Effect: ContextShift](configuration: Fs2RabbitConfig, + internalQueue: InternalQueue[F], + blocker: Blocker): F[AckConsumingProgram[F]] = + (AckingProgram.make(configuration, blocker), ConsumingProgram.make(internalQueue, blocker)).mapN { case (ap, cp) => WrapperAckConsumingProgram(ap, cp) } diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/program/AckingProgram.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/program/AckingProgram.scala index acd77bd5..a1849c69 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/program/AckingProgram.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/program/AckingProgram.scala @@ -17,7 +17,7 @@ package dev.profunktor.fs2rabbit.program import cats.Applicative -import cats.effect.{Effect, Sync} +import cats.effect.{Blocker, ContextShift, Effect, Sync} import dev.profunktor.fs2rabbit.algebra.{AMQPInternals, Acking, Consume} import dev.profunktor.fs2rabbit.arguments.Arguments import dev.profunktor.fs2rabbit.config.Fs2RabbitConfig @@ -25,8 +25,8 @@ import dev.profunktor.fs2rabbit.model.AckResult.{Ack, NAck, Reject} import dev.profunktor.fs2rabbit.model._ object AckingProgram { - def make[F[_]: Effect](config: Fs2RabbitConfig): F[AckingProgram[F]] = Sync[F].delay { - WrapperAckingProgram(config, Consume.make) + def make[F[_]: Effect: ContextShift](config: Fs2RabbitConfig, blocker: Blocker): F[AckingProgram[F]] = Sync[F].delay { + WrapperAckingProgram(config, Consume.make(blocker)) } } diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/program/ConsumingProgram.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/program/ConsumingProgram.scala index 355810a1..da7d1df9 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/program/ConsumingProgram.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/program/ConsumingProgram.scala @@ -16,7 +16,7 @@ package dev.profunktor.fs2rabbit.program -import cats.effect.{Effect, Sync} +import cats.effect.{Blocker, ContextShift, Effect, Sync} import cats.implicits._ import dev.profunktor.fs2rabbit.algebra.ConsumingStream._ import dev.profunktor.fs2rabbit.algebra.{AMQPInternals, Consume, InternalQueue} @@ -26,9 +26,10 @@ import dev.profunktor.fs2rabbit.model._ import fs2.Stream object ConsumingProgram { - def make[F[_]: Effect](internalQueue: InternalQueue[F]): F[ConsumingProgram[F]] = Sync[F].delay { - WrapperConsumingProgram(internalQueue, Consume.make) - } + def make[F[_]: Effect: ContextShift](internalQueue: InternalQueue[F], blocker: Blocker): F[ConsumingProgram[F]] = + Sync[F].delay { + WrapperConsumingProgram(internalQueue, Consume.make(blocker)) + } } trait ConsumingProgram[F[_]] extends ConsumingStream[F] with Consume[F]