diff --git a/src/prefect/events/clients.py b/src/prefect/events/clients.py index 4bd2d53f74d8..b83f0e4c5cdf 100644 --- a/src/prefect/events/clients.py +++ b/src/prefect/events/clients.py @@ -403,17 +403,11 @@ async def _reconnect(self) -> None: await self.emit(event) logger.debug("Finished resending unconfirmed events.") - async def _checkpoint(self, event: Event) -> None: + async def _checkpoint(self) -> None: assert self._websocket unconfirmed_count = len(self._unconfirmed_events) - logger.debug( - "Added event id=%s to unconfirmed events list. " - "There are now %s unconfirmed events.", - event.id, - unconfirmed_count, - ) if unconfirmed_count < self._checkpoint_every: return @@ -431,8 +425,16 @@ async def _checkpoint(self, event: Event) -> None: async def _emit(self, event: Event) -> None: self._log_debug("Emitting event id=%s.", event.id) + self._unconfirmed_events.append(event) + logger.debug( + "Added event id=%s to unconfirmed events list. " + "There are now %s unconfirmed events.", + event.id, + len(self._unconfirmed_events), + ) + for i in range(self._reconnection_attempts + 1): self._log_debug("Emit reconnection attempt %s.", i) try: @@ -450,7 +452,7 @@ async def _emit(self, event: Event) -> None: self._log_debug("Sending event id=%s.", event.id) await self._websocket.send(event.model_dump_json()) self._log_debug("Checkpointing event id=%s.", event.id) - await self._checkpoint(event) + await self._checkpoint() return except ConnectionClosed: