Skip to content

Commit

Permalink
Removed Kafka exception handling. Removed knative event time calculat…
Browse files Browse the repository at this point in the history
…ion. Discovered that the event time header is sent by default in a different format. Added exception handling for process visit.
  • Loading branch information
dspeck1 committed Dec 6, 2024
1 parent fa37ef8 commit aa3144e
Showing 1 changed file with 12 additions and 18 deletions.
30 changes: 12 additions & 18 deletions python/activator/activator.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka import KafkaException, KafkaError
from confluent_kafka import KafkaError
import flask

from .config import PipelinesConfig
Expand Down Expand Up @@ -296,11 +296,6 @@ def keda_start():
if fan_out_message.error().code() == KafkaError._PARTITION_EOF:
# End of partition event
_log.debug('No more messages. Reached end of offset')
if fan_out_message.error().code() == KafkaError._ALL_BROKERS_DOWN:
_log.warning('All prompt processing kafka brokers are down')
if fan_out_message.error().code() == KafkaError._AUTHENTICATION:
_log.error('Authentication failure with fanned out event topic')
raise KafkaException(fan_out_message.error())
elif fan_out_message.error():
_log.warning("Fanned out consumer error: %s", fan_out_message.error())
else:
Expand All @@ -319,12 +314,18 @@ def keda_start():
# Commit message and close client
fan_out_consumer.commit(message=fan_out_message, asynchronous=False)
fan_out_consumer.close()
# Process fan out visit
process_visit(deserialized_fan_out_visit)
_log.info("Processing completed for %s", socket.gethostname())
break

try:
# Process fan out visit
process_visit(deserialized_fan_out_visit)
_log.info("Processing completed for %s", socket.gethostname())
break
except Exception as e:
_log.critical("Process visit failed; aborting.")
_log.exception(e)

finally:
# TODO Handle local registry unregistration
# TODO Handle local registry unregistration on DM-47975
_log.info("Finished listening for fanned out messages")


Expand Down Expand Up @@ -402,13 +403,6 @@ def parse_next_visit(http_request):
if not event.data:
raise ValueError("empty CloudEvent received")

if event['time']:
# Calculate time to load knative and receive message based on time header from knative request
_log.debug("Fan out send event at %s", event['time'])
fan_out_knative_msg_timestamp = float(event['time'])
fan_out_to_prompt_time = time.time() - fan_out_knative_msg_timestamp
_log.debug("Seconds since fan out message delivered %r", fan_out_to_prompt_time)

# Message format is determined by the nextvisit-start deployment.
data = json.loads(event.data)
visit = FannedOutVisit(**data)
Expand Down

0 comments on commit aa3144e

Please sign in to comment.