Skip to content

Commit

Permalink
Adds support for Knative and Keda co-existence. Which platform to run…
Browse files Browse the repository at this point in the history
… is determined by the PLATFORM environment variable. Environment variables are conditionally loaded for Keda if that platform is set. Keda will process one kafka message from fan out events with a manual commit. Timing events come from the Kafka message header. A timeout is defined for the Kafka message polling to obtain a fan out next visit message so that the message polling does not run continually when pods are scaled up. The bucket notification consumer offset is changed from statically set to latest to an environment variable with a default of latest.
  • Loading branch information
dspeck1 committed Dec 6, 2024
1 parent a20749b commit 7f12b75
Showing 1 changed file with 146 additions and 8 deletions.
154 changes: 146 additions & 8 deletions python/activator/activator.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,21 @@
import json
import logging
import os
import signal
import socket
import sys
import time
import signal
import yaml
import uuid
import yaml

import boto3
from botocore.handlers import validate_bucket_name
import cloudevents.http
import confluent_kafka as kafka
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka import KafkaError

Check failure on line 44 in python/activator/activator.py

View workflow job for this annotation

GitHub Actions / call-workflow / lint

F401

'confluent_kafka.KafkaError' imported but unused
import flask

from .config import PipelinesConfig
Expand All @@ -53,7 +58,8 @@
from .repo_tracker import LocalRepoTracker
from .visit import FannedOutVisit


# Platform that prompt processing will run on
platform = os.environ["PLATFORM"].lower()
# The short name for the instrument.
instrument_name = os.environ["RUBIN_INSTRUMENT"]
# The skymap to use in the central repo
Expand All @@ -76,6 +82,31 @@
kafka_group_id = str(uuid.uuid4())
# The topic on which to listen to updates to image_bucket
bucket_topic = os.environ.get("BUCKET_TOPIC", "rubin-prompt-processing")
# Offset for Kafka bucket notification.
bucket_notification_kafka_offset_reset = os.environ.get("BUCKET_NOTIFICATION_KAFKA_OFFSET_RESET", "latest")

# Conditionally load keda environment variables
if platform == "keda":
# Kafka Schema Registry URL for next visit fan out messages
fan_out_schema_registry_url = os.environ["FAN_OUT_SCHEMA_REGISTRY_URL"]
# Kafka cluster with next visit fanned out messages.
fan_out_kafka_cluster = os.environ["FAN_OUT_KAFKA_CLUSTER"]
# Kafka group for next visit fan out messages.
fan_out_kafka_group_id = os.environ["FAN_OUT_KAFKA_GROUP_ID"]
# Kafka topic for next visit fan out messages.
fan_out_kafka_topic = os.environ["FAN_OUT_KAFKA_TOPIC"]
# Kafka topic offset for next visit fan out messages.
fan_out_kafka_topic_offset = os.environ["FAN_OUT_KAFKA_TOPIC_OFFSET"]
# Kafka Fan Out SASL Mechansim.
fan_out_kafka_sasl_mechanism = os.environ["FAN_OUT_KAFKA_SASL_MECHANISM"]
# Kafka Fan Out Security Protocol.
fan_out_kafka_security_protocol = os.environ["FAN_OUT_KAFKA_SECURITY_PROTOCOL"]
# Kafka Fan Out Consumer Username.
fan_out_kafka_sasl_username = os.environ["FAN_OUT_KAFKA_SASL_USERNAME"]
# Kafka Fan Out Consumer Password.
fan_out_kafka_sasl_password = os.environ["FAN_OUT_KAFKA_SASL_PASSWORD"]
# Time to wait for fanned out messages before spawning new pod.
fanned_out_msg_listen_timeout = int(os.environ.get("FANNED_OUT_MSG_LISTEN_TIMEOUT", 300))

_log = logging.getLogger("lsst." + __name__)
_log.setLevel(logging.DEBUG)
Expand Down Expand Up @@ -127,7 +158,7 @@ def _get_consumer():
return kafka.Consumer({
"bootstrap.servers": kafka_cluster,
"group.id": kafka_group_id,
"auto.offset.reset": "latest", # default, but make explicit
"auto.offset.reset": bucket_notification_kafka_offset_reset,
})


