Skip to content

Commit

Permalink
Raw output data config object (#146)
Browse files Browse the repository at this point in the history
* add raw output data prefix

* wip

* updating

* use the object now

* setup

* wip

Co-authored-by: Haytham AbuelFutuh <[email protected]>
  • Loading branch information
wild-endeavor and EngHabu authored Aug 12, 2020
1 parent 8d557a0 commit 74a8163
Show file tree
Hide file tree
Showing 10 changed files with 155 additions and 22 deletions.
2 changes: 1 addition & 1 deletion flytekit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

import flytekit.plugins

__version__ = '0.11.4'
__version__ = '0.11.5'
18 changes: 17 additions & 1 deletion flytekit/common/launch_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def promote_from_model(cls, model):
labels=model.labels,
annotations=model.annotations,
auth_role=model.auth_role,
raw_output_data_config=model.raw_output_data_config,
)

@classmethod
Expand Down Expand Up @@ -106,7 +107,7 @@ def auth_role(self):
:rtype: flytekit.models.common.AuthRole
"""
fixed_auth = super(SdkLaunchPlan, self).auth_role
if fixed_auth is not None and\
if fixed_auth is not None and \
(fixed_auth.assumable_iam_role is not None or fixed_auth.kubernetes_service_account is not None):
return fixed_auth

Expand Down Expand Up @@ -142,6 +143,18 @@ def entity_type_text(self):
"""
return "Launch Plan"

@property
def raw_output_data_config(self):
"""
:rtype: flytekit.models.common.RawOutputDataConfig
"""
raw_output_data_config = super(SdkLaunchPlan, self).raw_output_data_config
if raw_output_data_config is not None and raw_output_data_config.output_location_prefix != '':
return raw_output_data_config

# If it was not set explicitly then let's use the value found in the configuration.
return _common_models.RawOutputDataConfig(_auth_config.RAW_OUTPUT_DATA_PREFIX.get())

