Skip to content

Commit

Permalink
Merge pull request #297 from latchbio/maximsmol/idl
Browse files Browse the repository at this point in the history
  • Loading branch information
ayushkamat authored Oct 26, 2023
2 parents 45e4663 + b08d01e commit 989e0b3
Show file tree
Hide file tree
Showing 14 changed files with 2,903 additions and 0 deletions.
168 changes: 168 additions & 0 deletions latch/idl/admin/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
import typing
from collections.abc import Iterable, Mapping
from dataclasses import dataclass

import flyteidl.admin.common_pb2 as pb

from ..core.execution import WorkflowExecution
from ..utils import merged_pb, to_idl_many


@dataclass
class EmailNotification:
"""Defines an email notification specification."""

recipients_email: Iterable[str]
"""
The list of email addresses recipients for this notification.
+required
"""

def to_idl(self) -> pb.EmailNotification:
return pb.EmailNotification(recipients_email=self.recipients_email)


@dataclass
class PagerDutyNotification:
"""Defines a pager duty notification specification."""

recipients_email: Iterable[str]
"""
Currently, PagerDuty notifications leverage email to trigger a notification.
+required
"""

def to_idl(self) -> pb.PagerDutyNotification:
return pb.PagerDutyNotification(recipients_email=self.recipients_email)


@dataclass
class SlackNotification:
"""Defines a slack notification specification."""

recipients_email: Iterable[str]
"""
Currently, Slack notifications leverage email to trigger a notification.
+required
"""

def to_idl(self) -> pb.SlackNotification:
return pb.SlackNotification(recipients_email=self.recipients_email)


@dataclass
class Notification:
"""
Represents a structure for notifications based on execution status.
The notification content is configured within flyte admin but can be templatized.
Future iterations could expose configuring notifications with custom content.
"""

phases: Iterable[WorkflowExecution.Phase]
"""
A list of phases to which users can associate the notifications to.
+required
"""

type: "typing.Union[NotificationTypeEmail, NotificationTypePagerDuty, NotificationTypeSlack]"
"""
The type of notification to trigger.
+required
"""

def to_idl(self) -> pb.Notification:
return merged_pb(pb.Notification(phases=to_idl_many(self.phases)), self.type)


@dataclass
class NotificationTypeEmail:
email: EmailNotification

def to_idl(self) -> pb.Notification:
return pb.Notification(email=self.email.to_idl())


@dataclass
class NotificationTypePagerDuty:
pager_duty: PagerDutyNotification

def to_idl(self) -> pb.Notification:
return pb.Notification(pager_duty=self.pager_duty.to_idl())


@dataclass
class NotificationTypeSlack:
slack: SlackNotification

def to_idl(self) -> pb.Notification:
return pb.Notification(slack=self.slack.to_idl())


@dataclass
class Labels:
"""
Label values to be applied to an execution resource.
In the future a mode (e.g. OVERRIDE, APPEND, etc) can be defined
to specify how to merge labels defined at registration and execution time.
"""

values: Mapping[str, str]
"""Map of custom labels to be applied to the execution resource."""

def to_idl(self) -> pb.Labels:
return pb.Labels(values=self.values)


@dataclass
class Annotations:
"""
Annotation values to be applied to an execution resource.
In the future a mode (e.g. OVERRIDE, APPEND, etc) can be defined
to specify how to merge annotations defined at registration and execution time.
"""

values: Mapping[str, str]
"""Map of custom annotations to be applied to the execution resource."""

def to_idl(self) -> pb.Annotations:
return pb.Annotations(values=self.values)


@dataclass
class AuthRole:
"""
Defines permissions associated with executions created by this launch plan spec.
Use either of these roles when they have permissions required by your workflow execution.
Deprecated.
"""

assumable_iam_role: str
"""Defines an optional iam role which will be used for tasks run in executions created with this launch plan."""

kubernetes_service_account: str
"""Defines an optional kubernetes service account which will be used for tasks run in executions created with this launch plan."""

def to_idl(self) -> pb.AuthRole:
return pb.AuthRole(
assumable_iam_role=self.assumable_iam_role,
kubernetes_service_account=self.kubernetes_service_account,
)


@dataclass
class RawOutputDataConfig:
"""
Encapsulates user settings pertaining to offloaded data (i.e. Blobs, Schema, query data, etc.).
See https://github.com/flyteorg/flyte/issues/211 for more background information.
"""

output_location_prefix: str
"""
Prefix for where offloaded data from user workflows will be written
e.g. s3://bucket/key or s3://bucket/
"""

