Skip to content

Commit

Permalink
Merge pull request #399 from catostrophe/update-deps
Browse files Browse the repository at this point in the history
Update dependencies
  • Loading branch information
gvolpe authored Sep 9, 2020
2 parents a37c5b5 + bcf398b commit 0c7fa6d
Show file tree
Hide file tree
Showing 10 changed files with 30 additions and 34 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import Dependencies._
import microsites.ExtraMdFileConfig

ThisBuild / name := """fs2-rabbit"""
ThisBuild / crossScalaVersions := List("2.12.10", "2.13.1")
ThisBuild / crossScalaVersions := List("2.12.12", "2.13.3")
ThisBuild / organization := "dev.profunktor"
ThisBuild / homepage := Some(url("https://fs2-rabbit.profunktor.dev/"))
ThisBuild / licenses := List("Apache-2.0" -> url("http://www.apache.org/licenses/LICENSE-2.0"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ class AckerConsumerDemo[F[_]: Concurrent: Timer](fs2Rabbit: RabbitClient[F]) {
implicit val stringMessageEncoder =
Kleisli[F, AmqpMessage[String], AmqpMessage[Array[Byte]]](s => s.copy(payload = s.payload.getBytes(UTF_8)).pure[F])

def logPipe: Pipe[F, AmqpEnvelope[String], AckResult] = _.evalMap { amqpMsg =>
putStrLn(s"Consumed: $amqpMsg").as(Ack(amqpMsg.deliveryTag))
}
def logPipe: Pipe[F, AmqpEnvelope[String], AckResult] =
_.evalMap { amqpMsg =>
putStrLn(s"Consumed: $amqpMsg").as(Ack(amqpMsg.deliveryTag))
}

val publishingFlag: PublishingFlag = PublishingFlag(mandatory = true)

Expand Down Expand Up @@ -91,7 +92,7 @@ class Flow[F[_]: Concurrent, A](
val flow: Stream[F, Unit] =
Stream(
Stream(simpleMessage).covary[F].evalMap(publisher),
Stream(classMessage).covary[F].through(jsonPipe).evalMap(publisher),
Stream(classMessage).through(jsonPipe).covary[F].evalMap(publisher),
consumer.through(logger).evalMap(acker)
).parJoin(3)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import java.util.concurrent.Executors

import cats.data.NonEmptyList
import cats.effect._
import cats.syntax.functor._
import dev.profunktor.fs2rabbit.config.{Fs2RabbitConfig, Fs2RabbitNodeConfig}
import dev.profunktor.fs2rabbit.interpreter.RabbitClient
import dev.profunktor.fs2rabbit.resiliency.ResilientStream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ object ZIOAutoAckConsumer extends CatsApp {
.make(Task(Executors.newCachedThreadPool()))(es => Task(es.shutdown()))
.map(Blocker.liftExecutorService)

override def run(args: List[String]): UIO[Int] =
override def run(args: List[String]): URIO[ZEnv, ExitCode] =
blockerResource
.use { blocker =>
RabbitClient[Task](config, blocker).flatMap { client =>
ResilientStream
.runF(new AutoAckConsumerDemo[Task](client).program)
}
}
.run
.map(_ => 0)
.orDie
.as(ExitCode.success)

}
10 changes: 5 additions & 5 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ object Dependencies {

object Version {
val cats = "2.2.0"
val catsEffect = "2.1.2"
val fs2 = "2.3.0"
val catsEffect = "2.2.0"
val fs2 = "2.4.4"
val circe = "0.13.0"
val amqpClient = "5.9.0"
val logback = "1.2.3"
val monix = "3.1.0"
val zio = "1.0.0-RC19-2"
val zioCats = "2.0.0.0-RC14"
val monix = "3.2.2"
val zio = "1.0.1"
val zioCats = "2.1.4.0"
val scodec = "1.0.0"
val dropwizard = "4.1.12.1"

Expand Down
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ resolvers += "Typesafe Repository" at "https://repo.typesafe.com/typesafe/releas
addSbtPlugin("com.geirsson" % "sbt-ci-release" % "1.5.3")
addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.6.0")
addSbtPlugin("com.47deg" % "sbt-microsites" % "1.2.1")
addSbtPlugin("org.scalameta" % "sbt-mdoc" % "2.1.4")
addSbtPlugin("com.scalapenos" % "sbt-prompt" % "1.0.2")
addSbtPlugin("com.lucidchart" % "sbt-scalafmt" % "1.16")
addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.1.13")
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.5.1")
2 changes: 1 addition & 1 deletion site/docs/examples/sample-acker.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class Flow[F[_]: Concurrent, A](
val flow: Stream[F, Unit] =
Stream(
Stream(simpleMessage).covary[F].evalMap(publisher),
Stream(classMessage).covary[F].through(jsonPipe).evalMap(publisher),
Stream(classMessage).through(jsonPipe).covary[F].evalMap(publisher),
consumer.through(logger).evalMap(acker)
).parJoin(3)

Expand Down
2 changes: 1 addition & 1 deletion site/docs/examples/sample-autoack.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class AutoAckFlow[F[_]: Concurrent, A](
val flow: Stream[F, Unit] =
Stream(
Stream(simpleMessage).covary[F] evalMap publisher,
Stream(classMessage).covary[F] through jsonPipe evalMap publisher,
Stream(classMessage).through(jsonPipe).covary[F] evalMap publisher,
consumer.through(logger).evalMap(ack => Sync[F].delay(println(ack)))
).parJoin(3)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,7 @@ trait Fs2RabbitSpec { self: BaseSpec =>
for {
queue <- mkRandomString.map(QueueName)
result <- declareQueuePassive(queue).attempt
} yield {
result.left.value shouldBe a[java.io.IOException]
}
} yield result.left.value shouldBe a[java.io.IOException]
}
}

Expand All @@ -129,9 +127,7 @@ trait Fs2RabbitSpec { self: BaseSpec =>
for {
exchange <- mkRandomString.map(ExchangeName)
result <- declareExchangePassive(exchange).attempt
} yield {
result.left.value shouldBe a[java.io.IOException]
}
} yield result.left.value shouldBe a[java.io.IOException]
}
}

Expand Down Expand Up @@ -188,9 +184,7 @@ trait Fs2RabbitSpec { self: BaseSpec =>
_ <- bindQueue(q, x, rk, QueueBindingArgs(Map.empty))
ct <- mkRandomString.map(ConsumerTag)
result <- basicCancel(ct).attempt
} yield {
result.left.value shouldBe a[java.io.IOException]
}
} yield result.left.value shouldBe a[java.io.IOException]
}
}

Expand Down Expand Up @@ -686,28 +680,30 @@ trait Fs2RabbitSpec { self: BaseSpec =>
RabbitClient[IO](config, blocker).flatMap(r => fa(r).compile.drain)
}
.as(emptyAssertion)
.unsafeToFuture
.unsafeToFuture()

private def withStreamNackRabbit[A](fa: RabbitClient[IO] => Stream[IO, A]): Future[Assertion] =
blockerResource
.use { blocker =>
RabbitClient[IO](config.copy(requeueOnNack = true), blocker).flatMap(r => fa(r).compile.drain)
}
.as(emptyAssertion)
.unsafeToFuture
.unsafeToFuture()

private def withStreamRejectRabbit[A](fa: RabbitClient[IO] => Stream[IO, A]): Future[Assertion] =
blockerResource
.use { blocker =>
RabbitClient[IO](config.copy(requeueOnReject = true), blocker).flatMap(r => fa(r).compile.drain)
}
.as(emptyAssertion)
.unsafeToFuture
.unsafeToFuture()

private def withRabbit[A](fa: RabbitClient[IO] => IO[A]): Future[A] =
blockerResource.use { blocker =>
RabbitClient[IO](config, blocker).flatMap(r => fa(r))
}.unsafeToFuture
blockerResource
.use { blocker =>
RabbitClient[IO](config, blocker).flatMap(r => fa(r))
}
.unsafeToFuture()

private def randomQueueData: IO[(QueueName, ExchangeName, RoutingKey)] =
(mkRandomString, mkRandomString, mkRandomString).mapN {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class ResilientStreamSpec extends BaseSpec {

it should "run a stream until it's finished" in {
val program = Stream(1, 2, 3).covary[IO].through(sink)
ResilientStream.run(program).as(emptyAssertion).unsafeToFuture
ResilientStream.run(program).as(emptyAssertion).unsafeToFuture()
}

it should "run a stream and recover in case of failure" in {
Expand All @@ -47,7 +47,7 @@ class ResilientStreamSpec extends BaseSpec {
}
}

Ref.of[IO, Int](2).flatMap(r => ResilientStream.run(p(r), 1.second)).as(emptyAssertion).unsafeToFuture
Ref.of[IO, Int](2).flatMap(r => ResilientStream.run(p(r), 1.second)).as(emptyAssertion).unsafeToFuture()
}

}

0 comments on commit 0c7fa6d

Please sign in to comment.