Expand Down Expand Up @@ -195,6 +226,105 @@ def create_app():
sys.exit(3)


def dict_to_fanned_out_visit(obj, ctx):
"""
Converts object literal(dict) to a Fanned Out instance.
Args:
ctx (SerializationContext): Metadata pertaining to the serialization
operation.
obj (dict): Object literal(dict)
"""

if obj is None:
return None

return FannedOutVisit(**obj)


def keda_start():

try:
setup_usdf_logger(
labels={"instrument": instrument_name},
)

# Initialize local registry
registry = LocalRepoTracker.get()
registry.init_tracker()

# Check initialization and abort early
_get_consumer()
_get_storage_client()
_get_central_butler()
_get_local_repo()

_log.info("Worker ready to handle requests.")

except Exception as e:
_log.critical("Failed to start worker; aborting.")
_log.exception(e)

# Initialize schema registry for fan out
fan_out_schema_registry_conf = {'url': fan_out_schema_registry_url}
fan_out_schema_registry_client = SchemaRegistryClient(fan_out_schema_registry_conf)

fan_out_avro_deserializer = AvroDeserializer(schema_registry_client=fan_out_schema_registry_client,
from_dict=dict_to_fanned_out_visit)
fan_out_consumer_conf = {
"bootstrap.servers": fan_out_kafka_cluster,
"group.id": fan_out_kafka_group_id,
"auto.offset.reset": fan_out_kafka_topic_offset,
"sasl.mechanism": fan_out_kafka_sasl_mechanism,
"security.protocol": fan_out_kafka_security_protocol,
"sasl.username": fan_out_kafka_sasl_username,
"sasl.password": fan_out_kafka_sasl_password,
'enable.auto.commit': False
}

_log.info("starting fan out consumer")
fan_out_consumer = kafka.Consumer(fan_out_consumer_conf, logger=_log)
fan_out_consumer.subscribe([fan_out_kafka_topic])
fan_out_listen_start_time = time.time()

try:
while time.time() - fan_out_listen_start_time < fanned_out_msg_listen_timeout:

fan_out_message = fan_out_consumer.poll(timeout=5)
if fan_out_message is None:
continue
if fan_out_message.error():
_log.warning("Fanned out consumer error: %s", fan_out_message.error())
else:
deserialized_fan_out_visit = fan_out_avro_deserializer(fan_out_message.value(),
SerializationContext(
fan_out_message.topic(),
MessageField.VALUE))
_log.info("Unpacked message as %r.", deserialized_fan_out_visit)

# Calculate time to load knative and receive message based on timestamp in Kafka message
_log.debug("Message timestamp %r", fan_out_message.timestamp())
fan_out_kafka_msg_timestamp = fan_out_message.timestamp()
fan_out_to_prompt_time = int(time.time() * 1000) - fan_out_kafka_msg_timestamp[1]
_log.debug("Seconds since fan out message delivered %r", fan_out_to_prompt_time/1000)

# Commit message and close client
fan_out_consumer.commit(message=fan_out_message, asynchronous=False)
fan_out_consumer.close()

try:
# Process fan out visit
process_visit(deserialized_fan_out_visit)
_log.info("Processing completed for %s", socket.gethostname())
break
except Exception as e:
_log.critical("Process visit failed; aborting.")
_log.exception(e)

finally:
# TODO Handle local registry unregistration on DM-47975
_log.info("Finished listening for fanned out messages")


def _graceful_shutdown(signum: int, stack_frame):
"""Signal handler for cases where the service should gracefully shut down.
Expand Down Expand Up @@ -586,10 +716,18 @@ def server_error(e: Exception) -> tuple[str, int]:


def main():
# This function is only called in test environments. Container
# deployments call `create_app()()` through Gunicorn.
app = create_app()
app.run(host="127.0.0.1", port=8080, debug=True)
# Knative deployments call `create_app()()` through Gunicorn.
# Keda deployments invoke main.
if platform == "knative":
_log.info("starting standalone Flask app")
app = create_app()
app.run(host="127.0.0.1", port=8080, debug=True)
# starts keda instance of the application
elif platform == "keda":
_log.info("starting keda instance")
keda_start()
else:
_log.info("no platform defined")


if __name__ == "__main__":
Expand Down

0 comments on commit 7f12b75

Please sign in to comment.