Skip to content

Commit

Permalink
Merge pull request #36 from commercetools/sink-with-delay
Browse files Browse the repository at this point in the history
Add delay to QueuePublisher.sink
  • Loading branch information
AL333Z authored Sep 20, 2024
2 parents 039cf89 + adef0e3 commit 43c3e4b
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 2 deletions.
4 changes: 4 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ lazy val core: CrossProject = crossProject(JVMPlatform)
.settings(commonSettings)
.settings(
name := "fs2-queues-core",
// TODO: Remove once next version is published
mimaBinaryIssueFilters ++= List(
ProblemFilters.exclude[DirectMissingMethodProblem]("com.commercetools.queue.QueuePublisher.sink")
),
libraryDependencies ++= List(
"co.fs2" %%% "fs2-core" % Versions.fs2
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package com.commercetools.queue
import cats.effect.{MonadCancel, Resource}
import fs2.Stream

import scala.concurrent.duration.FiniteDuration

/**
* The interface to publish to a queue.
*/
Expand All @@ -41,10 +43,11 @@ sealed abstract class QueuePublisher[F[_], T](implicit F: MonadCancel[F, Throwab
* produced data to the queue. The messages are published in batches, according
* to the `batchSize` parameter.
*/
def sink(batchSize: Int = 10)(upstream: Stream[F, (T, Map[String, String])]): Stream[F, Nothing] =
def sink(batchSize: Int = 10, delay: Option[FiniteDuration] = None)(upstream: Stream[F, (T, Map[String, String])])
: Stream[F, Nothing] =
Stream.resource(pusher).flatMap { pusher =>
upstream.chunkN(batchSize).foreach { chunk =>
pusher.push(chunk.toList, None)
pusher.push(chunk.toList, delay)
}
}

Expand Down

0 comments on commit 43c3e4b

Please sign in to comment.