Skip to content

Commit

Permalink
Improve task processing time and query accuracy
Browse files Browse the repository at this point in the history
Give the task manager a hint when new tasks are added and force a task
harvest prior to a task's status query to improve responsiveness of
dynamic workflows.
  • Loading branch information
ctsa committed May 24, 2017
1 parent b5f8dee commit 8f9e1b5
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 6 deletions.
1 change: 1 addition & 0 deletions pyflow/doc/ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
v1.1.16 20170523
* Improve task processing time and query accuracy
* Fix task status query methods to work correctly within subworkflows
v1.1.15 20170519
* Add new launchTasksUntil demo
Expand Down
14 changes: 11 additions & 3 deletions pyflow/src/pyflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3853,7 +3853,7 @@ def _isWorkflowStopped() :
return WorkflowRunner._allStop.isSet()

def _addTaskCore(self, namespace, label, payload, dependencies) :
# private core taskAdd routine for hijacking
# private core addTask routine for hijacking
# fromWorkflow is the workflow instance used to launch the task
#

Expand Down Expand Up @@ -3942,6 +3942,11 @@ def _isTaskCompleteCore(self, namespace, taskLabel) :

if not self._tdag.isTaskPresent(namespace, taskLabel) :
return (False, False)

# Run a task harvest just before checking the task status
# to help ensure the status is up to date
self._tman.harvestTasks()

task = self._tdag.getTask(namespace, taskLabel)
return ( task.isDone(), task.isError() )

Expand All @@ -3963,9 +3968,12 @@ def _checkTaskLabel(label) :


def _startTaskManager(self) :
# start a new task manager if one isn't already running:
# Start a new task manager if one isn't already running. If it is running
# provide a hint that a new task has just been added to the workflow.
#
if (self._tman is not None) and (self._tman.isAlive()) : return
if (self._tman is not None) and (self._tman.isAlive()) :
self._tdag.isFinishedEvent.set()
return
if not self._cdata().isTaskManagerException :
self._tman = TaskManager(self._cdata(), self._tdag)
self._tman.start()
Expand Down
4 changes: 1 addition & 3 deletions scratch/test/pyflow_unit_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,9 +498,7 @@ def __init__(self) :
class SelfWorkflow2(WorkflowRunner) :
def workflow(self2) :
self2.addTask("A2",getSleepCmd()+["0"])

# TODO Find a more robust way to ensure that A2 'should' be complete by the time a query is made below
time.sleep(102)
time.sleep(1)
self.taskStatus2 = self2.isTaskComplete("A2")
(self.taskStatus3, _) = self2.isTaskDone("A2")

Expand Down

0 comments on commit 8f9e1b5

Please sign in to comment.