Skip to content

Commit

Permalink
Add in missing AmqpHeaderVal classes (#213)
Browse files Browse the repository at this point in the history
* Add in missing AmqpHeaderVal classes
  • Loading branch information
changlinli authored and gvolpe committed Jul 10, 2019
1 parent 315abd2 commit e156915
Show file tree
Hide file tree
Showing 11 changed files with 498 additions and 103 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ val commonSettings = Seq(
licenses += ("Apache-2.0", new URL("https://www.apache.org/licenses/LICENSE-2.0.txt")),
homepage := Some(url("https://fs2-rabbit.profunktor.dev/")),
headerLicense := Some(HeaderLicense.ALv2("2017-2019", "ProfunKtor")),
scalacOptions in (Compile, doc) ++= Seq("-no-link-warnings"),
scalacOptions ++= determineVersionSpecificDeps(scalaVersion.value).scalacOptions,
libraryDependencies ++= {
val library = determineVersionSpecificDeps(scalaVersion.value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package dev.profunktor.fs2rabbit.effects
import cats.{Applicative, ApplicativeError}
import cats.data.Kleisli
import dev.profunktor.fs2rabbit.model.{AmqpHeaderVal, AmqpProperties, ExchangeName, RoutingKey}
import dev.profunktor.fs2rabbit.model.AmqpHeaderVal._
import dev.profunktor.fs2rabbit.model.{AmqpFieldValue, AmqpProperties, ExchangeName, RoutingKey}
import dev.profunktor.fs2rabbit.model.AmqpFieldValue._
import cats.implicits._

object EnvelopeDecoder {
Expand All @@ -39,10 +39,10 @@ object EnvelopeDecoder {
def redelivered[F[_]: Applicative]: EnvelopeDecoder[F, Boolean] =
Kleisli(e => e.redelivered.pure[F])

def header[F[_]](name: String)(implicit F: ApplicativeError[F, Throwable]): EnvelopeDecoder[F, AmqpHeaderVal] =
def header[F[_]](name: String)(implicit F: ApplicativeError[F, Throwable]): EnvelopeDecoder[F, AmqpFieldValue] =
Kleisli(e => F.catchNonFatal(e.properties.headers(name)))

def optHeader[F[_]: Applicative](name: String): EnvelopeDecoder[F, Option[AmqpHeaderVal]] =
def optHeader[F[_]: Applicative](name: String): EnvelopeDecoder[F, Option[AmqpFieldValue]] =
Kleisli(_.properties.headers.get(name).pure[F])

def stringHeader[F[_]: ApplicativeError[?[_], Throwable]](name: String): EnvelopeDecoder[F, String] =
Expand Down Expand Up @@ -70,13 +70,13 @@ object EnvelopeDecoder {
name: String): EnvelopeDecoder[F, Option[collection.Seq[Any]]] =
optHeaderPF[F, collection.Seq[Any]](name) { case ArrayVal(a) => a }

private def headerPF[F[_], A](name: String)(pf: PartialFunction[AmqpHeaderVal, A])(
private def headerPF[F[_], A](name: String)(pf: PartialFunction[AmqpFieldValue, A])(
implicit F: ApplicativeError[F, Throwable]): EnvelopeDecoder[F, A] =
Kleisli { env =>
F.catchNonFatal(pf(env.properties.headers(name)))
}

private def optHeaderPF[F[_], A](name: String)(pf: PartialFunction[AmqpHeaderVal, A])(
private def optHeaderPF[F[_], A](name: String)(pf: PartialFunction[AmqpFieldValue, A])(
implicit F: ApplicativeError[F, Throwable]): EnvelopeDecoder[F, Option[A]] =
Kleisli(_.properties.headers.get(name).traverse(h => F.catchNonFatal(pf(h))))
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,25 @@
package dev.profunktor.fs2rabbit.interpreter

import cats.Applicative
import cats.effect.{Effect, Sync}
import cats.effect.syntax.effect._
import cats.effect.{Effect, Sync}
import cats.syntax.flatMap._
import cats.syntax.functor._
import com.rabbitmq.client._
import dev.profunktor.fs2rabbit.algebra.{AMQPClient, AMQPInternals}
import dev.profunktor.fs2rabbit.arguments._
import dev.profunktor.fs2rabbit.config.declaration.{DeclarationExchangeConfig, DeclarationQueueConfig}
import dev.profunktor.fs2rabbit.config.deletion
import dev.profunktor.fs2rabbit.config.deletion.DeletionQueueConfig
import dev.profunktor.fs2rabbit.effects.BoolValue.syntax._
import dev.profunktor.fs2rabbit.model._
import com.rabbitmq.client._

class AmqpClientEffect[F[_]: Effect] extends AMQPClient[F] {

private[fs2rabbit] def defaultConsumer[A](
channel: Channel,
internals: AMQPInternals[F]
): F[Consumer] = Applicative[F].pure {
): F[Consumer] = Sync[F].delay {
new DefaultConsumer(channel) {

override def handleCancel(consumerTag: String): Unit =
Expand All @@ -52,18 +52,48 @@ class AmqpClientEffect[F[_]: Effect] extends AMQPClient[F] {
properties: AMQP.BasicProperties,
body: Array[Byte]
): Unit = {
val tag = envelope.getDeliveryTag
val routingKey = RoutingKey(envelope.getRoutingKey)
val exchange = ExchangeName(envelope.getExchange)
val redelivered = envelope.isRedeliver
val props = AmqpProperties.from(properties)
internals.queue.fold(()) { internalQ =>
val envelope = AmqpEnvelope(DeliveryTag(tag), body, props, exchange, routingKey, redelivered)
internalQ
.enqueue1(Right(envelope))
.toIO
.unsafeRunAsync(_ => ())
// This should not go wrong (if it does it is an indication of a bug in
// unsafeFrom)!
// However, I'm not entirely confident I've nailed down unsafeFrom (
// since it requires a pretty intricate understanding of the underlying
// Java library) so just in case, we're wrapping it in a Try so that a
// bug here doesn't bring down our entire queue.
val amqpPropertiesOrErr = scala.util.Try(AmqpProperties.unsafeFrom(properties)) match {
// toEither is not supported by Scala 2.11 so we have a manual match
case scala.util.Success(amqpProperties) => Right(amqpProperties)
case scala.util.Failure(err) =>
val rewrappedError = new Exception(
"You've stumbled across a bug in the interface between the underlying " +
"RabbitMQ Java library and fs2-rabbit! Please report this bug and " +
"include this stack trace and message.\nThe BasicProperties instance " +
s"that caused this error was:\n$properties\n",
err
)
Left(rewrappedError)
}
// Calling the Functor instance manually is because of three annoying things:
// 1. Scala 2.11 doesn't have right-biased Either so .map doesn't work,
// 2. Scala 2.13 deprecates .right so .right.map doesn't work either
// (since we have fatal warnings).
// 3. import cats.implicits._ doesn't work because it warns about an
// unused import for Scala 2.12 and Scala 2.13
// So we invoke the Either Functor instance manually
import cats.instances.either.catsStdInstancesForEither

val envelopeOrErr = catsStdInstancesForEither.map(amqpPropertiesOrErr) { props =>
val tag = envelope.getDeliveryTag
val routingKey = RoutingKey(envelope.getRoutingKey)
val exchange = ExchangeName(envelope.getExchange)
val redelivered = envelope.isRedeliver
AmqpEnvelope(DeliveryTag(tag), body, props, exchange, routingKey, redelivered)
}

internals.queue
.fold(Applicative[F].pure(())) { internalQ =>
internalQ.enqueue1(envelopeOrErr)
}
.toIO
.unsafeRunAsync(_ => ())
}
}
}
Expand Down Expand Up @@ -168,7 +198,7 @@ class AmqpClientEffect[F[_]: Effect] extends AMQPClient[F] {
ReplyText(replyText),
ExchangeName(exchange),
RoutingKey(routingKey),
AmqpProperties.from(properties),
AmqpProperties.unsafeFrom(properties),
AmqpBody(body)
)

Expand Down
Loading

0 comments on commit e156915

Please sign in to comment.