From adef0e3300d77e67dafc1b0a4c4d7f44c583bacc Mon Sep 17 00:00:00 2001 From: Alessandro Zoffoli Date: Fri, 20 Sep 2024 16:01:20 +0200 Subject: [PATCH] Add delay to QueuePublisher.sink --- build.sbt | 4 ++++ .../scala/com/commercetools/queue/QueuePublisher.scala | 7 +++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/build.sbt b/build.sbt index 7e9588e..1570abc 100644 --- a/build.sbt +++ b/build.sbt @@ -53,6 +53,10 @@ lazy val core: CrossProject = crossProject(JVMPlatform) .settings(commonSettings) .settings( name := "fs2-queues-core", + // TODO: Remove once next version is published + mimaBinaryIssueFilters ++= List( + ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.QueuePublisher.sink") + ), libraryDependencies ++= List( "co.fs2" %%% "fs2-core" % Versions.fs2 ) diff --git a/core/src/main/scala/com/commercetools/queue/QueuePublisher.scala b/core/src/main/scala/com/commercetools/queue/QueuePublisher.scala index 81c0caa..dae5db1 100644 --- a/core/src/main/scala/com/commercetools/queue/QueuePublisher.scala +++ b/core/src/main/scala/com/commercetools/queue/QueuePublisher.scala @@ -19,6 +19,8 @@ package com.commercetools.queue import cats.effect.{MonadCancel, Resource} import fs2.Stream +import scala.concurrent.duration.FiniteDuration + /** * The interface to publish to a queue. */ @@ -41,10 +43,11 @@ sealed abstract class QueuePublisher[F[_], T](implicit F: MonadCancel[F, Throwab * produced data to the queue. The messages are published in batches, according * to the `batchSize` parameter. */ - def sink(batchSize: Int = 10)(upstream: Stream[F, (T, Map[String, String])]): Stream[F, Nothing] = + def sink(batchSize: Int = 10, delay: Option[FiniteDuration] = None)(upstream: Stream[F, (T, Map[String, String])]) + : Stream[F, Nothing] = Stream.resource(pusher).flatMap { pusher => upstream.chunkN(batchSize).foreach { chunk => - pusher.push(chunk.toList, None) + pusher.push(chunk.toList, delay) } }