Skip to content

Commit

Permalink
Use delay in batch publishing
Browse files Browse the repository at this point in the history
  • Loading branch information
satabin committed Mar 7, 2024
1 parent 29bd4bc commit 448dcb3
Showing 1 changed file with 10 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,22 @@ class ServiceBusPusher[F[_], Data](sender: ServiceBusSenderClient)(implicit seri

override def push(message: Data, delay: Option[FiniteDuration]): F[Unit] = {
val sbMessage = new ServiceBusMessage(serializer.serialize(message))
delay.traverse_(delay =>
delay.traverse_ { delay =>
F.realTimeInstant
.map(now => sbMessage.setScheduledEnqueueTime(now.plusMillis(delay.toMillis).atOffset(ZoneOffset.UTC)))) *>
.map(now => sbMessage.setScheduledEnqueueTime(now.plusMillis(delay.toMillis).atOffset(ZoneOffset.UTC)))
} *>
F.blocking(sender.sendMessage(sbMessage)).void
}

override def push(messages: List[Data], delay: Option[FiniteDuration]): F[Unit] = {
val sbMessages = messages.map(msg => new ServiceBusMessage(serializer.serialize(msg)))
delay.traverse_ { delay =>
F.realTimeInstant.map { now =>
sbMessages.foreach { msg =>
msg.setScheduledEnqueueTime(now.plusMillis(delay.toMillis).atOffset(ZoneOffset.UTC))
}
}
}
F.blocking(sender.sendMessages(sbMessages.asJava)).void
}

Expand Down

0 comments on commit 448dcb3

Please sign in to comment.