Skip to content

Commit

Permalink
Provide the consumed message to consumer.commit in AwaitMessageTrigger (
Browse files Browse the repository at this point in the history
  • Loading branch information
hussein-awala authored Dec 18, 2023
1 parent 41096e0 commit 148233a
Showing 1 changed file with 2 additions and 2 deletions.
4 changes: 2 additions & 2 deletions airflow/providers/apache/kafka/triggers/await_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,9 @@ async def run(self):
else:
rv = await async_message_process(message)
if rv:
await async_commit(asynchronous=False)
await async_commit(message=message, asynchronous=False)
yield TriggerEvent(rv)
break
else:
await async_commit(asynchronous=False)
await async_commit(message=message, asynchronous=False)
await asyncio.sleep(self.poll_interval)

0 comments on commit 148233a

Please sign in to comment.