@_exception_scopes.system_entry_point
def validate(self):
# TODO: Validate workflow is satisfied
Expand Down Expand Up @@ -269,6 +282,7 @@ def __init__(
labels=None,
annotations=None,
auth_role=None,
raw_output_data_config=None,
):
"""
:param flytekit.common.workflow.SdkWorkflow sdk_workflow:
Expand All @@ -284,6 +298,7 @@ def __init__(
executed by this launch plan.
Any custom kubernetes annotations to apply to workflows executed by this launch plan.
:param flytekit.models.common.Authrole auth_role: The auth method with which to execute the workflow.
:param flytekit.models.common.RawOutputDataConfig raw_output_data_config: Config for offloading data
"""
if role and auth_role:
raise ValueError("Cannot set both role and auth. Role is deprecated, use auth instead.")
Expand Down Expand Up @@ -317,6 +332,7 @@ def __init__(
labels or _common_models.Labels({}),
annotations or _common_models.Annotations({}),
auth_role,
raw_output_data_config or _common_models.RawOutputDataConfig(''),
)
self._interface = _interface.TypedInterface(
{k: v.var for k, v in _six.iteritems(default_inputs)},
Expand Down
13 changes: 5 additions & 8 deletions flytekit/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@
from flytekit.configuration import sdk as _sdk_config
from flytekit.models.core import identifier as _identifier

try:
from pathlib import Path
except ImportError:
from pathlib2 import Path # python 2 backport
from pathlib import Path


def _dnsify(value):
Expand Down Expand Up @@ -156,12 +153,12 @@ def __init__(self, context_statement):

def __enter__(self):
_logging.info("Entering timed context: {}".format(self._context_statement))
self._start_wall_time = _time.time()
self._start_process_time = _time.clock()
self._start_wall_time = _time.perf_counter()
self._start_process_time = _time.process_time()

def __exit__(self, exc_type, exc_val, exc_tb):
end_wall_time = _time.time()
end_process_time = _time.clock()
end_wall_time = _time.perf_counter()
end_process_time = _time.process_time()
_logging.info("Exiting timed context: {} [Wall Time: {}s, Process Time: {}s]".format(
self._context_statement,
end_wall_time - self._start_wall_time,
Expand Down
6 changes: 6 additions & 0 deletions flytekit/common/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ def create_launch_plan(
annotations=None,
assumable_iam_role=None,
kubernetes_service_account=None,
raw_output_data_prefix=None,
cls=None
):
"""
Expand All @@ -332,6 +333,8 @@ def create_launch_plan(
class provided should be a subclass of flytekit.common.launch_plan.SdkLaunchPlan.
:param Text assumable_iam_role: The IAM role to execute the workflow with.
:param Text kubernetes_service_account: The kubernetes service account to execute the workflow with.
:param Text raw_output_data_prefix: Bucket for offloaded data
:rtype: flytekit.common.launch_plan.SdkRunnableLaunchPlan
"""
# TODO: Actually ensure the parameters conform.
Expand All @@ -346,6 +349,8 @@ class provided should be a subclass of flytekit.common.launch_plan.SdkLaunchPlan
auth_role = _common_models.AuthRole(assumable_iam_role=assumable_iam_role,
kubernetes_service_account=kubernetes_service_account)

raw_output_config = _common_models.RawOutputDataConfig(raw_output_data_prefix or "")

return (cls or _launch_plan.SdkRunnableLaunchPlan)(
sdk_workflow=self,
default_inputs={
Expand All @@ -358,6 +363,7 @@ class provided should be a subclass of flytekit.common.launch_plan.SdkLaunchPlan
labels=labels,
annotations=annotations,
auth_role=auth_role,
raw_output_data_config=raw_output_config,
)

@_exception_scopes.system_entry_point
Expand Down
9 changes: 9 additions & 0 deletions flytekit/configuration/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,12 @@
"""
This is the kubernetes service account that will be passed to workflow executions.
"""

RAW_OUTPUT_DATA_PREFIX = _config_common.FlyteStringConfigurationEntry('auth', 'raw_output_data_prefix', default='')
"""
This is not output metadata but rather where users can specify an S3 or gcs path for offloaded data like blobs
and schemas.
The reason this setting is in this file is because it's inextricably tied to a workflow's role or service account,
since that is what ultimately gives the tasks the ability to write to certain buckets.
"""
25 changes: 25 additions & 0 deletions flytekit/models/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,3 +474,28 @@ def from_flyte_idl(cls, pb2_object):
)


class RawOutputDataConfig(FlyteIdlEntity):

def __init__(self, output_location_prefix):
"""
:param Text output_location_prefix: Location of offloaded data for things like S3, etc.
"""
self._output_location_prefix = output_location_prefix

@property
def output_location_prefix(self):
return self._output_location_prefix

def to_flyte_idl(self):
"""
:rtype: flyteidl.admin.common_pb2.Auth
"""
return _common_pb2.RawOutputDataConfig(
output_location_prefix=self.output_location_prefix
)

@classmethod
def from_flyte_idl(cls, pb2):
return cls(
output_location_prefix=pb2.output_location_prefix
)
36 changes: 25 additions & 11 deletions flytekit/models/launch_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ def from_flyte_idl(cls, pb2_object):

class LaunchPlanSpec(_common.FlyteIdlEntity):

def __init__(self, workflow_id, entity_metadata, default_inputs, fixed_inputs, labels, annotations, auth_role):
def __init__(self, workflow_id, entity_metadata, default_inputs, fixed_inputs, labels, annotations, auth_role,
raw_output_data_config):
"""
The spec for a Launch Plan.
Expand All @@ -122,6 +123,8 @@ def __init__(self, workflow_id, entity_metadata, default_inputs, fixed_inputs, l
:param flyteidl.admin.common_pb2.Annotations annotations:
Any custom kubernetes annotations to apply to workflows executed by this launch plan.
:param flytekit.models.common.Auth auth_role: The auth method with which to execute the workflow.
:param flytekit.models.common.RawOutputDataConfig raw_output_data_config: Value for where to store offloaded
data like Blobs and Schemas.
"""
self._workflow_id = workflow_id
self._entity_metadata = entity_metadata
Expand All @@ -130,6 +133,7 @@ def __init__(self, workflow_id, entity_metadata, default_inputs, fixed_inputs, l
self._labels = labels
self._annotations = annotations
self._auth_role = auth_role
self._raw_output_data_config = raw_output_data_config

@property
def workflow_id(self):
Expand Down Expand Up @@ -182,10 +186,18 @@ def annotations(self):
def auth_role(self):
"""
The authorization method with which to execute the workflow.
:return: flytekit.models.common.Auth
:rtype: flytekit.models.common.Auth
"""
return self._auth_role

@property
def raw_output_data_config(self):
"""
Where to store offloaded data like Blobs and Schemas
:rtype: flytekit.models.common.RawOutputDataConfig
"""
return self._raw_output_data_config

def to_flyte_idl(self):
"""
:rtype: flyteidl.admin.launch_plan_pb2.LaunchPlanSpec
Expand All @@ -198,22 +210,24 @@ def to_flyte_idl(self):
labels=self.labels.to_flyte_idl(),
annotations=self.annotations.to_flyte_idl(),
auth_role=self.auth_role.to_flyte_idl(),
raw_output_data_config=self.raw_output_data_config.to_flyte_idl(),
)

@classmethod
def from_flyte_idl(cls, pb2_object):
def from_flyte_idl(cls, pb2):
"""
:param flyteidl.admin.launch_plan_pb2.LaunchPlanSpec pb2_object:
:param flyteidl.admin.launch_plan_pb2.LaunchPlanSpec pb2:
:rtype: LaunchPlanSpec
"""
return cls(
workflow_id=_identifier.Identifier.from_flyte_idl(pb2_object.workflow_id),
entity_metadata=LaunchPlanMetadata.from_flyte_idl(pb2_object.entity_metadata),
default_inputs=_interface.ParameterMap.from_flyte_idl(pb2_object.default_inputs),
fixed_inputs=_literals.LiteralMap.from_flyte_idl(pb2_object.fixed_inputs),
labels=_common.Labels.from_flyte_idl(pb2_object.labels),
annotations=_common.Annotations.from_flyte_idl(pb2_object.annotations),
auth_role=_common.AuthRole.from_flyte_idl(pb2_object.auth_role),
workflow_id=_identifier.Identifier.from_flyte_idl(pb2.workflow_id),
entity_metadata=LaunchPlanMetadata.from_flyte_idl(pb2.entity_metadata),
default_inputs=_interface.ParameterMap.from_flyte_idl(pb2.default_inputs),
fixed_inputs=_literals.LiteralMap.from_flyte_idl(pb2.fixed_inputs),
labels=_common.Labels.from_flyte_idl(pb2.labels),
annotations=_common.Annotations.from_flyte_idl(pb2.annotations),
auth_role=_common.AuthRole.from_flyte_idl(pb2.auth_role),
raw_output_data_config=_common.RawOutputDataConfig.from_flyte_idl(pb2.raw_output_data_config),
)


Expand Down
20 changes: 20 additions & 0 deletions tests/flytekit/unit/common_tests/test_launch_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,3 +341,23 @@ def test_promote_from_model():
assert not isinstance(lp_from_spec, _launch_plan.SdkRunnableLaunchPlan)
assert isinstance(lp_from_spec, _launch_plan.SdkLaunchPlan)
assert lp_from_spec == lp


def test_raw_data_output_prefix():
workflow_to_test = _workflow.workflow(
{},
inputs={
'required_input': _workflow.Input(_types.Types.Integer),
'default_input': _workflow.Input(_types.Types.Integer, default=5)
}
)
lp = workflow_to_test.create_launch_plan(
fixed_inputs={'required_input': 5},
raw_output_data_prefix='s3://bucket-name',
)
assert lp.raw_output_data_config.output_location_prefix == 's3://bucket-name'

lp2 = workflow_to_test.create_launch_plan(
fixed_inputs={'required_input': 5},
)
assert lp2.raw_output_data_config.output_location_prefix == ''
7 changes: 7 additions & 0 deletions tests/flytekit/unit/models/test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,10 @@ def test_auth_role():
assert not obj.assumable_iam_role
obj2 = _common.AuthRole.from_flyte_idl(obj.to_flyte_idl())
assert obj == obj2


def test_raw_output_data_config():
obj = _common.RawOutputDataConfig('s3://bucket')
assert obj.output_location_prefix == 's3://bucket'
obj2 = _common.RawOutputDataConfig.from_flyte_idl(obj.to_flyte_idl())
assert obj2 == obj
41 changes: 40 additions & 1 deletion tests/flytekit/unit/models/test_launch_plan.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import absolute_import

from flytekit.models import launch_plan, schedule, interface, types
from flytekit.models import launch_plan, schedule, interface, types, literals, common
from flytekit.models.core import identifier


def test_metadata():
Expand Down Expand Up @@ -33,3 +34,41 @@ def test_lp_closure():
assert obj == obj2
assert obj2.expected_inputs == parameter_map
assert obj2.expected_outputs == variable_map


def test_launch_plan_spec():
identifier_model = identifier.Identifier(identifier.ResourceType.TASK, "project", "domain", "name", "version")

s = schedule.Schedule('asdf', '1 3 4 5 6 7')
launch_plan_metadata_model = launch_plan.LaunchPlanMetadata(schedule=s, notifications=[])

v = interface.Variable(types.LiteralType(simple=types.SimpleType.BOOLEAN), 'asdf asdf asdf')
p = interface.Parameter(var=v)
parameter_map = interface.ParameterMap({'ppp': p})

fixed_inputs = literals.LiteralMap(
{
'a': literals.Literal(scalar=literals.Scalar(primitive=literals.Primitive(integer=1)))
}
)

labels_model = common.Labels({})
annotations_model = common.Annotations({"my": "annotation"})

auth_role_model = common.AuthRole(assumable_iam_role='my:iam:role')
raw_data_output_config = common.RawOutputDataConfig('s3://bucket')
empty_raw_data_output_config = common.RawOutputDataConfig('')

lp_spec_raw_output_prefixed = launch_plan.LaunchPlanSpec(identifier_model, launch_plan_metadata_model,
parameter_map, fixed_inputs, labels_model,
annotations_model, auth_role_model, raw_data_output_config)

obj2 = launch_plan.LaunchPlanSpec.from_flyte_idl(lp_spec_raw_output_prefixed.to_flyte_idl())
assert obj2 == lp_spec_raw_output_prefixed

lp_spec_no_prefix = launch_plan.LaunchPlanSpec(identifier_model, launch_plan_metadata_model,
parameter_map, fixed_inputs, labels_model,
annotations_model, auth_role_model, empty_raw_data_output_config)

obj2 = launch_plan.LaunchPlanSpec.from_flyte_idl(lp_spec_no_prefix.to_flyte_idl())
assert obj2 == lp_spec_no_prefix

0 comments on commit 74a8163

Please sign in to comment.