diff --git a/flytekit/__init__.py b/flytekit/__init__.py index 9a83bd785b..749bc56556 100644 --- a/flytekit/__init__.py +++ b/flytekit/__init__.py @@ -2,4 +2,4 @@ import flytekit.plugins -__version__ = '0.8.0b1' +__version__ = '0.8.0b2' diff --git a/flytekit/common/mixins/artifact.py b/flytekit/common/mixins/artifact.py index b97dc0a07f..aeb9075b91 100644 --- a/flytekit/common/mixins/artifact.py +++ b/flytekit/common/mixins/artifact.py @@ -51,6 +51,14 @@ def sync(self): """ pass + @_abc.abstractmethod + def _sync_closure(self): + """ + Syncs the closure of the underlying execution artifact with the state observed by the platform. + :rtype: None + """ + pass + def wait_for_completion(self, timeout=None, poll_interval=None): """ :param datetime.timedelta timeout: Amount of time to wait until the execution has completed before timing @@ -64,10 +72,11 @@ def wait_for_completion(self, timeout=None, poll_interval=None): else: time_to_give_up = _datetime.datetime.utcnow() + timeout - self.sync() + self._sync_closure() while _datetime.datetime.utcnow() < time_to_give_up: if self.is_complete: + self.sync() return _time.sleep(poll_interval.total_seconds()) - self.sync() + self._sync_closure() raise _user_exceptions.FlyteTimeout("Execution {} did not complete before timeout.".format(self)) diff --git a/flytekit/common/nodes.py b/flytekit/common/nodes.py index d743be9963..37b6b23c91 100644 --- a/flytekit/common/nodes.py +++ b/flytekit/common/nodes.py @@ -414,3 +414,11 @@ def sync(self): _task_executions.SdkTaskExecution.promote_from_model(te) for te in ne.get_task_executions() ] # TODO: Sub-workflows too once implemented + + def _sync_closure(self): + """ + Syncs the closure of the underlying execution artifact with the state observed by the platform. + :rtype: None + """ + ne = _engine_loader.get_engine().get_node_execution(self) + ne.sync() diff --git a/flytekit/common/tasks/executions.py b/flytekit/common/tasks/executions.py index 4931a9b4d5..f77728a3c4 100644 --- a/flytekit/common/tasks/executions.py +++ b/flytekit/common/tasks/executions.py @@ -106,4 +106,11 @@ def promote_from_model(cls, base_model): ) def sync(self): + self._sync_closure() + + def _sync_closure(self): + """ + Syncs the closure of the underlying execution artifact with the state observed by the platform. + :rtype: None + """ _engine_loader.get_engine().get_task_execution(self).sync() diff --git a/flytekit/common/workflow_execution.py b/flytekit/common/workflow_execution.py index 9d1f54d929..09298fae12 100644 --- a/flytekit/common/workflow_execution.py +++ b/flytekit/common/workflow_execution.py @@ -123,9 +123,17 @@ def sync(self): :rtype: None """ if not self.is_complete or self._node_executions is None: - _engine_loader.get_engine().get_workflow_execution(self).sync() + self._sync_closure() self._node_executions = self.get_node_executions() + def _sync_closure(self): + """ + Syncs the closure of the underlying execution artifact with the state observed by the platform. + :rtype: None + """ + if not self.is_complete: + _engine_loader.get_engine().get_workflow_execution(self).sync() + def get_node_executions(self, filters=None): """ :param list[flytekit.models.filters.Filter] filters: diff --git a/flytekit/plugins/__init__.py b/flytekit/plugins/__init__.py index 0f338c62b4..56c2da7cc4 100644 --- a/flytekit/plugins/__init__.py +++ b/flytekit/plugins/__init__.py @@ -25,7 +25,7 @@ _lazy_loader.LazyLoadPlugin( "sidecar", - ["k8s-proto>=0.0.2,<1.0.0"], + ["k8s-proto>=0.0.3,<1.0.0"], [k8s, flyteidl] )