Skip to content

Commit

Permalink
Add _sync_closure call to reduce load from wait_for_completion calls (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
katrogan authored May 18, 2020
1 parent 597a7b6 commit 1926b12
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 5 deletions.
2 changes: 1 addition & 1 deletion flytekit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

import flytekit.plugins

__version__ = '0.8.0b1'
__version__ = '0.8.0b2'
13 changes: 11 additions & 2 deletions flytekit/common/mixins/artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ def sync(self):
"""
pass

@_abc.abstractmethod
def _sync_closure(self):
"""
Syncs the closure of the underlying execution artifact with the state observed by the platform.
:rtype: None
"""
pass

def wait_for_completion(self, timeout=None, poll_interval=None):
"""
:param datetime.timedelta timeout: Amount of time to wait until the execution has completed before timing
Expand All @@ -64,10 +72,11 @@ def wait_for_completion(self, timeout=None, poll_interval=None):
else:
time_to_give_up = _datetime.datetime.utcnow() + timeout

self.sync()
self._sync_closure()
while _datetime.datetime.utcnow() < time_to_give_up:
if self.is_complete:
self.sync()
return
_time.sleep(poll_interval.total_seconds())
self.sync()
self._sync_closure()
raise _user_exceptions.FlyteTimeout("Execution {} did not complete before timeout.".format(self))
8 changes: 8 additions & 0 deletions flytekit/common/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,3 +414,11 @@ def sync(self):
_task_executions.SdkTaskExecution.promote_from_model(te) for te in ne.get_task_executions()
]
# TODO: Sub-workflows too once implemented

def _sync_closure(self):
"""
Syncs the closure of the underlying execution artifact with the state observed by the platform.
:rtype: None
"""
ne = _engine_loader.get_engine().get_node_execution(self)
ne.sync()
7 changes: 7 additions & 0 deletions flytekit/common/tasks/executions.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,11 @@ def promote_from_model(cls, base_model):
)

def sync(self):
self._sync_closure()

def _sync_closure(self):
"""
Syncs the closure of the underlying execution artifact with the state observed by the platform.
:rtype: None
"""
_engine_loader.get_engine().get_task_execution(self).sync()
10 changes: 9 additions & 1 deletion flytekit/common/workflow_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,17 @@ def sync(self):
:rtype: None
"""
if not self.is_complete or self._node_executions is None:
_engine_loader.get_engine().get_workflow_execution(self).sync()
self._sync_closure()
self._node_executions = self.get_node_executions()

def _sync_closure(self):
"""
Syncs the closure of the underlying execution artifact with the state observed by the platform.
:rtype: None
"""
if not self.is_complete:
_engine_loader.get_engine().get_workflow_execution(self).sync()

def get_node_executions(self, filters=None):
"""
:param list[flytekit.models.filters.Filter] filters:
Expand Down
2 changes: 1 addition & 1 deletion flytekit/plugins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

_lazy_loader.LazyLoadPlugin(
"sidecar",
["k8s-proto>=0.0.2,<1.0.0"],
["k8s-proto>=0.0.3,<1.0.0"],
[k8s, flyteidl]
)

Expand Down

0 comments on commit 1926b12

Please sign in to comment.