diff --git a/src/api/specs/WorkflowsAPI.yaml b/src/api/specs/WorkflowsAPI.yaml
index 514519f1..a5bfe438 100644
--- a/src/api/specs/WorkflowsAPI.yaml
+++ b/src/api/specs/WorkflowsAPI.yaml
@@ -2085,6 +2085,35 @@ paths:
               $ref: '#/components/schemas/RespPipelineLock'
   '/v3/workflows/groups/{group_id}/pipelines/{pipeline_id}/runs/{pipeline_run_uuid}':
+    post:
+      tags:
+        - PipelineRuns
+      summary: Terminate a pipeline run
+      description: Terminate a running pipeline run
+      operationId: terminatePipelineRun
+      parameters:
+        - name: group_id
+          in: path
+          required: true
+          schema:
+            $ref: '#/components/schemas/ID'
+        - name: pipeline_id
+          in: path
+          required: true
+          schema:
+            $ref: '#/components/schemas/ID'
+        - name: pipeline_run_uuid
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        '200':
+          description: Pipeline run terminated.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/RespPipelineRun'
     get:
       tags:
         - PipelineRuns
diff --git a/src/api/src/backend/helpers/PipelineDispatchRequestBuilder.py b/src/api/src/backend/helpers/PipelineDispatchRequestBuilder.py
index c634cd20..560f50b0 100644
--- a/src/api/src/backend/helpers/PipelineDispatchRequestBuilder.py
+++ b/src/api/src/backend/helpers/PipelineDispatchRequestBuilder.py
@@ -1,7 +1,6 @@
 from uuid import uuid4
 
 from django.forms import model_to_dict
-from backend.utils.parse_directives import parse_directives as parse
 from backend.conf.constants import WORKFLOW_EXECUTOR_ACCESS_TOKEN
 from backend.serializers import TaskSerializer, PipelineSerializer
 
@@ -19,6 +18,7 @@ def build(
         description=None,
         commit=None,
         directives=None,
+        run=None,
         args={}
     ):
         # Get the pipeline tasks, their contexts, destinations, and respective
@@ -89,26 +89,15 @@ def build(
 
         request["meta"]["origin"] = base_url # Origin of the request
 
-        request["pipeline_run"] = {}
+        request["pipeline_run"] = model_to_dict(run) if run else {}
+        if not run:
+            uuid = uuid4()
+            request["pipeline_run"]["uuid"] = uuid
+            request["pipeline_run"]["name"] = name or uuid
+            request["pipeline_run"]["description"] = description
 
-        # Generate the uuid for this pipeline run
-        uuid = uuid4()
-        request["pipeline_run"]["uuid"] = uuid
-        request["pipeline_run"]["name"] = name or uuid
-        request["pipeline_run"]["description"] = description
-
-        # # Parse the directives from the commit message
-        # directives_request = {}
-        # if commit != None:
-        #     directives_request = parse(commit)
-
-        # if directives != None and len(directives) > 0:
-        #     directive_str = f"[{'|'.join([d for d in directives])}]"
-        #     directives_request = parse(directive_str)
-
-        # request["directives"] = directives_request
-
-        request["directives"] = {}
+        # If no directives are provided, default to the RUN directive
+        request["directives"] = directives if directives else {"RUN": run.uuid if run else uuid}
 
         return request
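For context, a minimal sketch of the two request shapes the builder now produces: a fresh submission (no run and no directives supplied, so the builder generates the run uuid and defaults the directives to `RUN`) versus a caller-supplied `TERMINATE_RUN` directive for an existing run. All values here are illustrative stand-ins, not output from the real builder.

```python
from uuid import uuid4

# Illustrative stand-ins for the real group/pipeline/run objects.
run_uuid = uuid4()

# Fresh submission: the builder generates the run uuid itself and
# defaults the directives to RUN.
fresh_submission = {
    "pipeline_run": {"uuid": run_uuid, "name": str(run_uuid), "description": None},
    "directives": {"RUN": run_uuid},
}

# Termination: an existing run and an explicit directive are passed in,
# so the builder forwards them untouched.
termination = {
    "pipeline_run": {"uuid": run_uuid},
    "directives": {"TERMINATE_RUN": [str(run_uuid)]},
}

print(fresh_submission["directives"], termination["directives"])
```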
diff --git a/src/api/src/backend/models.py b/src/api/src/backend/models.py
index 561c37b5..a61ab7a6 100644
--- a/src/api/src/backend/models.py
+++ b/src/api/src/backend/models.py
@@ -64,17 +64,58 @@
     (TASK_PROTOCOL_FTPS, "ftps"),
 ]
 
+FUNCTION_TASK_RUNTIME_PYTHON_LATEST = EnumRuntimeEnvironment.PythonLatest
+FUNCTION_TASK_RUNTIME_PYTHON_SLIM = EnumRuntimeEnvironment.PythonSlim
+FUNCTION_TASK_RUNTIME_PYTHON312 = EnumRuntimeEnvironment.Python312
+FUNCTION_TASK_RUNTIME_PYTHON312_SLIM = EnumRuntimeEnvironment.Python312Slim
+FUNCTION_TASK_RUNTIME_PYTHON311 = EnumRuntimeEnvironment.Python311
+FUNCTION_TASK_RUNTIME_PYTHON311_SLIM = EnumRuntimeEnvironment.Python311Slim
+FUNCTION_TASK_RUNTIME_PYTHON310 = EnumRuntimeEnvironment.Python310
+FUNCTION_TASK_RUNTIME_PYTHON310_SLIM = EnumRuntimeEnvironment.Python310Slim
 FUNCTION_TASK_RUNTIME_PYTHON39 = EnumRuntimeEnvironment.Python39
+FUNCTION_TASK_RUNTIME_PYTHON39_SLIM = EnumRuntimeEnvironment.Python39Slim
+FUNCTION_TASK_RUNTIME_PYTHON38 = EnumRuntimeEnvironment.Python38
+FUNCTION_TASK_RUNTIME_PYTHON38_SLIM = EnumRuntimeEnvironment.Python38Slim
+FUNCTION_TASK_RUNTIME_PYTHON_TENSORFLOW_LATEST = EnumRuntimeEnvironment.TensorflowLatest
+FUNCTION_TASK_RUNTIME_PYTHON_TENSORFLOW_LATEST_GPU = EnumRuntimeEnvironment.TensorflowLatestGPU
+FUNCTION_TASK_RUNTIME_PYTHON_TENSORFLOW2120 = EnumRuntimeEnvironment.Tensorflow2120
+FUNCTION_TASK_RUNTIME_PYTHON_TENSORFLOW2120_GPU = EnumRuntimeEnvironment.Tensorflow2120GPU
+FUNCTION_TASK_RUNTIME_PYTHON_PYTORCH_LATEST = EnumRuntimeEnvironment.PytorchLatest
+FUNCTION_TASK_RUNTIME_PYTHON_PYTORCH_LATEST_GPU = EnumRuntimeEnvironment.HuggingfaceTransformersPytorchGPULatest
+FUNCTION_TASK_RUNTIME_PYTHON_HUGGINGFACE_TRANSFORMERS_PYTORCH_GPU4292 = EnumRuntimeEnvironment.HuggingfaceTransformersPytorchGPU4292
 FUNCTION_TASK_RUNTIME_PYTHON_SINGULARITY = EnumRuntimeEnvironment.PythonSingularity
+FUNCTION_TASK_RUNTIME_PYTHON_PYGEOFLOOD = EnumRuntimeEnvironment.PyGeoFlood
 
 FUNCTION_TASK_RUNTIMES = [
+    (FUNCTION_TASK_RUNTIME_PYTHON_LATEST, EnumRuntimeEnvironment.PythonLatest),
+    (FUNCTION_TASK_RUNTIME_PYTHON_SLIM, EnumRuntimeEnvironment.PythonSlim),
+    (FUNCTION_TASK_RUNTIME_PYTHON312, EnumRuntimeEnvironment.Python312),
+    (FUNCTION_TASK_RUNTIME_PYTHON312_SLIM, EnumRuntimeEnvironment.Python312Slim),
+    (FUNCTION_TASK_RUNTIME_PYTHON311, EnumRuntimeEnvironment.Python311),
+    (FUNCTION_TASK_RUNTIME_PYTHON311_SLIM, EnumRuntimeEnvironment.Python311Slim),
+    (FUNCTION_TASK_RUNTIME_PYTHON310, EnumRuntimeEnvironment.Python310),
+    (FUNCTION_TASK_RUNTIME_PYTHON310_SLIM, EnumRuntimeEnvironment.Python310Slim),
     (FUNCTION_TASK_RUNTIME_PYTHON39, EnumRuntimeEnvironment.Python39),
-    (FUNCTION_TASK_RUNTIME_PYTHON_SINGULARITY, EnumRuntimeEnvironment.PythonSingularity)
+    (FUNCTION_TASK_RUNTIME_PYTHON39_SLIM, EnumRuntimeEnvironment.Python39Slim),
+    (FUNCTION_TASK_RUNTIME_PYTHON38, EnumRuntimeEnvironment.Python38),
+    (FUNCTION_TASK_RUNTIME_PYTHON38_SLIM, EnumRuntimeEnvironment.Python38Slim),
+    (FUNCTION_TASK_RUNTIME_PYTHON_TENSORFLOW_LATEST, EnumRuntimeEnvironment.TensorflowLatest),
+    (FUNCTION_TASK_RUNTIME_PYTHON_TENSORFLOW_LATEST_GPU, EnumRuntimeEnvironment.TensorflowLatestGPU),
+    (FUNCTION_TASK_RUNTIME_PYTHON_TENSORFLOW2120, EnumRuntimeEnvironment.Tensorflow2120),
+    (FUNCTION_TASK_RUNTIME_PYTHON_TENSORFLOW2120_GPU, EnumRuntimeEnvironment.Tensorflow2120GPU),
+    (FUNCTION_TASK_RUNTIME_PYTHON_PYTORCH_LATEST, EnumRuntimeEnvironment.PytorchLatest),
+    (FUNCTION_TASK_RUNTIME_PYTHON_PYTORCH_LATEST_GPU, EnumRuntimeEnvironment.HuggingfaceTransformersPytorchGPULatest),
+    (FUNCTION_TASK_RUNTIME_PYTHON_HUGGINGFACE_TRANSFORMERS_PYTORCH_GPU4292, EnumRuntimeEnvironment.HuggingfaceTransformersPytorchGPU4292),
+    (FUNCTION_TASK_RUNTIME_PYTHON_SINGULARITY, EnumRuntimeEnvironment.PythonSingularity),
+    (FUNCTION_TASK_RUNTIME_PYTHON_PYGEOFLOOD, EnumRuntimeEnvironment.PyGeoFlood),
 ]
 
 FUNCTION_TASK_INSTALLERS = [
     (EnumInstaller.Pip, EnumInstaller.Pip)
 ]
 
+TASK_FLAVOR_C1_TINY = EnumTaskFlavor.C1_TINY
+TASK_FLAVOR_C1_XXSML = EnumTaskFlavor.C1_XXSML
+TASK_FLAVOR_C1_XSML = EnumTaskFlavor.C1_XSML
 TASK_FLAVOR_C1_SML = EnumTaskFlavor.C1_SML
 TASK_FLAVOR_C1_MED = EnumTaskFlavor.C1_MED
 TASK_FLAVOR_C1_LRG = EnumTaskFlavor.C1_LRG
@@ -85,6 +126,9 @@ TASK_FLAVOR_G1_NVD_LRG = EnumTaskFlavor.G1_NVD_LRG
 
 TASK_FLAVORS = [
+    (TASK_FLAVOR_C1_TINY, EnumTaskFlavor.C1_TINY),
+    (TASK_FLAVOR_C1_XXSML, EnumTaskFlavor.C1_XXSML),
+    (TASK_FLAVOR_C1_XSML, EnumTaskFlavor.C1_XSML),
     (TASK_FLAVOR_C1_SML, EnumTaskFlavor.C1_SML),
     (TASK_FLAVOR_C1_MED, EnumTaskFlavor.C1_MED),
     (TASK_FLAVOR_C1_LRG, EnumTaskFlavor.C1_LRG),
@@ -452,10 +496,10 @@ class PipelineRun(models.Model):
 
 class Secret(models.Model):
     id = models.CharField(max_length=128)
-    tenant_id = models.CharField(max_length=128)
     description = models.TextField(null=True)
-    sk_secret_name = models.CharField(max_length=128, unique=True)
     owner = models.CharField(max_length=64)
+    sk_secret_name = models.CharField(max_length=128, unique=True)
+    tenant_id = models.CharField(max_length=128)
     uuid = models.UUIDField(primary_key=True, default=uuid.uuid4)
 
     class Meta:
@@ -542,10 +586,6 @@ class Meta:
 
     def clean(self):
         errors = {}
-
-        # Validate runtimes
-        (success, error) = self.validate_function_task_installers()
-        if not success: errors = {**errors, "invalid-runtime-installer": error}
 
         # Validate packages schema
         (success, error) = self.validate_packages_schema()
@@ -554,17 +594,6 @@ def clean(self):
         if errors:
             raise ValidationError(errors)
 
-    def validate_function_task_installers(self) -> Tuple[bool, str]:
-        installer_runtime_mapping = {
-            FUNCTION_TASK_RUNTIME_PYTHON39: [EnumInstaller.Pip],
-            FUNCTION_TASK_RUNTIME_PYTHON_SINGULARITY: [EnumInstaller.Pip]
-        }
-
-        installers_for_runtime = installer_runtime_mapping.get(self.runtime, None)
-        if installers_for_runtime == None: return (False, f"Invalid runtime '{self.runtime}'")
-        if self.installer not in installers_for_runtime:
-            return (False, f"Installer '{self.installer}' for runtime {self.runtime}")
-
     def validate_packages_schema(self) -> Tuple[bool, str]:
         if type(self.packages) != list:
             return (False, f"Invalid installer: Installer '{self.installer}' for runtime {self.runtime}")
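The new runtime and flavor tuples are standard Django `choices` lists (stored value, human-readable label). A small standalone sketch of the membership check such a list implies; the list literal below is a trimmed, hypothetical subset of the `FUNCTION_TASK_RUNTIMES` additions above.

```python
# Trimmed, illustrative subset of the choices added in models.py.
FUNCTION_TASK_RUNTIMES = [
    ("python:3.12", "python:3.12"),
    ("python:3.12-slim", "python:3.12-slim"),
    ("tensorflow/tensorflow:2.12.0-gpu", "tensorflow/tensorflow:2.12.0-gpu"),
]

def is_valid_runtime(value: str) -> bool:
    # Django performs an equivalent membership check when a model field
    # declared with `choices` is validated via full_clean().
    return any(value == stored for stored, _ in FUNCTION_TASK_RUNTIMES)

assert is_valid_runtime("python:3.12-slim")
assert not is_valid_runtime("python:2.7")
```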
diff --git a/src/api/src/backend/services/PipelineDispatcher.py b/src/api/src/backend/services/PipelineDispatcher.py
index 5ee9706d..a50104f1 100644
--- a/src/api/src/backend/services/PipelineDispatcher.py
+++ b/src/api/src/backend/services/PipelineDispatcher.py
@@ -1,10 +1,10 @@
-import json, logging
+import json
 
 from uuid import UUID
 
 from django.db import IntegrityError, DatabaseError, OperationalError
 from django.utils import timezone
-
+from backend.utils import logger
 from backend.services.MessageBroker import service as broker
 from backend.models import Pipeline, PipelineRun, RUN_STATUS_SUBMITTED
 from backend.errors.api import ServerError
@@ -14,20 +14,21 @@ class PipelineDispatcher:
     def __init__(self):
         self.error = None
 
-    def dispatch(self, service_request: dict, pipeline):
+    def dispatch(self, service_request: dict, pipeline, pipeline_run=None):
         now = timezone.now()
 
         try:
-            # Create the pipeline run object
-            pipeline_run = PipelineRun.objects.create(
-                name=service_request["pipeline_run"]["name"],
-                description=service_request["pipeline_run"]["description"],
-                pipeline=pipeline,
-                status=RUN_STATUS_SUBMITTED,
-                uuid=service_request["pipeline_run"]["uuid"],
-                started_at=now,
-                last_modified=now
-            )
+            # Create the pipeline run object if one was not provided
+            if pipeline_run is None:
+                pipeline_run = PipelineRun.objects.create(
+                    name=service_request["pipeline_run"]["name"],
+                    description=service_request["pipeline_run"]["description"],
+                    pipeline=pipeline,
+                    status=RUN_STATUS_SUBMITTED,
+                    uuid=service_request["pipeline_run"]["uuid"],
+                    started_at=now,
+                    last_modified=now
+                )
 
             # Update the pipeline object with the pipeline run
             pipeline = Pipeline.objects.filter(pk=pipeline.uuid).first()
@@ -43,11 +44,10 @@ def dispatch(self, service_request: dict, pipeline):
             service_request["pipeline"].update(service_request_update)
 
         except (IntegrityError, DatabaseError, OperationalError) as e:
             message = f"Failed to create PipelineRun: {e.__cause__}"
-            logging.error(message)
+            logger.exception(message)
             raise ServerError(message=message)
         except Exception as e:
-            logging.error(str(e))
+            logger.exception(e.__cause__)
             raise ServerError(message=str(e))
 
         try:
@@ -57,7 +57,8 @@ def dispatch(self, service_request: dict, pipeline):
             )
         except Exception as e: # TODO use exact exception
             message = f"Failed publish the service request to the message broker: {e.__cause__}"
-            logging.error(message)
+            logger.error(message)
+            logger.exception(e.__cause__)
             raise ServerError(message=message)
 
         return pipeline_run
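A standalone sketch of the conditional-create logic `dispatch()` now applies: reuse the run that was passed in, otherwise build a new one from the request payload. A plain dict stands in for the Django `PipelineRun` model here; the helper name is illustrative, not part of the codebase.

```python
from datetime import datetime, timezone

def resolve_run(existing_run, request):
    """Mirror of the conditional-create branch in dispatch(): an existing run
    is passed through untouched, otherwise a new record is built."""
    if existing_run is not None:
        return existing_run
    now = datetime.now(timezone.utc)
    return {
        "uuid": request["pipeline_run"]["uuid"],
        "name": request["pipeline_run"]["name"],
        "status": "submitted",
        "started_at": now,
        "last_modified": now,
    }

req = {"pipeline_run": {"uuid": "abc-123", "name": "nightly"}}
assert resolve_run({"uuid": "abc-123"}, req)["uuid"] == "abc-123"
assert resolve_run(None, req)["status"] == "submitted"
```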
diff --git a/src/api/src/backend/views/PipelineRuns.py b/src/api/src/backend/views/PipelineRuns.py
index a07cf0d2..9ef252f9 100644
--- a/src/api/src/backend/views/PipelineRuns.py
+++ b/src/api/src/backend/views/PipelineRuns.py
@@ -3,18 +3,105 @@
 
 from backend.views.RestrictedAPIView import RestrictedAPIView
 from backend.views.http.responses.BaseResponse import BaseResponse
-from backend.views.http.responses.models import ModelResponse, ModelListResponse
 from backend.views.http.responses.errors import (
     ServerError,
     Forbidden,
     NotFound,
     BadRequest
 )
 from backend.services.GroupService import service as group_service
 from backend.models import PipelineRun, Pipeline
+from backend.helpers.PipelineDispatchRequestBuilder import PipelineDispatchRequestBuilder
+from backend.services.PipelineDispatcher import service as pipeline_dispatcher
+from backend.errors.api import ServerError as ApiServerError
+from backend.utils import logger
+from backend.services.CredentialsService import service as credentials_service
 
+request_builder = PipelineDispatchRequestBuilder(credentials_service)
 
 class PipelineRuns(RestrictedAPIView):
+    def post(self, request, group_id, pipeline_id, pipeline_run_uuid):
+        try:
+            # Get the group
+            group = group_service.get(group_id, request.tenant_id)
+            if group == None:
+                return NotFound(f"No group found with id '{group_id}'")
+
+            # Check that the user belongs to the group
+            if not group_service.user_in_group(request.username, group_id, request.tenant_id):
+                return Forbidden(message="You do not have access to this group")
+
+            # Find a pipeline that matches the request data
+            pipeline = Pipeline.objects.filter(
+                id=pipeline_id,
+                group=group
+            ).prefetch_related(
+                "group",
+                "archives",
+                "archives__archive",
+                "tasks",
+                "tasks__context",
+                "tasks__context__credentials",
+                "tasks__context__identity",
+                "tasks__destination",
+                "tasks__destination__credentials",
+                "tasks__destination__identity",
+            ).first()
+
+            # Return NotFound if no pipeline found
+            if pipeline == None:
+                return NotFound(f"Pipeline '{pipeline_id}' does not exist")
+
+            # Return NotFound if the run does not exist
+            pipeline_run = PipelineRun.objects.filter(
+                pipeline=pipeline,
+                uuid=pipeline_run_uuid
+            ).first()
+
+            if not pipeline_run:
+                return NotFound(f"PipelineRun with uuid '{pipeline_run_uuid}' does not exist")
+
+            try:
+                # Build the pipeline dispatch request
+                pipeline_dispatch_request = request_builder.build(
+                    request.base_url,
+                    group,
+                    pipeline,
+                    directives={"TERMINATE_RUN": [pipeline_run_uuid]},
+                    args={},
+                    run=pipeline_run,
+                )
+
+                # Dispatch the request
+                pipeline_dispatcher.dispatch(pipeline_dispatch_request, pipeline, pipeline_run=pipeline_run)
+            except ApiServerError as e:
+                return ServerError(message=str(e))
+            except Exception as e:
+                return ServerError(message=str(e))
+
+            # Format the started_at and last_modified timestamps
+            run = model_to_dict(pipeline_run)
+
+            run["started_at"] = run["started_at"].strftime("%Y-%m-%d %H:%M:%S") if run["started_at"] else None
+            run["last_modified"] = run["last_modified"].strftime("%Y-%m-%d %H:%M:%S") if run["last_modified"] else None
+
+            return BaseResponse(
+                status=200,
+                success=True,
+                message="success",
+                result=run
+            )
+
+        # TODO catch the specific error thrown by the group service
+        except (DatabaseError, IntegrityError, OperationalError) as e:
+            logger.exception(e.__cause__)
+            return ServerError(message=e.__cause__)
+        except Exception as e:
+            logger.exception(e.__cause__)
+            return ServerError(message=e)
+
     def get(self, request, group_id, pipeline_id, pipeline_run_uuid=None, *_, **__):
         try:
             # Get the group
@@ -62,8 +149,10 @@ def get(self, request, group_id, pipeline_id, pipeline_run_uuid=None, *_, **__)
 
         # TODO catch the specific error thrown by the group service
         except (DatabaseError, IntegrityError, OperationalError) as e:
+            logger.exception(e.__cause__)
             return ServerError(message=e.__cause__)
         except Exception as e:
+            logger.exception(e.__cause__)
             return ServerError(message=e)
 
 
@@ -85,8 +174,10 @@ def list(self, pipeline):
                 result=runs
             )
         except (DatabaseError, IntegrityError, OperationalError) as e:
+            logger.exception(e.__cause__)
             return ServerError(message=e.__cause__)
         except Exception as e:
+            logger.exception(e.__cause__)
             return ServerError(message=e)
\ No newline at end of file
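A sketch of calling the new terminate endpoint from a client, assuming a Tapis-style deployment; the base URL, token, group id, pipeline id, and run uuid are all placeholders, and the `requests` library is used purely for illustration.

```python
import requests  # third-party HTTP client, illustrative only

# Placeholder values -- substitute a real base URL, token, and identifiers.
base_url = "https://tacc.tapis.io"
headers = {"X-Tapis-Token": "<jwt>"}

resp = requests.post(
    f"{base_url}/v3/workflows/groups/my-group/pipelines/my-pipeline/runs/"
    "8a6e6cb2-0000-0000-0000-000000000000",
    headers=headers,
)
print(resp.status_code, resp.json())
```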
"tasks__destination", + "tasks__destination__credentials", + "tasks__destination__identity", + ).first() + + # Return if NotFound if no pipeline found + if pipeline == None: + return NotFound(f"Pipline '{pipeline_id}' does not exist") + + # Return NotFound if run not found + pipeline_run = PipelineRun.objects.filter( + pipeline=pipeline, + uuid=pipeline_run_uuid + ).next() + + if not pipeline_run: + return BadRequest(f"PiplineRun with uuid '{pipeline_run_uuid}' does not exist") + + try: + # Build the pipeline dispatch request + pipeline_dispatch_request = request_builder.build( + request.base_url, + group, + pipeline, + directives={"TEMINATE_RUN": [pipeline_run_uuid]}, + args={}, + run=pipeline_run, + ) + # Dispatch the request + pipeline_dispatcher.dispatch(pipeline_dispatch_request, pipeline, pipeline_run=pipeline_run) + except ServerError as e: + return ServerErrorResp(message=str(e)) + except Exception as e: + return ServerErrorResp(message=str(e)) + + + # Format the started at and last_modified + run = model_to_dict(run) + + run["started_at"] = run["started_at"].strftime("%Y-%m-%d %H:%M:%S") if run["started_at"] else None + run["last_modified"] = run["last_modified"].strftime("%Y-%m-%d %H:%M:%S") if run["last_modified"] else None + + return BaseResponse( + status=200, + success=True, + message="success", + result=run + ) + + # TODO catch the specific error thrown by the group service + except (DatabaseError, IntegrityError, OperationalError) as e: + logger.exception(e.__cause__) + return ServerError(message=e.__cause__) + except Exception as e: + logger.exception(e.__cause__) + return ServerError(message=e) + + def get(self, request, group_id, pipeline_id, pipeline_run_uuid=None, *_, **__): try: # Get the group @@ -62,8 +149,10 @@ def get(self, request, group_id, pipeline_id, pipeline_run_uuid=None, *_, **__) # TODO catch the specific error thrown by the group service except (DatabaseError, IntegrityError, OperationalError) as e: + logger.exception(e.__cause__) return ServerError(message=e.__cause__) except Exception as e: + logger.exception(e.__cause__) return ServerError(message=e) @@ -85,8 +174,10 @@ def list(self, pipeline): result=runs ) except (DatabaseError, IntegrityError, OperationalError) as e: + logger.exception(e.__cause__) return ServerError(message=e.__cause__) except Exception as e: + logger.exception(e.__cause__) return ServerError(message=e) \ No newline at end of file diff --git a/src/api/src/backend/views/http/requests.py b/src/api/src/backend/views/http/requests.py index 8d108cd4..fd44e361 100644 --- a/src/api/src/backend/views/http/requests.py +++ b/src/api/src/backend/views/http/requests.py @@ -54,11 +54,38 @@ class EnumImageBuilder(str, Enum, metaclass=_EnumMeta): Kaniko = "kaniko" Singularity = "singularity" -LiteralRuntimeEnvironments = Literal["python:3.9"] -RuntimeEnvironments = list(get_args(LiteralRuntimeEnvironments)) + class EnumRuntimeEnvironment(str, Enum, metaclass=_EnumMeta): - Python39 = "python:3.9" - PythonSingularity = "tapis/workflows-python-singularity:0.1.0" + # Basic python + PythonLatest = "python:latest", + PythonSlim = "python:slim", + Python312 = "python:3.12", + Python312Slim = "python:3.12-slim", + Python311 = "python:3.11", + Python311Slim = "python:3.11-slim", + Python10 = "python:3.10", + Python10Slim = "python:3.10-slim", + Python39 = "python:3.9", + Python39Slim = "python:3.9-slim", + Python38 = "python:3.8", + Python38Slim = "python:3.8-slim" + + # Machine Learning + TensorflowLatest = "tensorflow/tensorflow:latest", + 
diff --git a/src/engine/src/Server.py b/src/engine/src/Server.py
index bea1528a..b83454b0 100644
--- a/src/engine/src/Server.py
+++ b/src/engine/src/Server.py
@@ -1,5 +1,5 @@
-import os, sys, time, logging, json
+import os, sys, time, logging
 
 from threading import Thread
 from typing import Literal, Union
@@ -36,7 +36,7 @@
 
 from workers import WorkerPool
 from workflows import WorkflowExecutor
-from utils import serialize_request, load_plugins, lbuffer_str as lbuf
+from utils import deserialize_message, load_plugins
 from errors import NoAvailableWorkers, WorkflowTerminated
 
 
@@ -44,24 +44,30 @@
 # TODO Keep track of workflows submissions somehow so they can be terminated later
 class Server:
-    def __init__(self):
+    def __init__(self, inbound_queue_name, inbound_exchange_name):
         self.active_workers = []
         self.worker_pool = None
         self.plugins = []
+        self.inbound_queue_name = inbound_queue_name
+        self.inbound_exchange_name = inbound_exchange_name
 
     def __call__(self):
-        """Initializes the dynamic worker pool comprised of WorkflowExecutor
+        """Initializes the dynamic worker pool composed of WorkflowExecutor
         workers, establishes a connection with RabbitMQ, creates the channel,
         exchanges, and queues, and begins consuming from the inbound queue"""
 
-        logger.info(f"{lbuf('[SERVER]')} Starting server")
+        logger.info("Starting server")
 
         # Initialize plugins
+        logger.info(f"Loading plugins {PLUGINS}")
         self.plugins = load_plugins(PLUGINS)
 
         # Create a worker pool that consists of the workflow executors that will
         # run the pipelines
         # TODO catch error for worker classes that dont inherit from "Worker"
+        logger.info("Initializing worker pool")
+        logger.info(f"Starting workers {STARTING_WORKERS}")
+        logger.info(f"Max workers {MAX_WORKERS}")
        self.worker_pool = WorkerPool(
             worker_cls=WorkflowExecutor,
             starting_worker_count=STARTING_WORKERS,
@@ -70,7 +76,8 @@ def __call__(self):
                 "plugins": self.plugins
             }
         )
-        logger.debug(f"{lbuf('[SERVER]')} Workers initialized ({self.worker_pool.count()})")
+        logger.debug("Worker initialization complete")
+        logger.debug(f"Available workers ({self.worker_pool.count()})")
 
         # Connect to the message broker
         connection = self._connect()
@@ -79,9 +86,9 @@ def __call__(self):
         channel = connection.channel()
 
         # Inbound exchange and queue handles workflow submissions or resubmissions
-        channel.exchange_declare(INBOUND_EXCHANGE, exchange_type=ExchangeType.fanout)
-        inbound_queue = self._declare_queue(channel, INBOUND_QUEUE, exclusive=True)
-        channel.queue_bind(exchange=INBOUND_EXCHANGE, queue=inbound_queue.method.queue)
+        channel.exchange_declare(self.inbound_exchange_name, exchange_type=ExchangeType.fanout)
+        inbound_queue = self._declare_queue(channel, self.inbound_queue_name, exclusive=True)
+        channel.queue_bind(exchange=self.inbound_exchange_name, queue=inbound_queue.method.queue)
 
         # The threads that will be started within the on_message callback
         threads = []
@@ -97,6 +104,8 @@ def __call__(self):
             )
         )
 
+        logger.debug("Worker Engine Server started and ready to receive workflow submissions.")
+
         channel.start_consuming()
 
         # Wait for all to complete
@@ -104,6 +113,7 @@ def __call__(self):
             thread.join()
 
         connection.close()
+        logger.info("Closing connection to message broker")
 
         # Occurs when basic_consume recieves the wrong args
         except ValueError as e:
@@ -115,96 +125,113 @@ def __call__(self):
         except Exception as e:
             logger.error(e)
 
-    def _start_worker(self, body, connection, channel, delivery_tag):
-        """Validates and prepares the message from the inbound exchange(and queue),
-        provisions a worker from the worker pool, acks the message, registers the
-        active worker to the server, handles the termination of duplicate
-        workflow submissions and starts the worker."""
-
-        # Prepare the execution context. The execution context contains all the
-        # information required to run a workflow
-        worker = None
-        acked = False # Indicates that the message as been acked
+    def _on_message_callback(self, channel, method, _, body, args):
+        '''
+        1. Deserializes and validates the message from the inbound queue
+        2. Provisions a worker from the worker pool
+        3. Acks (or nacks) the message
+        4. Registers the active worker to the server
+        5. Dispatches the worker to process the workflow submission request
+        '''
+
+        # Deserialize the message, then convert it to an object. If
+        # deserialization fails, reject the message.
         try:
-            # Decode the message body, then convert to an object.
-            serialized_request = serialize_request(body)
-            request = WorkflowSubmissionRequest(**serialized_request)
-
-            # Get a workflow executor worker. If there are none available,
-            # this will raise a "NoWorkersAvailabe" error which is handled
-            # an the exception block below
-            worker = self.worker_pool.check_out()
+            request = WorkflowSubmissionRequest(**deserialize_message(body))
+        except JSONDecodeError as e:
+            logger.error(e)
+            channel.basic_reject(method.delivery_tag, requeue=False)
+            return
+
+        # Resolve the idempotency key from the request
+        request.idempotency_key = self._resolve_idempotency_key(request)
 
-            # Run request middlewares over the workflow context
-            # NOTE Request middlewares will very likely mutate the workflow context
+        # Run request middlewares over the workflow context
+        # NOTE Request middlewares will very likely mutate the request
+        try:
             for plugin in self.plugins:
                 request = plugin.dispatch("request", request)
-
-            # Ack the message before running the workflow executor
-            cb = partial(self._ack_nack, "ack", channel, delivery_tag)
-            connection.add_callback_threadsafe(cb)
-
-            # Set the acked flag to True(Used to nack the message if an exception
-            # occurs above)
-            acked = True
-
-            # Register the active worker to the server. If worker cannot
-            # execute, check it back in.
-            worker = self._register_worker(request, worker)
-
-            threads = []
-
-            if worker.can_start:
-                worker.start(request, threads)
-
-            for t in threads:
-                t.join()
-
-        # Thrown when decoding the message body. Reject the message
-        except JSONDecodeError as e:
+        except Exception as e:
             logger.error(e)
-            channel.basic_reject(delivery_tag, requeue=False)
+            channel.basic_reject(method.delivery_tag, requeue=False)
             return
+
+        # Get the connection to the message queue for acks and nacks
+        (connection, threads) = args
+
+        # Get a workflow executor worker. If there are none available,
+        # this will raise a "NoAvailableWorkers" error which is handled
+        # in the exception block below
+        try:
+            worker = self.worker_pool.check_out()
         except NoAvailableWorkers:
-            logger.info(f"{lbuf('[SERVER]')} Insufficient workers available. RETRYING (10s)")
+            logger.info(f"Insufficient workers available. RETRYING (10s)")
             connection.add_callback_threadsafe(
                 partial(
                     self._ack_nack,
                     "nack",
                     channel,
-                    delivery_tag,
+                    method.delivery_tag,
                     delay=INSUFFICIENT_WORKER_RETRY_DELAY
                 )
             )
             return
-        except Exception as e:
-            logger.error(e)
-            # Nack the message if it has not already been ack
-            # TODO Nack the message into a retry queue.
-            # Or reject? Why would it not be rejected?
-            if not acked:
-                cb = partial(self._ack_nack, "nack", channel, delivery_tag)
-                connection.add_callback_threadsafe(cb)
-            raise e
-
-        # Deregister and return executor back to the worker pool
-        self._deregister_worker(worker)
-        self.worker_pool.check_in(worker)
 
-    def _on_message_callback(self, channel, method, _, body, args):
-        (connection, threads) = args
+        # Register the active worker to the server
+        worker = self._register_worker(request, worker)
 
-        t = Thread(
-            target=self._start_worker,
-            args=(body, connection, channel, method.delivery_tag)
+        # Ack the message before dispatching the worker
+        connection.add_callback_threadsafe(
+            partial(
+                self._ack_nack,
+                "ack",
+                channel,
+                method.delivery_tag
+            )
         )
-
+
+        # Dispatch the worker (workflow executor) in a thread
+        t = Thread(target=self._dispatch, args=(worker, request))
         t.start()
         threads.append(t)
 
         # Clean up the stopped threads
         threads = [t for t in threads if t.is_alive()]
 
+    def _dispatch(self, worker, request):
+        """Handle the starting and termination of workflows"""
+        directives = request.directives
+
+        # Handle the RUN directive
+        if "RUN" in directives:
+            try:
+                threads = []
+
+                if worker.can_start:
+                    worker.start(request, threads)
+
+                for t in threads:
+                    t.join()
+            except Exception as e:
+                logger.error(e)
+
+        # Handle the TERMINATE_RUN directive
+        if "TERMINATE_RUN" in directives:
+            workers = self._get_active_workers(worker.key)
+            for w in workers:
+                # Terminate every active worker whose pipeline run uuid appears
+                # in the TERMINATE_RUN directive array
+                if str(w.pipeline_run_uuid) in directives["TERMINATE_RUN"]:
+                    w.terminate()
+                    # Deregister and return the executor back to the worker pool
+                    self._deregister_worker(w)
+                    self.worker_pool.check_in(w)
+
+        self._deregister_worker(worker)
+        self.worker_pool.check_in(worker)
+
     def _ack_nack(
         self,
         ack_nack: Union[Literal["ack"], Literal["nack"]],
@@ -212,16 +239,21 @@ def _ack_nack(
         channel,
         delivery_tag,
         delay=0
     ):
-        fn = channel.basic_ack if ack_nack == "ack" else channel.basic_nack
+        if not channel.is_open:
+            raise Exception(f"Channel closed: Cannot {'negatively acknowledge' if ack_nack == 'nack' else 'acknowledge'} the message")
+
         kwargs = {}
-        if ack_nack == "nack": kwargs = {"requeue": False}
-        if channel.is_open:
-            # Wait the delay if necessary
-            delay == 0 or time.sleep(abs(delay))
-            fn(delivery_tag, **kwargs)
-            return
-
-        # TODO do something if channel is closed
+        if ack_nack == "nack":
+            kwargs = {"requeue": False}
+
+        # Wait the delay if necessary
+        delay = abs(delay)
+        if delay > 0:
+            time.sleep(delay)
+
+        # Call the acknowledge or negative acknowledge function
+        fn = channel.basic_ack if ack_nack == "ack" else channel.basic_nack
+        fn(delivery_tag, **kwargs)
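For orientation, a self-contained sketch of the callback pattern used around `_ack_nack`: the acknowledgement is bound with `functools.partial` and handed to the connection's `add_callback_threadsafe`, so it executes on the connection's I/O thread rather than the worker thread. The broker objects below are stand-ins, not pika classes.

```python
from functools import partial

class FakeChannel:
    """Stand-in for a pika channel -- records acks instead of talking to RabbitMQ."""
    is_open = True
    def __init__(self):
        self.acked = []
    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)

class FakeConnection:
    """Stand-in for pika.BlockingConnection: runs callbacks immediately."""
    def add_callback_threadsafe(self, cb):
        cb()

channel = FakeChannel()
connection = FakeConnection()

# Same shape as the Server code: bind the arguments now, execute later on
# the connection's own thread.
connection.add_callback_threadsafe(partial(channel.basic_ack, 42))
assert channel.acked == [42]
```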
 
     def _connect(self):
         # Initialize connection parameters with plain credentials
@@ -233,7 +265,7 @@ def _connect(self):
             os.environ["BROKER_USER"], os.environ["BROKER_PASSWORD"])
         )
 
-        logger.info(f"{lbuf('[SERVER]')} Connecting to message broker")
+        logger.info(f"Connecting to message broker")
 
         connected = False
         connection_attempts = 0
@@ -243,15 +275,15 @@ def _connect(self):
             try:
                 connection = pika.BlockingConnection(connection_parameters)
                 connected = True
             except Exception:
-                logger.info(f"{lbuf('[SERVER]')} Connection failed ({connection_attempts})")
+                logger.info(f"Connection failed ({connection_attempts})")
                 time.sleep(CONNECTION_RETRY_DELAY)
 
         # Kill the build service if unable to connect
         if connected == False:
-            logger.critical(f"{lbuf('[SERVER]')} Error: Maximum connection attempts reached ({MAX_CONNECTION_ATTEMPTS}). Unable to connect to message broker.")
+            logger.critical(f"Error: Maximum connection attempts reached ({MAX_CONNECTION_ATTEMPTS}). Unable to connect to message broker.")
             sys.exit(1)
 
-        logger.info(f"{lbuf('[SERVER]')} Connected; Ready to recieve workflow submissions")
+        logger.info("Connection to message broker established")
 
         return connection
 
@@ -260,14 +292,13 @@ def _connect(self):
 
     def _register_worker(self, request, worker):
         """Registers the worker to the Server. Handles duplicate workflow
         submissions"""
-        # Returns a key based on user-defined idempotency key or pipeline
-        # run uuid if no idempotency key is provided
-        worker.key = self._resolve_idempotency_key(request)
-
-        # Set the idempotency key on the context
-        request.idempotency_key = worker.key
+        # Set the idempotency key on the worker
+        worker.key = request.idempotency_key
+
+        # Set the pipeline run uuid on the worker
+        worker.pipeline_run_uuid = request.pipeline_run.uuid
 
-        # Check if there are workers running that have the same unique constraint key
+        # Check if there are workers running that have the same idempotency key
         active_workers = self._get_active_workers(worker.key)
 
         policy = request.pipeline.execution_profile.duplicate_submission_policy
@@ -281,7 +312,7 @@ def _register_worker(self, request, worker):
                 active_worker.terminate()
                 self._deregister_worker(active_worker, terminated=True)
             elif policy == DUPLICATE_SUBMISSION_POLICY_DEFER:
-                logger.info(f"{lbuf('[SERVER]')} Warning: Duplicate Submission Policy of 'DEFER' not implemented. Handling as 'ALLOW'")
+                logger.info(f"Warning: Duplicate Submission Policy of 'DEFER' not implemented. Handling as 'ALLOW'")
                 pass
Handling as 'ALLOW'") pass elif policy == DUPLICATE_SUBMISSION_POLICY_ALLOW: pass @@ -293,17 +324,22 @@ def _register_worker(self, request, worker): def _deregister_worker(self, worker, terminated=False): worker.key = None + worker.current_run = None self.active_workers = [ w for w in self.active_workers if w.id != worker.id ] worker.reset(terminated=terminated) - def _get_active_workers(self, key): - return [worker for worker in self.active_workers if worker.key == key] + def _get_active_workers(self, idempotency_key): + """ + Fetch all of the workers actively processing pipeline runs which have the + proivded idempotency key + """ + return [worker for worker in self.active_workers if worker.key == idempotency_key] def _declare_queue(self, channel, queue, exclusive=True): try: return channel.queue_declare(queue=queue, exclusive=exclusive) except ChannelClosedByBroker as e: - logger.critical(f"{lbuf('[SERVER]')} Exclusive queue declaration error for queue '{queue}' | {e}") + logger.critical(f"Exclusive queue declaration error for queue '{queue}' | {e}") sys.exit(1) def _resolve_idempotency_key(self, request): @@ -312,7 +348,7 @@ def _resolve_idempotency_key(self, request): # to their duplicate submission policy. # Defaults to the pipeline id - default_idempotency_key = request.pipeline.id + default_idempotency_key = request.pipeline_run.uuid if type(request.meta.idempotency_key) == str: return request.meta.idempotency_key @@ -345,9 +381,10 @@ def _resolve_idempotency_key(self, request): idempotency_key = idempotency_key + part_delimiter + str(key_part) return idempotency_key - except (AttributeError, TypeError) as e: - logger.info(f"{lbuf('[SERVER]')} Warning: Failed to resolve idempotency key from provided constraints. {str(e)}. Defaulted to pipeline id '{default_idempotency_key}'") - return default_idempotency_key - + logger.info(f"Warning: Failed to resolve idempotency key from provided constraints. {str(e)}. Defaulted to pipeline run uuid '{default_idempotency_key}'") + except Exception as e: + logger.info(f"Any unknown error occured resolving idempotency key | {str(e)}. 
diff --git a/src/engine/src/main.py b/src/engine/src/main.py
index 92ce1aaf..e1473ca0 100644
--- a/src/engine/src/main.py
+++ b/src/engine/src/main.py
@@ -1,7 +1,11 @@
 import sys, logging
 
 from Server import Server
-
+from utils import lbuffer_str
+from conf.constants import (
+    INBOUND_EXCHANGE,
+    INBOUND_QUEUE,
+)
 
 # Set all third-party library loggers to critical
 for name in logging.root.manager.loggerDict:
@@ -10,18 +14,14 @@
 
 server_logger = logging.getLogger("server")
 handler = logging.StreamHandler(stream=sys.stdout)
-handler.setFormatter(logging.Formatter("%(message)s"))
+handler.setFormatter(logging.Formatter(f"{lbuffer_str('[SERVER]')} %(message)s"))
 server_logger.setLevel(logging.DEBUG)
 server_logger.addHandler(handler)
 
-# logging.basicConfig(
-#     # filename=LOG_FILE,
-#     stream=sys.stdout,
-#     level=logging.DEBUG,
-#     # format="[%(asctime)s] %(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s",
-#     format="[%(asctime)s] %(message)s",
-# )
 
 if __name__ == "__main__":
-    server = Server()
+    server = Server(
+        INBOUND_QUEUE,
+        INBOUND_EXCHANGE
+    )
     server()
diff --git a/src/engine/src/owe_python_sdk/schema.py b/src/engine/src/owe_python_sdk/schema.py
index a7cac090..fd44e361 100644
--- a/src/engine/src/owe_python_sdk/schema.py
+++ b/src/engine/src/owe_python_sdk/schema.py
@@ -54,11 +54,38 @@ class EnumImageBuilder(str, Enum, metaclass=_EnumMeta):
     Kaniko = "kaniko"
     Singularity = "singularity"
 
-LiteralRuntimeEnvironments = Literal["python:3.9"]
-RuntimeEnvironments = list(get_args(LiteralRuntimeEnvironments))
 
 class EnumRuntimeEnvironment(str, Enum, metaclass=_EnumMeta):
-    Python39 = "python:3.9"
-    PythonSingularity = "tapis/workflows-python-singularity:0.1.0"
+    # Basic python
+    PythonLatest = "python:latest"
+    PythonSlim = "python:slim"
+    Python312 = "python:3.12"
+    Python312Slim = "python:3.12-slim"
+    Python311 = "python:3.11"
+    Python311Slim = "python:3.11-slim"
+    Python310 = "python:3.10"
+    Python310Slim = "python:3.10-slim"
+    Python39 = "python:3.9"
+    Python39Slim = "python:3.9-slim"
+    Python38 = "python:3.8"
+    Python38Slim = "python:3.8-slim"
+
+    # Machine Learning
+    TensorflowLatest = "tensorflow/tensorflow:latest"
+    TensorflowLatestGPU = "tensorflow/tensorflow:latest-gpu"
+    Tensorflow2120 = "tensorflow/tensorflow:2.12.0"
+    Tensorflow2120GPU = "tensorflow/tensorflow:2.12.0-gpu"
+    PytorchLatest = "pytorch/pytorch:latest"
+    HuggingfaceTransformersPytorchGPULatest = "huggingface/transformers-pytorch-gpu:latest"
+    HuggingfaceTransformersPytorchGPU4292 = "huggingface/transformers-pytorch-gpu:4.29.2"
+
+    # Tapis specific # TODO Factor out into the tapis plugin
+    PythonSingularity = "tapis/workflows-python-singularity:0.1.0"
+
+    # TACC specific # TODO Factor out into a new plugin for TACC
+    PyGeoFlood = "ghcr.io/tobiashi26/pygeoflood-container:main"
+
+RuntimeEnvironments = [i.value for i in EnumRuntimeEnvironment]
 
 LiteralInstallers = Literal["pip", "apt_get"]
 Installers = list(get_args(LiteralInstallers))
@@ -837,7 +864,9 @@ class WorkflowSubmissionRequest(BaseModel):
     args: Args = {}
     pipeline: Pipeline
     pipeline_run: PipelineRun
+    directives: dict = {}
     meta: WorkflowSubmissionRequestMeta
+    idempotency_key: str = None
 
     class Config:
         extra = Extra.allow
diff --git a/src/engine/src/utils/__init__.py b/src/engine/src/utils/__init__.py
index 3dda5a1d..57d96e57 100644
--- a/src/engine/src/utils/__init__.py
+++ b/src/engine/src/utils/__init__.py
@@ -23,11 +23,7 @@ def lbuffer_str(string, length=10):
     buffer = " " * diff
     return string + buffer
 
-def serialize_request(bytestring):
-    # DELETE THE BELOW BY: 2024/10/31
-    # OLD Caused a serialization bug. But may have had use?
-    # value = bytestring.decode("utf8").replace("'", '"')
-
-    # Decode UTF-8 bytes to Unicode, and convert single quotes
-    # to double quotes to make it valid JSON
+def deserialize_message(bytestring):
+    # Decode the UTF-8 bytes of the message into a JSON string
     value = bytestring.decode("utf8")
diff --git a/src/engine/src/workers/Worker.py b/src/engine/src/workers/Worker.py
index 44093481..988f3462 100644
--- a/src/engine/src/workers/Worker.py
+++ b/src/engine/src/workers/Worker.py
@@ -4,7 +4,16 @@ class Worker:
     def __init__(self, _id=None):
         self.can_start = False
+
+        # Unique identifier for this worker
         self.id = _id if _id != None else uuid.uuid4()
 
+        # An array of all pipeline runs handled by this worker
+        self.runs = []
+
+        # The pipeline run currently being processed by this worker
+        self.current_run = None
+
     def __repr__(self):
-        return f"{self.__class__.__name__} id: {self.id}"
\ No newline at end of file
+        return f"{self.__class__.__name__} id: {self.id}"
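Finally, a quick round-trip sketch of the renamed helper: the engine now expects the API side to publish real JSON, so decoding the UTF-8 bytes and parsing them is all that is needed. The function below is a local re-implementation for illustration only, not an import from the engine's `utils` package.

```python
import json

def deserialize_message(bytestring: bytes) -> dict:
    # Same idea as utils.deserialize_message: decode UTF-8 bytes and parse
    # the JSON payload published by the API.
    return json.loads(bytestring.decode("utf8"))

payload = json.dumps({"directives": {"TERMINATE_RUN": ["run-uuid"]}}).encode("utf8")
assert deserialize_message(payload)["directives"]["TERMINATE_RUN"] == ["run-uuid"]
```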