From 74a8163488f9340a40d401f88f496a6d9a881ea2 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Wed, 12 Aug 2020 08:11:42 -0700 Subject: [PATCH] Raw output data config object (#146) * add raw output data prefix * wip * updating * use the object now * setup * wip Co-authored-by: Haytham AbuelFutuh --- flytekit/__init__.py | 2 +- flytekit/common/launch_plan.py | 18 +++++++- flytekit/common/utils.py | 13 +++--- flytekit/common/workflow.py | 6 +++ flytekit/configuration/auth.py | 9 ++++ flytekit/models/common.py | 25 +++++++++++ flytekit/models/launch_plan.py | 36 +++++++++++----- .../unit/common_tests/test_launch_plan.py | 20 +++++++++ tests/flytekit/unit/models/test_common.py | 7 ++++ .../flytekit/unit/models/test_launch_plan.py | 41 ++++++++++++++++++- 10 files changed, 155 insertions(+), 22 deletions(-) diff --git a/flytekit/__init__.py b/flytekit/__init__.py index 075f4b8d48..e47b4245b7 100644 --- a/flytekit/__init__.py +++ b/flytekit/__init__.py @@ -2,4 +2,4 @@ import flytekit.plugins -__version__ = '0.11.4' +__version__ = '0.11.5' diff --git a/flytekit/common/launch_plan.py b/flytekit/common/launch_plan.py index 9c7fdbbeef..d2b9c4bab4 100644 --- a/flytekit/common/launch_plan.py +++ b/flytekit/common/launch_plan.py @@ -53,6 +53,7 @@ def promote_from_model(cls, model): labels=model.labels, annotations=model.annotations, auth_role=model.auth_role, + raw_output_data_config=model.raw_output_data_config, ) @classmethod @@ -106,7 +107,7 @@ def auth_role(self): :rtype: flytekit.models.common.AuthRole """ fixed_auth = super(SdkLaunchPlan, self).auth_role - if fixed_auth is not None and\ + if fixed_auth is not None and \ (fixed_auth.assumable_iam_role is not None or fixed_auth.kubernetes_service_account is not None): return fixed_auth @@ -142,6 +143,18 @@ def entity_type_text(self): """ return "Launch Plan" + @property + def raw_output_data_config(self): + """ + :rtype: flytekit.models.common.RawOutputDataConfig + """ + raw_output_data_config = super(SdkLaunchPlan, self).raw_output_data_config + if raw_output_data_config is not None and raw_output_data_config.output_location_prefix != '': + return raw_output_data_config + + # If it was not set explicitly then let's use the value found in the configuration. + return _common_models.RawOutputDataConfig(_auth_config.RAW_OUTPUT_DATA_PREFIX.get()) + @_exception_scopes.system_entry_point def validate(self): # TODO: Validate workflow is satisfied @@ -269,6 +282,7 @@ def __init__( labels=None, annotations=None, auth_role=None, + raw_output_data_config=None, ): """ :param flytekit.common.workflow.SdkWorkflow sdk_workflow: @@ -284,6 +298,7 @@ def __init__( executed by this launch plan. Any custom kubernetes annotations to apply to workflows executed by this launch plan. :param flytekit.models.common.Authrole auth_role: The auth method with which to execute the workflow. + :param flytekit.models.common.RawOutputDataConfig raw_output_data_config: Config for offloading data """ if role and auth_role: raise ValueError("Cannot set both role and auth. Role is deprecated, use auth instead.") @@ -317,6 +332,7 @@ def __init__( labels or _common_models.Labels({}), annotations or _common_models.Annotations({}), auth_role, + raw_output_data_config or _common_models.RawOutputDataConfig(''), ) self._interface = _interface.TypedInterface( {k: v.var for k, v in _six.iteritems(default_inputs)}, diff --git a/flytekit/common/utils.py b/flytekit/common/utils.py index 7eaaa05b5e..3d1dfb46ea 100644 --- a/flytekit/common/utils.py +++ b/flytekit/common/utils.py @@ -11,10 +11,7 @@ from flytekit.configuration import sdk as _sdk_config from flytekit.models.core import identifier as _identifier -try: - from pathlib import Path -except ImportError: - from pathlib2 import Path # python 2 backport +from pathlib import Path def _dnsify(value): @@ -156,12 +153,12 @@ def __init__(self, context_statement): def __enter__(self): _logging.info("Entering timed context: {}".format(self._context_statement)) - self._start_wall_time = _time.time() - self._start_process_time = _time.clock() + self._start_wall_time = _time.perf_counter() + self._start_process_time = _time.process_time() def __exit__(self, exc_type, exc_val, exc_tb): - end_wall_time = _time.time() - end_process_time = _time.clock() + end_wall_time = _time.perf_counter() + end_process_time = _time.process_time() _logging.info("Exiting timed context: {} [Wall Time: {}s, Process Time: {}s]".format( self._context_statement, end_wall_time - self._start_wall_time, diff --git a/flytekit/common/workflow.py b/flytekit/common/workflow.py index cbf1368945..aca0a1486f 100644 --- a/flytekit/common/workflow.py +++ b/flytekit/common/workflow.py @@ -316,6 +316,7 @@ def create_launch_plan( annotations=None, assumable_iam_role=None, kubernetes_service_account=None, + raw_output_data_prefix=None, cls=None ): """ @@ -332,6 +333,8 @@ def create_launch_plan( class provided should be a subclass of flytekit.common.launch_plan.SdkLaunchPlan. :param Text assumable_iam_role: The IAM role to execute the workflow with. :param Text kubernetes_service_account: The kubernetes service account to execute the workflow with. + :param Text raw_output_data_prefix: Bucket for offloaded data + :rtype: flytekit.common.launch_plan.SdkRunnableLaunchPlan """ # TODO: Actually ensure the parameters conform. @@ -346,6 +349,8 @@ class provided should be a subclass of flytekit.common.launch_plan.SdkLaunchPlan auth_role = _common_models.AuthRole(assumable_iam_role=assumable_iam_role, kubernetes_service_account=kubernetes_service_account) + raw_output_config = _common_models.RawOutputDataConfig(raw_output_data_prefix or "") + return (cls or _launch_plan.SdkRunnableLaunchPlan)( sdk_workflow=self, default_inputs={ @@ -358,6 +363,7 @@ class provided should be a subclass of flytekit.common.launch_plan.SdkLaunchPlan labels=labels, annotations=annotations, auth_role=auth_role, + raw_output_data_config=raw_output_config, ) @_exception_scopes.system_entry_point diff --git a/flytekit/configuration/auth.py b/flytekit/configuration/auth.py index 82ba962f3a..407fe88165 100644 --- a/flytekit/configuration/auth.py +++ b/flytekit/configuration/auth.py @@ -13,3 +13,12 @@ """ This is the kubernetes service account that will be passed to workflow executions. """ + +RAW_OUTPUT_DATA_PREFIX = _config_common.FlyteStringConfigurationEntry('auth', 'raw_output_data_prefix', default='') +""" +This is not output metadata but rather where users can specify an S3 or gcs path for offloaded data like blobs +and schemas. + +The reason this setting is in this file is because it's inextricably tied to a workflow's role or service account, +since that is what ultimately gives the tasks the ability to write to certain buckets. +""" diff --git a/flytekit/models/common.py b/flytekit/models/common.py index db0c41416f..8d67ce4b56 100644 --- a/flytekit/models/common.py +++ b/flytekit/models/common.py @@ -474,3 +474,28 @@ def from_flyte_idl(cls, pb2_object): ) +class RawOutputDataConfig(FlyteIdlEntity): + + def __init__(self, output_location_prefix): + """ + :param Text output_location_prefix: Location of offloaded data for things like S3, etc. + """ + self._output_location_prefix = output_location_prefix + + @property + def output_location_prefix(self): + return self._output_location_prefix + + def to_flyte_idl(self): + """ + :rtype: flyteidl.admin.common_pb2.Auth + """ + return _common_pb2.RawOutputDataConfig( + output_location_prefix=self.output_location_prefix + ) + + @classmethod + def from_flyte_idl(cls, pb2): + return cls( + output_location_prefix=pb2.output_location_prefix + ) diff --git a/flytekit/models/launch_plan.py b/flytekit/models/launch_plan.py index 341d37a9b8..b814d1aa25 100644 --- a/flytekit/models/launch_plan.py +++ b/flytekit/models/launch_plan.py @@ -109,7 +109,8 @@ def from_flyte_idl(cls, pb2_object): class LaunchPlanSpec(_common.FlyteIdlEntity): - def __init__(self, workflow_id, entity_metadata, default_inputs, fixed_inputs, labels, annotations, auth_role): + def __init__(self, workflow_id, entity_metadata, default_inputs, fixed_inputs, labels, annotations, auth_role, + raw_output_data_config): """ The spec for a Launch Plan. @@ -122,6 +123,8 @@ def __init__(self, workflow_id, entity_metadata, default_inputs, fixed_inputs, l :param flyteidl.admin.common_pb2.Annotations annotations: Any custom kubernetes annotations to apply to workflows executed by this launch plan. :param flytekit.models.common.Auth auth_role: The auth method with which to execute the workflow. + :param flytekit.models.common.RawOutputDataConfig raw_output_data_config: Value for where to store offloaded + data like Blobs and Schemas. """ self._workflow_id = workflow_id self._entity_metadata = entity_metadata @@ -130,6 +133,7 @@ def __init__(self, workflow_id, entity_metadata, default_inputs, fixed_inputs, l self._labels = labels self._annotations = annotations self._auth_role = auth_role + self._raw_output_data_config = raw_output_data_config @property def workflow_id(self): @@ -182,10 +186,18 @@ def annotations(self): def auth_role(self): """ The authorization method with which to execute the workflow. - :return: flytekit.models.common.Auth + :rtype: flytekit.models.common.Auth """ return self._auth_role + @property + def raw_output_data_config(self): + """ + Where to store offloaded data like Blobs and Schemas + :rtype: flytekit.models.common.RawOutputDataConfig + """ + return self._raw_output_data_config + def to_flyte_idl(self): """ :rtype: flyteidl.admin.launch_plan_pb2.LaunchPlanSpec @@ -198,22 +210,24 @@ def to_flyte_idl(self): labels=self.labels.to_flyte_idl(), annotations=self.annotations.to_flyte_idl(), auth_role=self.auth_role.to_flyte_idl(), + raw_output_data_config=self.raw_output_data_config.to_flyte_idl(), ) @classmethod - def from_flyte_idl(cls, pb2_object): + def from_flyte_idl(cls, pb2): """ - :param flyteidl.admin.launch_plan_pb2.LaunchPlanSpec pb2_object: + :param flyteidl.admin.launch_plan_pb2.LaunchPlanSpec pb2: :rtype: LaunchPlanSpec """ return cls( - workflow_id=_identifier.Identifier.from_flyte_idl(pb2_object.workflow_id), - entity_metadata=LaunchPlanMetadata.from_flyte_idl(pb2_object.entity_metadata), - default_inputs=_interface.ParameterMap.from_flyte_idl(pb2_object.default_inputs), - fixed_inputs=_literals.LiteralMap.from_flyte_idl(pb2_object.fixed_inputs), - labels=_common.Labels.from_flyte_idl(pb2_object.labels), - annotations=_common.Annotations.from_flyte_idl(pb2_object.annotations), - auth_role=_common.AuthRole.from_flyte_idl(pb2_object.auth_role), + workflow_id=_identifier.Identifier.from_flyte_idl(pb2.workflow_id), + entity_metadata=LaunchPlanMetadata.from_flyte_idl(pb2.entity_metadata), + default_inputs=_interface.ParameterMap.from_flyte_idl(pb2.default_inputs), + fixed_inputs=_literals.LiteralMap.from_flyte_idl(pb2.fixed_inputs), + labels=_common.Labels.from_flyte_idl(pb2.labels), + annotations=_common.Annotations.from_flyte_idl(pb2.annotations), + auth_role=_common.AuthRole.from_flyte_idl(pb2.auth_role), + raw_output_data_config=_common.RawOutputDataConfig.from_flyte_idl(pb2.raw_output_data_config), ) diff --git a/tests/flytekit/unit/common_tests/test_launch_plan.py b/tests/flytekit/unit/common_tests/test_launch_plan.py index 8cc83daa83..8b85b74e78 100644 --- a/tests/flytekit/unit/common_tests/test_launch_plan.py +++ b/tests/flytekit/unit/common_tests/test_launch_plan.py @@ -341,3 +341,23 @@ def test_promote_from_model(): assert not isinstance(lp_from_spec, _launch_plan.SdkRunnableLaunchPlan) assert isinstance(lp_from_spec, _launch_plan.SdkLaunchPlan) assert lp_from_spec == lp + + +def test_raw_data_output_prefix(): + workflow_to_test = _workflow.workflow( + {}, + inputs={ + 'required_input': _workflow.Input(_types.Types.Integer), + 'default_input': _workflow.Input(_types.Types.Integer, default=5) + } + ) + lp = workflow_to_test.create_launch_plan( + fixed_inputs={'required_input': 5}, + raw_output_data_prefix='s3://bucket-name', + ) + assert lp.raw_output_data_config.output_location_prefix == 's3://bucket-name' + + lp2 = workflow_to_test.create_launch_plan( + fixed_inputs={'required_input': 5}, + ) + assert lp2.raw_output_data_config.output_location_prefix == '' diff --git a/tests/flytekit/unit/models/test_common.py b/tests/flytekit/unit/models/test_common.py index bab0cb0124..569b7a251a 100644 --- a/tests/flytekit/unit/models/test_common.py +++ b/tests/flytekit/unit/models/test_common.py @@ -80,3 +80,10 @@ def test_auth_role(): assert not obj.assumable_iam_role obj2 = _common.AuthRole.from_flyte_idl(obj.to_flyte_idl()) assert obj == obj2 + + +def test_raw_output_data_config(): + obj = _common.RawOutputDataConfig('s3://bucket') + assert obj.output_location_prefix == 's3://bucket' + obj2 = _common.RawOutputDataConfig.from_flyte_idl(obj.to_flyte_idl()) + assert obj2 == obj diff --git a/tests/flytekit/unit/models/test_launch_plan.py b/tests/flytekit/unit/models/test_launch_plan.py index c1683440ca..5ce2ef1999 100644 --- a/tests/flytekit/unit/models/test_launch_plan.py +++ b/tests/flytekit/unit/models/test_launch_plan.py @@ -1,6 +1,7 @@ from __future__ import absolute_import -from flytekit.models import launch_plan, schedule, interface, types +from flytekit.models import launch_plan, schedule, interface, types, literals, common +from flytekit.models.core import identifier def test_metadata(): @@ -33,3 +34,41 @@ def test_lp_closure(): assert obj == obj2 assert obj2.expected_inputs == parameter_map assert obj2.expected_outputs == variable_map + + +def test_launch_plan_spec(): + identifier_model = identifier.Identifier(identifier.ResourceType.TASK, "project", "domain", "name", "version") + + s = schedule.Schedule('asdf', '1 3 4 5 6 7') + launch_plan_metadata_model = launch_plan.LaunchPlanMetadata(schedule=s, notifications=[]) + + v = interface.Variable(types.LiteralType(simple=types.SimpleType.BOOLEAN), 'asdf asdf asdf') + p = interface.Parameter(var=v) + parameter_map = interface.ParameterMap({'ppp': p}) + + fixed_inputs = literals.LiteralMap( + { + 'a': literals.Literal(scalar=literals.Scalar(primitive=literals.Primitive(integer=1))) + } + ) + + labels_model = common.Labels({}) + annotations_model = common.Annotations({"my": "annotation"}) + + auth_role_model = common.AuthRole(assumable_iam_role='my:iam:role') + raw_data_output_config = common.RawOutputDataConfig('s3://bucket') + empty_raw_data_output_config = common.RawOutputDataConfig('') + + lp_spec_raw_output_prefixed = launch_plan.LaunchPlanSpec(identifier_model, launch_plan_metadata_model, + parameter_map, fixed_inputs, labels_model, + annotations_model, auth_role_model, raw_data_output_config) + + obj2 = launch_plan.LaunchPlanSpec.from_flyte_idl(lp_spec_raw_output_prefixed.to_flyte_idl()) + assert obj2 == lp_spec_raw_output_prefixed + + lp_spec_no_prefix = launch_plan.LaunchPlanSpec(identifier_model, launch_plan_metadata_model, + parameter_map, fixed_inputs, labels_model, + annotations_model, auth_role_model, empty_raw_data_output_config) + + obj2 = launch_plan.LaunchPlanSpec.from_flyte_idl(lp_spec_no_prefix.to_flyte_idl()) + assert obj2 == lp_spec_no_prefix