Skip to content

Commit

Permalink
Add logging to track time between messages received for consumer poll…
Browse files Browse the repository at this point in the history
…ing.
  • Loading branch information
dspeck1 committed Dec 18, 2024
1 parent e6e69ba commit 6e087dc
Showing 1 changed file with 9 additions and 0 deletions.
9 changes: 9 additions & 0 deletions python/activator/activator.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,15 +289,24 @@ def keda_start():
fan_out_consumer.subscribe([fan_out_kafka_topic])
fan_out_listen_start_time = time.time()

consumer_polls = 0

try:
while time.time() - fan_out_listen_start_time < fanned_out_msg_listen_timeout:

fan_out_message = fan_out_consumer.poll(timeout=5)
consumer_polls += 1

if fan_out_message is None:
continue
if fan_out_message.error():
_log.warning("Fanned out consumer error: %s", fan_out_message.error())
else:
if consumer_polls > 1:
fan_out_listen_finish_time = time.time()
fan_out_listen_time = fan_out_listen_finish_time - fan_out_listen_start_time
_log.debug("Seconds since last message received %r for consumer poll %r",
fan_out_listen_time, consumer_polls)
deserialized_fan_out_visit = fan_out_avro_deserializer(fan_out_message.value(),
SerializationContext(
fan_out_message.topic(),
Expand Down

0 comments on commit 6e087dc

Please sign in to comment.