Skip to content

Commit

Permalink
Fix signature of _checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
bunchesofdonald committed Feb 26, 2025
1 parent 3c3cde1 commit 5b67af8
Showing 1 changed file with 10 additions and 8 deletions.
18 changes: 10 additions & 8 deletions src/prefect/events/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,17 +403,11 @@ async def _reconnect(self) -> None:
await self.emit(event)
logger.debug("Finished resending unconfirmed events.")

async def _checkpoint(self, event: Event) -> None:
async def _checkpoint(self) -> None:
assert self._websocket

unconfirmed_count = len(self._unconfirmed_events)

logger.debug(
"Added event id=%s to unconfirmed events list. "
"There are now %s unconfirmed events.",
event.id,
unconfirmed_count,
)
if unconfirmed_count < self._checkpoint_every:
return

Expand All @@ -431,8 +425,16 @@ async def _checkpoint(self, event: Event) -> None:

async def _emit(self, event: Event) -> None:
self._log_debug("Emitting event id=%s.", event.id)

self._unconfirmed_events.append(event)

logger.debug(
"Added event id=%s to unconfirmed events list. "
"There are now %s unconfirmed events.",
event.id,
len(self._unconfirmed_events),
)

for i in range(self._reconnection_attempts + 1):
self._log_debug("Emit reconnection attempt %s.", i)
try:
Expand All @@ -450,7 +452,7 @@ async def _emit(self, event: Event) -> None:
self._log_debug("Sending event id=%s.", event.id)
await self._websocket.send(event.model_dump_json())
self._log_debug("Checkpointing event id=%s.", event.id)
await self._checkpoint(event)
await self._checkpoint()

return
except ConnectionClosed:
Expand Down

0 comments on commit 5b67af8

Please sign in to comment.