def to_idl(self) -> pb.RawOutputDataConfig:
return pb.RawOutputDataConfig(
output_location_prefix=self.output_location_prefix
)
143 changes: 143 additions & 0 deletions latch/idl/admin/launch_plan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
from collections.abc import Iterable
from dataclasses import dataclass
from typing import Optional

import flyteidl.admin.launch_plan_pb2 as pb
import google.protobuf.wrappers_pb2 as pb_wrap

from ..core.execution import QualityOfService
from ..core.identifier import Identifier
from ..core.interface import ParameterMap
from ..core.literals import LiteralMap
from ..core.security import SecurityContext
from ..utils import to_idl_many, try_to_idl
from .common import Annotations, AuthRole, Labels, Notification, RawOutputDataConfig
from .schedule import Schedule


@dataclass
class Auth:
"""
Defines permissions associated with executions created by this launch plan spec.
Use either of these roles when they have permissions required by your workflow execution.
Deprecated.
"""

assumable_iam_role: str
"""Defines an optional iam role which will be used for tasks run in executions created with this launch plan."""

kubernetes_service_account: str
"""Defines an optional kubernetes service account which will be used for tasks run in executions created with this launch plan."""

def to_idl(self) -> pb.Auth:
return pb.Auth(
assumable_iam_role=self.assumable_iam_role,
kubernetes_service_account=self.kubernetes_service_account,
)


@dataclass
class LaunchPlanSpec:
"""User-provided launch plan definition and configuration values."""

workflow_id: Identifier
"""Reference to the Workflow template that the launch plan references"""

entity_metadata: "LaunchPlanMetadata"
"""Metadata for the Launch Plan"""

default_inputs: ParameterMap
"""
Input values to be passed for the execution.
These can be overriden when an execution is created with this launch plan.
"""

fixed_inputs: LiteralMap
"""
Fixed, non-overridable inputs for the Launch Plan.
These can not be overriden when an execution is created with this launch plan.
"""

"""
String to indicate the role to use to execute the workflow underneath
Deprecated
"""
role: str

labels: Labels
"""Custom labels to be applied to the execution resource."""

annotations: Annotations
"""Custom annotations to be applied to the execution resource."""

security_context: SecurityContext
"""Indicates security context for permissions triggered with this launch plan"""

quality_of_service: QualityOfService
"""Indicates the runtime priority of the execution."""

raw_output_data_config: RawOutputDataConfig
"""Encapsulates user settings pertaining to offloaded data (i.e. Blobs, Schema, query data, etc.)."""

max_parallelism: int
"""
Controls the maximum number of tasknodes that can be run in parallel for the entire workflow.
This is useful to achieve fairness. Note: MapTasks are regarded as one unit,
and parallelism/concurrency of MapTasks is independent from this.
"""

interruptible: Optional[bool] = None
"""
Allows for the interruptible flag of a workflow to be overwritten for a single execution.
Omitting this field uses the workflow's value as a default.
As we need to distinguish between the field not being provided and its default value false, we have to use a wrapper
around the bool field.
"""

auth: Optional[Auth] = None
"""
Indicates the permission associated with workflow executions triggered with this launch plan.
Deprecated
"""

auth_role: Optional[AuthRole] = None

def to_idl(self) -> pb.LaunchPlanSpec:
return pb.LaunchPlanSpec(
workflow_id=self.workflow_id.to_idl(),
entity_metadata=self.entity_metadata.to_idl(),
default_inputs=self.default_inputs.to_idl(),
fixed_inputs=self.fixed_inputs.to_idl(),
role=self.role,
labels=self.labels.to_idl(),
annotations=self.annotations.to_idl(),
auth=try_to_idl(self.auth),
auth_role=try_to_idl(self.auth_role),
security_context=self.security_context.to_idl(),
quality_of_service=self.quality_of_service.to_idl(),
raw_output_data_config=self.raw_output_data_config.to_idl(),
max_parallelism=self.max_parallelism,
interruptible=pb_wrap.BoolValue(value=self.interruptible),
)


@dataclass
class LaunchPlanMetadata:
"""
Additional launch plan attributes included in the LaunchPlanSpec not strictly required to launch
the reference workflow.
"""

schedule: Schedule
"""Schedule to execute the Launch Plan"""

notifications: Iterable[Notification]
"""List of notifications based on Execution status transitions"""

def to_idl(self) -> pb.LaunchPlanMetadata:
return pb.LaunchPlanMetadata(
schedule=self.schedule.to_idl(),
notifications=to_idl_many(self.notifications),
)
Loading

0 comments on commit 989e0b3

Please sign in to comment.