Skip to content

Commit

Permalink
Vine: add wait and as_completed for futures (#3876)
Browse files Browse the repository at this point in the history
* adds wait and as_completed

* call super and add test

* fix test

* use futures not tasks

* typo

* raise exception and fix recursion

---------

Co-authored-by: Barry Jay Sly-Delgado <[email protected]>
  • Loading branch information
BarrySlyDelgado and Barry Jay Sly-Delgado authored Jul 31, 2024
1 parent 06531dc commit 4b43490
Show file tree
Hide file tree
Showing 3 changed files with 307 additions and 5 deletions.
151 changes: 146 additions & 5 deletions taskvine/src/bindings/python3/ndcctools/taskvine/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@
import hashlib
from concurrent.futures import Executor
from concurrent.futures import Future
from concurrent.futures import FIRST_COMPLETED
from concurrent.futures import FIRST_EXCEPTION
from concurrent.futures import ALL_COMPLETED
from concurrent.futures._base import PENDING
from concurrent.futures._base import CANCELLED
from concurrent.futures._base import FINISHED
from concurrent.futures import TimeoutError
from collections import namedtuple
from .task import (
PythonTask,
FunctionCall,
Expand All @@ -15,8 +23,11 @@
)

import os
import time
import textwrap

RESULT_PENDING = 'result_pending'

try:
import cloudpickle
pythontask_available = True
Expand All @@ -31,6 +42,109 @@ def retrieve_output(arg):
return arg


def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    """Wait for futures to finish, in the spirit of concurrent.futures.wait.

    Futures that have not been submitted yet are submitted first. Every
    future is then polled with ``result(timeout=5)`` until the condition in
    ``return_when`` is met or ``timeout`` seconds have elapsed.

    Args:
        fs: Iterable of futures. It is materialized once, so a one-shot
            iterator is accepted.
        timeout: Overall limit in seconds, or None for no limit. The limit is
            only checked between polls, so the call can overrun it by the
            duration of one poll.
        return_when: FIRST_COMPLETED, FIRST_EXCEPTION or ALL_COMPLETED.
            FIRST_EXCEPTION triggers when ``result()`` *returns* an Exception
            instance.

    Returns:
        A named 2-tuple ``(done, not_done)`` of sets of futures.

    Raises:
        Any exception raised by a future's ``result()`` call is not caught
        here and propagates to the caller.
    """
    # fs is traversed several times below; a generator would be exhausted
    # after the submission pass and every later pass would see nothing.
    fs = list(fs)

    done_fs = set()

    # submit tasks if they have not been submitted
    for f in fs:
        if not f._is_submitted:
            # NOTE(review): VineFuture.cancel reaches the manager through
            # `_task._module_manager`; confirm futures really expose
            # `module_manager`.
            f.module_manager.submit(f._task)
            # Mirror the executor's submit(): never submit the task twice.
            f._is_submitted = True

    time_init = time.time()
    time_check = float('inf') if timeout is None else timeout

    finished = False
    while time.time() - time_init < time_check and not finished:
        # Assume this pass completes everything until a pending future
        # proves otherwise.
        finished = True
        for f in fs:

            # skip if future is complete
            if f in done_fs:
                continue

            # check for completion
            result = f.result(timeout=5)

            if result is RESULT_PENDING:
                # at least one future is still running: poll again
                finished = False
            else:
                done_fs.add(f)

                # if this is the first completed task, stop waiting.
                if return_when == FIRST_COMPLETED:
                    finished = True
                    break

                if return_when == FIRST_EXCEPTION and isinstance(result, Exception):
                    finished = True
                    break

            # check for timeout
            if timeout is not None and time.time() - time_init > timeout:
                break

    # everything that did not complete is reported as not done
    not_done_fs = {f for f in fs if f not in done_fs}

    # Return an *instance* so that the result unpacks and indexes like the
    # tuple returned by concurrent.futures.wait.
    results = namedtuple('result', ['done', 'not_done'])
    return results(done_fs, not_done_fs)


def as_completed(fs, timeout=None):
    """Return an iterator over futures in the order they were seen to finish.

    Futures that have not been submitted yet are submitted first. Every
    future is then polled with ``result(timeout=5)`` until all of them have
    finished or ``timeout`` seconds have elapsed.

    Args:
        fs: Iterable of futures. It is materialized once, so a one-shot
            iterator is accepted.
        timeout: Overall limit in seconds, or None for no limit. The limit is
            only checked between polls.

    Returns:
        An iterator over the finished futures. If any future is still
        unfinished when polling stops, the ``TimeoutError`` class is yielded
        as a final marker element.
        NOTE(review): concurrent.futures.as_completed raises TimeoutError
        instead; the marker is kept so existing callers are not broken.

    Raises:
        Any exception raised by a future's ``result()`` call is not caught
        here and propagates to the caller.
    """
    # fs is traversed several times below; a generator would be exhausted
    # after the submission pass.
    fs = list(fs)

    completed = []   # finished futures, in the order they were observed
    seen = set()     # same futures, for O(1) membership tests

    # submit tasks if they have not been submitted
    for f in fs:
        if not f._is_submitted:
            # NOTE(review): VineFuture.cancel reaches the manager through
            # `_task._module_manager`; confirm futures really expose
            # `module_manager`.
            f.module_manager.submit(f._task)
            # Mirror the executor's submit(): never submit the task twice.
            f._is_submitted = True

    time_init = time.time()
    time_check = float('inf') if timeout is None else timeout

    finished = False
    while time.time() - time_init < time_check and not finished:
        # Set once per pass. Setting it per future would let a finished
        # future at the end of fs hide pending ones before it, and would
        # leave an empty fs polling forever.
        finished = True
        for f in fs:
            # skip if future is complete
            if f in seen:
                continue

            # check for completion
            result = f.result(timeout=5)

            if result is RESULT_PENDING:
                # at least one future is still running: poll again
                finished = False
            else:
                seen.add(f)
                completed.append(f)

            # check for timeout
            if timeout is not None and time.time() - time_init > timeout:
                break

    # flag that some futures did not finish in time
    if any(f not in seen for f in fs):
        completed.append(TimeoutError)

    return iter(completed)


##
# \class FuturesExecutor
#
Expand Down Expand Up @@ -65,12 +179,16 @@ def __init__(self, port=9123, batch_type="local", manager=None, manager_host_por
def submit(self, fn, *args, **kwargs):
    """Submit work to the manager and return the future tracking it.

    ``fn`` may be an already-built FuturePythonTask or FutureFunctionCall,
    which is handed to the manager directly, or any other object, which is
    first wrapped with ``self.future_task(fn, *args, **kwargs)`` and then
    routed back through this method.
    """
    is_python_task = isinstance(fn, FuturePythonTask)

    if not (is_python_task or isinstance(fn, FutureFunctionCall)):
        # Not a task yet: wrap it, record it, then re-enter with the task.
        wrapped = self.future_task(fn, *args, **kwargs)
        self.task_table.append(wrapped)
        wrapped._future._is_submitted = True
        self.submit(wrapped)
        return wrapped._future

    self.manager.submit(fn)
    if not is_python_task:
        # Function calls passed in directly are recorded here; python tasks
        # are only recorded by the wrapping branch above.
        self.task_table.append(fn)

    future = fn._future
    future._is_submitted = True
    return future

Expand Down Expand Up @@ -110,11 +228,16 @@ def __del__(self):
# An instance of this class can re resolved to a value that will be computed asynchronously
class VineFuture(Future):
def __init__(self, task):
    """Create a pending, not-yet-submitted future bound to ``task``."""
    super().__init__()

    # the task whose output this future resolves to
    self._task = task
    # set to True once the task has been handed to a manager
    self._is_submitted = False
    # callbacks registered through add_done_callback
    self._callback_fns = []

    # bookkeeping shared with concurrent.futures.Future
    self._state = PENDING
    self._result = None

def cancel(self):
    """Cancel the underlying task by id and mark this future CANCELLED.

    NOTE(review): nothing is returned, whereas
    concurrent.futures.Future.cancel is documented to return a bool.
    """
    task = self._task
    task._module_manager.cancel_by_task_id(task.id)
    self._state = CANCELLED

def cancelled(self):
state = self._task._module_manager.task_state(self._task.id)
Expand All @@ -140,7 +263,13 @@ def done(self):
def result(self, timeout="wait_forever"):
    """Return the task's output, or RESULT_PENDING if it is not ready yet.

    Args:
        timeout: Passed through to the task's ``output()``. None is
            translated to "wait_forever".

    Returns:
        The task output. When ``output()`` reports RESULT_PENDING, that
        sentinel is returned instead and the future's state is left
        untouched.

    Raises:
        Any exception raised by the task's ``output()`` call propagates.
    """
    if timeout is None:
        timeout = "wait_forever"

    output = self._task.output(timeout=timeout)

    # Identity check: `==` would invoke the result's own __eq__, which may
    # raise or return a non-boolean for arbitrary task results.
    if output is RESULT_PENDING:
        return RESULT_PENDING

    self._result = output
    self._state = FINISHED
    return output

def add_done_callback(self, fn):
self._callback_fns.append(fn)
Expand Down Expand Up @@ -187,11 +316,14 @@ def output(self, timeout="wait_forever"):
if not self._is_retriever:
if self._saved_output:
return self._saved_output
self._saved_output = self._retriever.output(timeout=timeout)
result = self._retriever.output(timeout=timeout)
if result is RESULT_PENDING:
return RESULT_PENDING
self._saved_output = result
if not self._ran_functions:
self._ran_functions = True
for fn in self._future._callback_fns:
fn(self._future)
self._ran_functions = True
return self._saved_output

# for retriever task: fetch the result of its retrievee on completion
Expand All @@ -200,6 +332,8 @@ def output(self, timeout="wait_forever"):
result = self._manager.wait_for_task_id(self.id, timeout=timeout)
if result:
self._has_retrieved = True
else:
return RESULT_PENDING
if not self._saved_output and self._has_retrieved:
if self.successful():
try:
Expand All @@ -211,6 +345,7 @@ def output(self, timeout="wait_forever"):

except Exception as e:
self._saved_output = e
raise e
else:
self._saved_output = FunctionCallNoResult()
return self._saved_output
Expand Down Expand Up @@ -279,19 +414,24 @@ def retrieve_output(arg):

if not self._is_retriever:
if not self._output_loaded:
self._output = self._retriever._future.result(timeout=timeout)
result = self._retriever._future.result(timeout=timeout)
if result == RESULT_PENDING:
return RESULT_PENDING
self._output = result
self._output_loaded = True
if not self._ran_functions:
self._ran_functions = True
for fn in self._future._callback_fns:
fn(self._future)
self._ran_functions = True
return self._output

else:
if not self._has_retrieved:
result = self._module_manager.wait_for_task_id(self.id, timeout=timeout)
if result:
self._has_retrieved = True
else:
return RESULT_PENDING
if not self._output_loaded and self._has_retrieved:
if self.successful():
try:
Expand All @@ -302,6 +442,7 @@ def retrieve_output(arg):
self._output = f.read()
except Exception as e:
self._output = e
raise e
else:
self._output = PythonTaskNoResult()
self._output_loaded = True
Expand Down
84 changes: 84 additions & 0 deletions taskvine/test/TR_vine_python_future_module.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#!/bin/sh

set -e

. ../../dttools/test/test_runner_common.sh

import_config_val CCTOOLS_PYTHON_TEST_EXEC
import_config_val CCTOOLS_PYTHON_TEST_DIR

export PYTHONPATH=$(pwd)/../../test_support/python_modules/${CCTOOLS_PYTHON_TEST_DIR}:$PYTHONPATH

STATUS_FILE=vine.status
PORT_FILE=vine.port

check_needed()
{
    # Returns 0 when the test can run, 1 when it has to be skipped.

    # No Python interpreter configured: nothing to test.
    if [ -z "${CCTOOLS_PYTHON_TEST_EXEC}" ]
    then
        return 1
    fi

    # Poncho serializes functions with ast.unparse, which only exists from
    # Python 3.9 on; older platforms (e.g. almalinux8) lack it natively.
    if ! "${CCTOOLS_PYTHON_TEST_EXEC}" -c "from ast import unparse"
    then
        return 1
    fi

    # Some limited builds (e.g. macos on github) ship without conda-pack or
    # cloudpickle, and poncho cannot work without them.
    if ! "${CCTOOLS_PYTHON_TEST_EXEC}" -c "import conda_pack"
    then
        return 1
    fi

    if ! "${CCTOOLS_PYTHON_TEST_EXEC}" -c "import cloudpickle"
    then
        return 1
    fi

    return 0
}

prepare()
{
    # Remove leftovers from a previous run so that run() cannot mistake a
    # stale status or port file for a fresh one.
    rm -f "$STATUS_FILE"
    rm -f "$PORT_FILE"

    return 0
}

run()
{
    # Start the manager script in the background. Its exit code is written to
    # $STATUS_FILE when it finishes, which the second wait below keys on.
    ( "${CCTOOLS_PYTHON_TEST_EXEC}" vine_python_future_module.py "$PORT_FILE"; echo $? > "$STATUS_FILE" ) &

    # wait at most 15 seconds for vine to find a port.
    wait_for_file_creation "$PORT_FILE" 15

    run_taskvine_worker "$PORT_FILE" worker.log --cores 2 --memory 2000 --disk 2000

    # wait for vine to exit.
    wait_for_file_creation "$STATUS_FILE" 15

    # retrieve exit status
    status=$(cat "$STATUS_FILE")
    if [ "$status" -ne 0 ]
    then
        # display log files in case of failure.
        logfile=$(latest_vine_debug_log)

        # The quotes matter: with an empty $logfile an unquoted `[ -f ]` is
        # true, and the bare `cat` that follows would block reading stdin.
        if [ -f "${logfile}" ]
        then
            echo "master log:"
            cat "${logfile}"
        fi

        if [ -f worker.log ]
        then
            echo "worker log:"
            cat worker.log
        fi

        exit 1
    fi

    exit 0
}

clean()
{
    # Remove everything the test leaves behind: the status and port files
    # and the vine-run-info directory.
    rm -f "$STATUS_FILE"
    rm -f "$PORT_FILE"
    rm -rf vine-run-info

    exit 0
}


dispatch "$@"
Loading

0 comments on commit 4b43490

Please sign in to comment.