diff --git a/CHANGES.md b/CHANGES.md
index b67d3b8c5..ef66ff53d 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -6,6 +6,12 @@
 https://github.com/radical-cybertools/radical.saga/issues?q=is%3Aissue+is%3Aopen+
 
 
+Version 1.0.1 Release 2020-01-23
+--------------------------------------------------------------------------------
+
+  - small fixes and cleanup for slurm and docs (tutorial prep)
+
+
 Version 1.0.0 Release 2019-12-24
 --------------------------------------------------------------------------------
 
diff --git a/VERSION b/VERSION
index 3eefcb9dd..7dea76edb 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-1.0.0
+1.0.1
diff --git a/docs/source/_themes/armstrong/theme.conf b/docs/source/_themes/armstrong/theme.conf
index 450780c8a..448d201c0 100644
--- a/docs/source/_themes/armstrong/theme.conf
+++ b/docs/source/_themes/armstrong/theme.conf
@@ -25,9 +25,11 @@
 black = #111111
 light_color = #C5D2DD
 light_medium_color = #DDEAF0
 medium_color = #6588A5
+medium_color_hover = rgba(255, 255, 255, 0.25)
 medium_color_link = #634320
 medium_color_link_hover = #261a0c
 dark_color = #293947
+green_highlight = #8ecc4c
 
 h1 = #5a5a5a
 h2 = #5a5a5a
@@ -36,10 +38,6 @@
 h3 = #5a5a5a
 
 link_color = #0088cc
 link_color_decoration = #0088cc
 
-medium_color_hover = rgba(255, 255, 255, 0.25)
-medium_color = rgba(255, 255, 255, 0.5)
-green_highlight = #8ecc4c
-
 positive_dark = rgba(51, 77, 0, 1.0)
 positive_medium = rgba(102, 153, 0, 1.0)
 
diff --git a/docs/source/usage/config.rst b/docs/source/usage/config.rst
index c1261e24f..9b0417381 100644
--- a/docs/source/usage/config.rst
+++ b/docs/source/usage/config.rst
@@ -5,8 +5,13 @@ Configuration
 
 .. note::
 
-   SAGA has been designed as a zero-configuration library. Unless you are
-   experiencing problems with one of the default configuration settings, there's
+   This section is outdated!
+
+
+.. note::
+
+   SAGA has been designed as a zero-configuration library. Unless you are
+   experiencing problems with one of the default configuration settings, there's
    really no need to create a configuration file for SAGA.
 
 SAGA and its individual middleware adaptors provide various optional
@@ -22,8 +27,8 @@ Configuration Files
 -------------------
 
 If you need to make persistent changes to any of SAGA's :ref:`conf_options`, the
-simplest option is to create a configuration file. During startup, SAGA checks
-for the existence of a configuration file in `$HOME/.saga.conf`. If that
+simplest option is to create a configuration file. During startup, SAGA checks
+for the existence of a configuration file in `$HOME/.saga.conf`. If that
 configuration file is found, it is parsed by SAGA's configuration system.
 
 SAGA configuration files use a structure that looks like this::
@@ -32,7 +37,7 @@
 
     [saga.logger]
     option = value
-    
+
    [saga.adaptor.name]
    option = value
 
@@ -47,10 +52,10 @@ Module saga.utils.config
 
 The config module provides classes and functions to introspect and modify
 SAGA's configuration.
The :func:`getConfig` function is used to get the
-:class:`GlobalConfig` object which represents the current configuration
+:class:`GlobalConfig` object which represents the current configuration
 of SAGA::
 
-   from saga.utils.config import getConfig
+   from saga.utils.config import getConfig
 
    sagaconf = getConfig()
    print sagaconf.get_category('saga.utils.logger')
diff --git a/docs/source/usage/install.rst b/docs/source/usage/install.rst
index 122044beb..3c9e4f144 100644
--- a/docs/source/usage/install.rst
+++ b/docs/source/usage/install.rst
@@ -9,14 +9,14 @@ Requirements
 
 radical.saga has the following requirements:
 
-* Python 2.7 or newer
+* Python 3.5 or newer
 
 
 Installation via PyPi
 ---------------------
 
-radical.saga is available for download via PyPI and may be installed using
-easy_install or pip (preferred). Both automatically download and install all
+radical.saga is available for download via PyPI and may be installed using
+easy_install or pip (preferred). Both automatically download and install all
 dependencies required by radical.saga if they can't be found on your system:
 
 .. code-block:: bash
 
@@ -28,20 +28,20 @@
 Using Virtualenv
 ----------------
 
-If you don't want to (or can't) install RADICAL-SAGA into your system's Python
-environment, there's a simple (and often preferred) way to create an
-alternative Python environment (e.g., in your home directory):
+If you don't want to (or can't) install RADICAL-SAGA into your system's Python
+environment, there's a simple (and often preferred) way to create an
+alternative Python environment (e.g., in your home directory):
 
 .. code-block:: bash
 
     virtualenv --no-site-package $HOME/sagaenv/
     . $HOME/sagaenv/bin/activate
-    pip install radical.saga
+    pip install radical.saga
 
 
 **What if my system Doesn't come With virtualenv, pip or easy_install?**
 
-There's a simple workaround for that using the 'instant' version of virtualenv.
+There's a simple workaround for that using the 'instant' version of virtualenv.
 It also installs easy_install and pip:
 
 .. code-block:: bash
 
@@ -62,3 +62,4 @@
 You can install the latest development version of RADICAL-SAGA directly from our
 
 .. code-block:: bash
 
     pip install -e git://github.com/radical-cybertools/radical.saga.git@devel#egg=radical.saga
+
diff --git a/docs/source/usage/logger.rst b/docs/source/usage/logger.rst
index 1eee4768c..81ea45975 100644
--- a/docs/source/usage/logger.rst
+++ b/docs/source/usage/logger.rst
@@ -7,7 +7,7 @@
 In a distributed environment unified error logging and reporting is a crucial
 capability for debugging and monitoring. SAGA has a configurable logging
 system that captures debug, info, warning and error messages across all of its
 middelware adaptors. The logging system can be controlled in two different
-ways: via :ref:`env_vars` variables, which should be sufficient in most
+ways: via :ref:`env_vars` variables, which should be sufficient in most
 scenarios, and via the :ref:`log_api`, which provides programmatic access to
 the logging system for advanced use-cases.
 
 .. _env_vars:
 
 Environment Variables
 ---------------------
 
-Several environment variables can be used to control SAGA's logging behavior from
+Several environment variables can be used to control SAGA's logging behavior from
 the command line. Obviously, this can come in handy when debugging a problem
-with an existing SAGA application. Environment variables are set in the
-executing shell and evaluated by SAGA at program startup.
+with an existing SAGA application. Environment variables are set in the
+executing shell and evaluated by SAGA at program startup.
 
-.. envvar:: SAGA_VERBOSE
+.. envvar:: RADICAL_SAGA_LOG_LVL
 
    Controls the log level. This controls the amount of output generated by the
-   logging system. ``SAGA_VERBOSE`` expects either a numeric (0-4) value or a
+   logging system. ``RADICAL_SAGA_LOG_LVL`` expects either a numeric (0-4)
    string (case insensitive) representing the log level:
 
    +---------------+---------------------+------------------------------------+
    | Numeric Value | Log Level           | Type of Messages Displayed         |
    +===============+=====================+====================================+
    | 0             | ``CRITICAL``        | Only fatal events that will cause  |
-   | (default)     |                     | SAGA to abort.                     |
+   | (default)     |                     | SAGA to abort.                     |
    |               |                     |                                    |
    +---------------+---------------------+------------------------------------+
    | 1             | ``ERROR``           | Errors that will not necessarily   |
-   |               |                     | cause SAGA to abort.               |
+   |               |                     | cause SAGA to abort.               |
    |               |                     |                                    |
    +---------------+---------------------+------------------------------------+
    | 2             | ``WARNING``         | Warnings that are generated by     |
-   |               |                     | SAGA and its middleware adaptors.  |
+   |               |                     | SAGA and its middleware adaptors.  |
    |               |                     |                                    |
    +---------------+---------------------+------------------------------------+
-   | 3             | ``INFO``            | Useful (?) runtime information     |
-   |               |                     | that is generated by SAGA and its  |
+   | 3             | ``INFO``            | Useful (?) runtime information     |
+   |               |                     | that is generated by SAGA and its  |
    |               |                     | middleware adaptors.               |
    +---------------+---------------------+------------------------------------+
    | 4             | ``DEBUG``           | Debug message added to the code    |
-   |               |                     | by the developers. (Lots of output)|
+   |               |                     | by the developers. (Lots of output)|
    |               |                     |                                    |
    +---------------+---------------------+------------------------------------+
 
-   For example, if you want to see the debug messages that SAGA generates during
-   program execution, you would set :envvar:`SAGA_VERBOSE` to ``DEBUG`` before
-   you run your program::
+   For example, if you want to see the debug messages that SAGA generates during
+   program execution, you would set :envvar:`RADICAL_SAGA_LOG_LVL` to ``DEBUG``
+   before you run your program::
 
-      SAGA_VERBOSE=DEBUG python mysagaprog.py
+      RADICAL_SAGA_LOG_LVL=DEBUG python mysagaprog.py
 
-.. envvar:: SAGA_LOG_FILTERS
+.. envvar:: RADICAL_LOG_LVL
 
-   Controls the message sources displayed. SAGA uses a
-   hierarchal structure for its log sources. Starting with the root logger
-   ``saga``, several sub loggers are defined for SAGA-internal logging events
-   (``saga.engine``) and individual middleware adaptors ``saga.adaptor.name``.
-   ``SAGA_LOG_FILTERS`` expects either a single source name or a
-   comma-separated list of source names. Non-existing source names are ignored.
+   Controls the message sources displayed. RCT uses a hierarchical structure
+   for its log sources. Starting with the root logger ``RADICAL``, sub-loggers
+   are defined for internal logging events (``RADICAL_SAGA``,
+   ``RADICAL_SAGA_ENGINE`` etc.) and individual middleware adaptors, e.g.,
+   ``RADICAL_SAGA_ADAPTORS_NAME``. ``LOG_LVL`` and ``LOG_TGT`` can be set
+   individually for those loggers.
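+
+   (The variable prefix is presumably derived from the logger name by
+   uppercasing it and replacing dots with underscores, so that the logger
+   ``radical.saga.adaptors.xyz`` would be controlled via
+   ``RADICAL_SAGA_ADAPTORS_XYZ_LOG_LVL``, as in the example below.)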
-   For example, if you want to see only the debug messages generated by
-   ``saga.engine`` and a specific middleware adaptor called ``xyz`` you would
-   set the following environment variables::
+   For example, if you want to see only the debug messages generated by
+   ``saga.engine`` and a specific middleware adaptor called ``xyz`` you would
+   set the following environment variables::
 
-      SAGA_VERBOSE=DEBUG SAGA_LOG_FILTERS=saga.engine,saga.adaptor.xyz python mysagaprog.py
+      # mute everything, then enable the engine and XYZ adaptor loggers
+      RADICAL_LOG_LVL=ERROR                   \
+      RADICAL_SAGA_ENGINE_LOG_LVL=DEBUG       \
+      RADICAL_SAGA_ADAPTORS_XYZ_LOG_LVL=DEBUG \
+      python mysagaprog.py
 
-.. envvar:: SAGA_LOG_TARGETS
+.. envvar:: RADICAL_SAGA_LOG_TGT
 
-   Controls where the log messages go. Multiple concurrent locations are
-   supported.
-   ``SAGA_LOG_TARGETS`` expects either a single location or a comma-separated
-   list of locations, where a location can either be a path/filename or
-   the ``STDOUT`` keyword (case insensitive) for logging to the console.
+   Controls where the log messages go. Multiple concurrent locations are
+   supported. ``RADICAL_SAGA_LOG_TGT`` expects either a single location or
+   a comma-separated list of locations, where a location can either be
+   a path/filename or the ``stdout``/``stderr`` keyword (case sensitive) for
+   logging to the console.
 
    For example, if you want to see debug messages on the console but also
-   want to log them in a file for further analysis, you would set the the
+   want to log them in a file for further analysis, you would set the
    following environment variables::
 
-      SAGA_VERBOSE=DEBUG SAGA_LOG_TARGETS=STDOUT,/tmp/mysaga.log python mysagaprog.py
-
-
-.. _log_api:
-
-Application Level Logging
--------------------------
-
-The RADICAL-SAGA logging utilities are a thin wrapper around Python's logging
-facilities, integrated into the RADICAL-SAGA configuration facilities.
-To support the seamless integration of application level logging needs, the
-:func:`saga.utils.logger.getLogger` allows to produce additional logger
-facilities, which are again native Python :class:`logging.Logger` instances, but
-preconfigured according to the RADICAL-SAGA logging configuration.
-Those instances can then be further customized as needed::
-
-    from saga.utils.logger import getLogger, INFO
-
-    app_logger = getLogger ('application.test')
-    app_logger.level = INFO
-
-    app_logger.info ('application level log message on INFO level')
-
-
-.. automodule:: saga.utils.logger
-   :members:
+      RADICAL_SAGA_LOG_LVL=DEBUG RADICAL_SAGA_LOG_TGT=stdout,./rs.log \
+      python mysagaprog.py
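+
+   Since log levels can also be given numerically (see the table above), the
+   following invocation should be equivalent to the ``DEBUG`` example::
+
+      RADICAL_SAGA_LOG_LVL=4 RADICAL_SAGA_LOG_TGT=stdout,./rs.log \
+      python mysagaprog.py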
""" +import os import sys import radical.saga as saga @@ -30,7 +32,7 @@ def main(): jd.executable = '/usr/bin/touch' jd.arguments = ['$FILENAME'] - jd.working_directory = "/tmp/A/B/C" + jd.working_directory = "/tmp/%d/A/B/C" % os.getuid() jd.output = "examplejob.out" jd.error = "examplejob.err" diff --git a/examples/jobs/slurmjob.py b/examples/jobs/slurmjob.py index 7697589ea..de6456dfa 100755 --- a/examples/jobs/slurmjob.py +++ b/examples/jobs/slurmjob.py @@ -17,7 +17,7 @@ import radical.saga as rs -js_url = "slurm+gsissh://stampede2.tacc.xsede.org:2222/" +js_url = "slurm://localhost/" # ------------------------------------------------------------------------------ @@ -40,8 +40,8 @@ def start(): jd.arguments = ['$FILENAME'] jd.name = "examplejob" - jd.queue = "normal" - jd.project = "TG-MCB090174" + # jd.queue = "normal" + # jd.project = "TG-MCB090174" jd.working_directory = ".saga/test" jd.output = "examplejob.out" @@ -52,13 +52,13 @@ def start(): job = js.create_job(jd) # Check our job's id and state - print("Job ID : %s" % (job.id)) - print("Job State : %s" % (job.state)) + print("Job State : %s" % (job.state)) # Now we can start our job. print("starting job") job.run() + print("Job ID : %s" % (job.id)) print("Job State : %s" % job.state) print("Exitcode : %s" % job.exit_code) print("Exec. hosts : %s" % job.execution_hosts) diff --git a/examples/jobs/slurmjobcontainer.py b/examples/jobs/slurmjobcontainer.py new file mode 100755 index 000000000..83b57bf65 --- /dev/null +++ b/examples/jobs/slurmjobcontainer.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python + +__author__ = 'RADICAL @ Rutgers' +__copyright__ = 'Copyright 2012-2013, The SAGA Project' +__license__ = 'MIT' + + +''' +This examples shows how to run groups of jobs using the 'local' file adaptor. +This example uses job containers for simplified and optimized bulk job handling. + +Job container can be used to model dependencies between groups of different +jobs, e.g., in workflow scenarios. In this example, we execute 'num_job_groups' +containers of jobs_per_group' number of parallel jobs sequentially:: + + C1[j1,j2,j3,j4,...] -> C2[j1,j2,j3,j4,...] -> C3[j1,j2,j3,j4,...] -> ... + +Depending on the adaptor implementation, using job containers can be quite +advantageous in terms of call latency. Some adaptors implement special bulk +operations for container management, which makes them generally much faster than +iterating over and operating on individual jobs. +''' + +import sys +import random + +import radical.saga as rs + + +URL = 'slurm://locahost/' + + +# ------------------------------------------------------------------------------ +def main(): + + # number of job 'groups' (containers) and of jobs per group + num_job_groups = 10 + jobs_per_group = 2 # check slurm limits! + + current = None + + try: + # all jobs in this example are running on the same job service + service = rs.job.Service(URL) + + # create and populate our containers + containers = list() + for c in range(0, num_job_groups): + + # create containers + containers.append(rs.job.Container()) + + # add jobs to container. 
+            for j in range(0, jobs_per_group):
+                jd = rs.job.Description()
+                jd.executable = '/bin/sleep'
+                jd.arguments  = ['10']
+                jd.name       = 'job.%02d.%03d' % (c, j)
+                job = service.create_job(jd)
+                containers[c].add(job)
+
+        # execute the containers sequentially
+        for c in range(0, num_job_groups):
+            print('run container %s ' % c)
+            containers[c].run()
+
+            current = containers[c]  # see exception handling
+
+            for j in containers[c].get_tasks():
+                print('%s: %s: %s' % (j.name, j.id, j.state))
+
+            print(containers[c].get_states())
+            containers[c].wait()
+            print(containers[c].get_states())
+
+            # all jobs in the container finished - print some job infos
+            for job in containers[c].jobs:
+                print(' * %s: %s (%s) @%s %s - %s' \
+                    % (job.id, job.state, job.exit_code, job.execution_hosts,
+                       job.started, job.finished))
+
+            print()
+
+        service.close()
+        return 0
+
+    except rs.SagaException as ex:
+
+        print('An exception occurred: %s' % str(ex))
+        # get the whole traceback in case of an exception -
+        # this can be helpful for debugging the problem
+        print(' *** %s' % ex.traceback)
+        return -1
+
+
+    finally:
+
+        # make sure we leave no jobs behind
+        if current is not None:
+            print('\ncancel current container: %s' % current)
+            current.cancel()
+            for job in current.jobs:
+                print(' * %s: %s (%s) @%s %s - %s' \
+                    % (job.id, job.state, job.exit_code, job.execution_hosts,
+                       job.started, job.finished))
+            print()
+
+
+# ------------------------------------------------------------------------------
+if __name__ == '__main__':
+
+    sys.exit(main())
+
+
+# ------------------------------------------------------------------------------
+
diff --git a/src/radical/saga/adaptors/noop/__init__.py b/src/radical/saga/adaptors/noop/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/src/radical/saga/adaptors/noop/noop_job.py b/src/radical/saga/adaptors/noop/noop_job.py
new file mode 100644
index 000000000..a4b6401c3
--- /dev/null
+++ b/src/radical/saga/adaptors/noop/noop_job.py
@@ -0,0 +1,681 @@
+
+__author__    = "Andre Merzky"
+__copyright__ = "Copyright 2020, The SAGA Project"
+__license__   = "MIT"
+
+
+''' no operation (noop) based job adaptor implementation '''
+
+import time
+import threading
+
+from ...    import exceptions as rse
+from ..     import base
+from ..cpi  import SYNC_CALL
+from ..cpi  import job as cpi
+from ...    import job as api
+
+
+# ------------------------------------------------------------------------------
+#
+class _job_state_monitor(threading.Thread):
+
+    # --------------------------------------------------------------------------
+    #
+    def __init__(self, log):
+
+        self._log  = log
+        self._lock = threading.Lock()
+        self._term = threading.Event()
+        self._jobs = dict()
+        self._cnt  = 0
+
+        super(_job_state_monitor, self).__init__()
+
+        self.setDaemon(True)
+
+
+    # --------------------------------------------------------------------------
+    #
+    def stop(self):
+
+        self._term.set()
+
+
+    # --------------------------------------------------------------------------
+    #
+    def add_job(self, job):
+
+        job._id   = 'job.%06d' % self._cnt
+        self._cnt += 1
+
+        assert(job._id not in self._jobs)
+        job._set_state(api.RUNNING)
+        job._started = time.time()
+
+        with self._lock:
+            self._jobs[job._id] = job
+
+
+    # --------------------------------------------------------------------------
+    #
+    def get_job(self, jid):
+
+        assert(jid in self._jobs)
+        with self._lock:
+            return self._jobs[jid]
+
+
+    # --------------------------------------------------------------------------
+    #
+    def list_jobs(self):
+
+        with self._lock:
+            return list(self._jobs.keys())
+
+
+    # --------------------------------------------------------------------------
+    #
+    def run(self):
+
+        try:
+
+            while not self._term.is_set():
+
+                now  = time.time()
+                keep = dict()
+
+                with self._lock:
+
+                    for job in self._jobs.values():
+
+                        if job.get_state() == api.CANCELED:
+                            continue
+
+                        if job.tgt < now:
+                            job._finished  = now
+                            job._exit_code = 0
+                            job._set_state(api.DONE)
+
+                        else:
+                            keep[job._id] = job
+
+                    self._jobs = keep
+
+                time.sleep(0.1)
+
+        except Exception:
+
+            self._log.exception("Exception in job monitoring thread")
+
+
+# ------------------------------------------------------------------------------
+# the adaptor name
+#
+_ADAPTOR_NAME    = "radical.saga.adaptors.noop_job"
+_ADAPTOR_SCHEMAS = ["noop"]
+
+
+
+# ------------------------------------------------------------------------------
+# the adaptor capabilities & supported attributes
+#
+_ADAPTOR_CAPABILITIES = {
+    "jdes_attributes" : [api.NAME,
+                         api.EXECUTABLE,
+                         api.PRE_EXEC,
+                         api.POST_EXEC,
+                         api.ARGUMENTS,
+                         api.ENVIRONMENT,
+                         api.WORKING_DIRECTORY,
+                         api.FILE_TRANSFER,
+                         api.INPUT,
+                         api.OUTPUT,
+                         api.ERROR,
+                         api.WALL_TIME_LIMIT,
+                         api.TOTAL_CPU_COUNT,
+                         api.TOTAL_GPU_COUNT,
+                         api.PROCESSES_PER_HOST,
+                         api.SPMD_VARIATION,
+                        ],
+    "job_attributes"  : [api.EXIT_CODE,
+                         api.EXECUTION_HOSTS,
+                         api.CREATED,
+                         api.STARTED,
+                         api.FINISHED],
+    "metrics"         : [api.STATE,
+                         api.STATE_DETAIL],
+    "contexts"        : {}
+}
+
+# ------------------------------------------------------------------------------
+# the adaptor documentation
+#
+_ADAPTOR_DOC = {
+    "name"         : _ADAPTOR_NAME,
+    "capabilities" : _ADAPTOR_CAPABILITIES,
+    "description"  : '''
+        The Noop job adaptor, which fakes job execution and is used for testing
+        and benchmarking purposes.''',
+    "example"      : "examples/jobs/noopjob.py",
+    "schemas"      : {"noop" : "fake job execution"}
+}
+
+# ------------------------------------------------------------------------------
+# the adaptor info is used to register the adaptor with SAGA
+
+_ADAPTOR_INFO = {
+    "name"         : _ADAPTOR_NAME,
+    "version"      : "v0.1",
+    "schemas"      : _ADAPTOR_SCHEMAS,
+    "capabilities" : _ADAPTOR_CAPABILITIES,
+    "cpis"         : [{
+        "type" : "radical.saga.job.Service",
+        "class": "NoopJobService"
+    },{
+        "type" : "radical.saga.job.Job",
+        "class": "NoopJob"
+    }]
+}
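+
+# (the 'cpis' list above binds the API classes radical.saga.job.Service and
+#  radical.saga.job.Job to the two CPI implementations defined below)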
+
+
+# ------------------------------------------------------------------------------
+# The adaptor class
+class Adaptor(base.Base):
+    '''
+    This is the actual adaptor class, which gets loaded by SAGA (i.e., by the
+    SAGA engine), and which registers the CPI implementation classes which
+    provide the adaptor's functionality.
+    '''
+
+
+    # --------------------------------------------------------------------------
+    #
+    def __init__(self):
+
+        base.Base.__init__(self, _ADAPTOR_INFO, expand_env=False)
+
+
+    # --------------------------------------------------------------------------
+    #
+    def sanity_check(self):
+
+        pass
+
+
+# ------------------------------------------------------------------------------
+#
+class NoopJobService(cpi.Service):
+
+    # --------------------------------------------------------------------------
+    #
+    def __init__(self, api, adaptor):
+
+        _cpi_base = super(NoopJobService, self)
+        _cpi_base.__init__(api, adaptor)
+
+
+    # --------------------------------------------------------------------------
+    #
+    def __del__(self):
+
+        try   : self.close()
+        except: pass
+
+
+    # --------------------------------------------------------------------------
+    #
+    @SYNC_CALL
+    def init_instance(self, adaptor_state, rm_url, session):
+        '''
+        Service instance constructor
+        '''
+
+        self.rm      = rm_url
+        self.session = session
+        self.jobs    = dict()
+
+        # Use `_set_session` method of the base class to set the session object.
+        # `_set_session` and `get_session` methods are provided by `CPIBase`.
+        self._set_session(session)
+
+        # the monitoring thread - one per service instance.
+        self.monitor = _job_state_monitor(self._logger)
+        self.monitor.start()
+
+        return self.get_api()
+
+
+    # --------------------------------------------------------------------------
+    #
+    def close(self):
+
+        if self.monitor:
+            self.monitor.stop()
+
+
+    # --------------------------------------------------------------------------
+    #
+    #
+    def _job_run(self, job):
+        '''
+        fakes running the job (registers it with the monitor thread), and
+        returns the job id
+        '''
+
+        if not job.jd.executable.endswith('sleep'):
+            raise ValueError('expected "sleep", not %s' % job.jd.executable)
+
+        if len(job.jd.arguments) != 1:
+            raise ValueError('expected int argument, not %s' % job.jd.arguments)
+
+        job.tgt = time.time() + int(job.jd.arguments[0])
+        self.monitor.add_job(job)
+
+
+    # --------------------------------------------------------------------------
+    #
+    @SYNC_CALL
+    def run_job(self, cmd, host):
+        '''
+        Implements adaptors.cpi.job.Service.run_job()
+        '''
+
+        if not cmd:
+            raise rse.BadParameter._log(self._logger,
+                                        "run_job needs a command to run")
+
+        if host and host != self.rm.host:
+            raise rse.BadParameter._log(self._logger,
+                      "Can only run jobs on %s, not on %s" % (self.rm.host, host))
+
+        jd = api.Description()
+
+        exe, arg = cmd.split()
+
+        jd.executable = exe
+        jd.arguments  = [arg]
+
+        job = self.create_job(jd)
+        job.run()
+
+        return job
+
+
+    # --------------------------------------------------------------------------
+    #
+    @SYNC_CALL
+    def create_job(self, jd):
+
+        # this dict is passed on to the job adaptor class -- use it to pass any
+        # state information you need there.
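+        # (for this adaptor, that is the service instance itself, the job
+        #  description, and the URL schema of the service endpoint)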
+ adaptor_state = {"job_service" : self, + "job_description": jd, + "job_schema" : self.rm.schema } + + return api.Job(_adaptor=self._adaptor, _adaptor_state=adaptor_state) + + + # -------------------------------------------------------------------------- + @SYNC_CALL + def get_url(self): + + return self.rm + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def list(self): + + return self.monitor.list_jobs() + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def get_job(self, job_id, no_reconnect=False): + + return self.monitor.get_job(job_id) + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def container_run(self, jobs): + ''' + From all the job descriptions in the container, build a bulk, and submit + as async. + ''' + + self._logger.debug("container run: %s" % str(jobs)) + + for job in jobs: + job.run() + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def container_wait(self, jobs, mode, timeout): + + for job in jobs: + + job.wait() + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def container_cancel(self, jobs, timeout): + + for job in jobs: + + job.cancel() + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def container_get_states(self, jobs): + + return [job.get_state() for job in jobs] + + +# ------------------------------------------------------------------------------ +# +class NoopJob(cpi.Job): + + # -------------------------------------------------------------------------- + # + def __init__(self, api, adaptor): + + _cpi_base = super (NoopJob, self) + _cpi_base.__init__(api, adaptor) + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def init_instance(self, job_info): + + if 'job_description' in job_info: + # comes from job.service.create_job() + self.js = job_info["job_service"] + self.jd = job_info["job_description"] + + # initialize job attribute values + self._id = None + self._name = self.jd.get(api.NAME) + self._log = list() + self._state = None + self._exit_code = None + self._exception = None + self._created = time.time() + self._name = self.jd.name + self._started = None + self._finished = None + + self._set_state(api.NEW) + + elif 'job_id' in job_info: + # initialize job attribute values + self.js = job_info["job_service"] + self.jd = None + self._id = job_info['job_id'] + self._name = job_info.get('job_name') + self._log = list() + self._state = None + self._exit_code = None + self._exception = None + self._created = None + self._name = None + self._started = None + self._finished = None + + else: + # don't know what to do... 
+ raise rse.BadParameter("insufficient info for job creation") + + if self._created: self._created = float(self._created) + + return self.get_api() + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def get_description(self): + + return self.jd + + + # -------------------------------------------------------------------------- + # + def _update_state(self, state): + + # update state, and report to application + self._state = state + self._api()._attributes_i_set('state', self._state, self._api()._UP) + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def get_state(self): + ''' + Implements adaptors.cpi.job.Job.get_state() + ''' + + # may not yet have backend representation, state is 'NEW' + if self._id is None: + return self._state + + return self._state + + + # -------------------------------------------------------------------------- + # + def _set_state(self, state): + + old_state = self._state + + # on state changes, trigger notifications + if old_state != state: + self._state = state + self._api()._attributes_i_set('state', state, self._api()._UP) + + return self._state + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def get_created(self): + + return self._created + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def get_started(self): + + return self._started + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def get_finished(self): + + return self._finished + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def get_stdout(self): + + return '' + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def get_stderr(self): + + return '' + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def get_log(self): + + return '' + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def get_service_url(self): + + if not self.js: + raise rse.IncorrectState("Job Service URL unknown") + else: + return self.js.get_url() + + + # -------------------------------------------------------------------------- + # + # TODO: this should also fetch the(final) state, to safe a hop + # TODO: implement via notifications + # + @SYNC_CALL + def wait(self, timeout): + ''' + A call to the noop to do the WAIT would block the noop for any + other interactions. In particular, it would practically kill it if the + Wait waits forever... + + So we implement the wait via a state pull. The *real* solution is to + implement state notifications, and wait for such a notification to + arrive within timeout seconds... 
+ ''' + + time_start = time.time() + time_now = time_start + + while True: + + state = self.get_state() + + if state in [api.DONE, api.FAILED, api.CANCELED]: + return True + + # avoid busy poll + # FIXME: self-tune by checking call latency + time.sleep(0.1) + + # check if we hit timeout + if timeout >= 0: + time_now = time.time() + if time_now - time_start > timeout: + return False + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def get_id(self): + + return self._id + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def get_name(self): + + return self._name + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def get_exit_code(self): + + return self._exit_code + + + # -------------------------------------------------------------------------- + # + # TODO: the values below should be fetched with every get_state... + # + @SYNC_CALL + def get_execution_hosts(self): + + return [self.js.get_url().host] + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def run(self): + + self.js._job_run(self) + self.js.jobs[self._id] = self._api() + + self._set_state(api.RUNNING) + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def suspend(self): + + if self.get_state() != api.RUNNING: + raise rse.IncorrectState("Cannot suspend, job is not RUNNING") + + self._old_state = self.get_state() + self._adaptor._update_state(api.SUSPENDED) + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def resume(self): + + if self.get_state() != api.SUSPENDED: + raise rse.IncorrectState("Cannot resume, job is not SUSPENDED") + + self._adaptor._update_state(self._old_state) + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def cancel(self, timeout): + + if self.get_state() not in [api.RUNNING, api.SUSPENDED, + api.CANCELED, api.DONE, + api.FAILED]: + raise rse.IncorrectState("Cannot cancel, job is not running") + + if self._state in [api.CANCELED, api.DONE, api.FAILED]: + self._set_state(api.CANCELED) + return + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def re_raise(self): + # nothing to do here actually, as run() is synchronous... + return self._exception + + +# ------------------------------------------------------------------------------ + diff --git a/src/radical/saga/adaptors/shell/shell_wrapper.sh b/src/radical/saga/adaptors/shell/shell_wrapper.sh index fe9d9d449..d409c8d1a 100644 --- a/src/radical/saga/adaptors/shell/shell_wrapper.sh +++ b/src/radical/saga/adaptors/shell/shell_wrapper.sh @@ -1,6 +1,6 @@ #!/bin/sh -# be friendly to bash users (and yes, the leading space is on purpose) +# be friendly to bash users HISTIGNORE='*' export HISTIGNORE @@ -8,8 +8,8 @@ export HISTIGNORE # other shell extensions. It expects /bin/sh to be a POSIX compliant shell # thought. # -# The invokation passes one (optional) parameter, the base workdir. That -# directory will be used to keep job state data. It' default value is set to +# The invocation passes one (optional) parameter, the base workdir. That +# directory will be used to keep job state data. 
 # $HOME/.radical/saga/adaptors/shell_job/
 
@@ -25,12 +25,13 @@
 # }
 #
 # on tracing:
-# http://www.unix.com/shell-programming-and-scripting/165648-set-x-within-script-capture-file.html
+# http://www.unix.com/shell-programming-and-scripting/ \
+#        165648-set-x-within-script-capture-file.html
 
 # --------------------------------------------------------------------
 #
-# Fucking /bin/kill by Ubuntu sometimes understands --, sometimes does not :-P
+# Fucking /bin/kill on Ubuntu sometimes understands --, sometimes does not :-P
 # We need to check the version, and assume that prior to 3.3.0 it is not
 # understood
 KILL_DASHES="--"
@@ -47,7 +48,7 @@
 # --------------------------------------------------------------------
 #
 # POSIX echo does not understand '\n'. For multiline strings we thus use printf
-# -- but printf will interprete every single '%' in the string, which we don't
+# -- but printf will interpret every single '%' in the string, which we don't
 # want. We thus escape it to '%%'
 qprintf(){
     \printf "%b\n" "$*"
@@ -203,7 +204,7 @@ decode () {
 
 # --------------------------------------------------------------------
 #
-# it is suprisingly difficult to get seconds since epoch in POSIX --
+# it is surprisingly difficult to get seconds since epoch in POSIX --
 # 'date +%%s' is a GNU extension... Anyway, awk to the rescue!
 #
 timestamp () {
@@ -863,7 +864,7 @@ cmd_quit () {
     \stty echonl >/dev/null 2>&1
     \printf "cmd_quit called ($EXIT_VAL)"
-    
+
     # avoid running circles
     \trap - EXIT
 
diff --git a/src/radical/saga/adaptors/slurm/slurm_job.py b/src/radical/saga/adaptors/slurm/slurm_job.py
index 2bc4208f2..20358137a 100644
--- a/src/radical/saga/adaptors/slurm/slurm_job.py
+++ b/src/radical/saga/adaptors/slurm/slurm_job.py
@@ -21,10 +21,13 @@
 from ...utils import pty_shell  as rsups
 from ...      import job        as api_job
 from ...      import exceptions as rse
+from ...      import filesystem as sfs
 from ..       import base       as a_base
 from ..cpi    import job        as cpi_job
 from ..cpi    import decorators as cpi_decs
 
+from ...utils.job import TransferDirectives
+
 
 SYNC_CALL  = cpi_decs.SYNC_CALL
 ASYNC_CALL = cpi_decs.ASYNC_CALL
@@ -391,7 +394,7 @@ def _open(self):
         if len(ppn_vals) >= 1: self._ppn = int(ppn_vals[0])
         else                 : self._ppn = None
 
-        self._logger.info(" === ppn: %s", self._ppn)
+        self._logger.info("ppn: %s", self._ppn)
 
 
     # --------------------------------------------------------------------------
@@ -405,6 +408,51 @@ def _close(self):
 
         self.shell = None
 
 
+    # --------------------------------------------------------------------------
+    #
+    def _handle_file_transfers(self, ft, mode):
+        """
+        if mode == 'in' : perform sanity checks on all staging directives,
+                          then stage files to the slurm submission site
+        if mode == 'out': stage files from the slurm submission site
+        """
+
+        td = TransferDirectives(ft)
+
+        assert(mode in ['in', 'out'])
+
+        if mode == 'in':
+
+            if td.in_append:
+                raise Exception('File append (>>) not supported')
+
+            if td.out_append:
+                raise Exception('File append (<<) not supported')
+
+            if td.in_overwrite:
+
+                for (local, remote) in td.in_overwrite:
+
+                    source = local
+                    target = remote
+
+                    self._logger.info("Transferring in %s to %s", source, target)
+                    self.shell.stage_to_remote(source, target)
+
+        elif mode == 'out':
+
+            if td.out_overwrite:
+
+                for (local, remote) in td.out_overwrite:
+
+                    source = remote
+                    target = local
+
+                    self._logger.info("Transferring out %s to %s", source, target)
+                    self.shell.stage_from_remote(source, target)
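+
+    # A staging directive pair handled by this method could look like this
+    # (hypothetical file names; '>' stages to the remote site before
+    #  submission, '<' stages back after completion; the append operators
+    #  '>>' and '<<' are rejected above):
+    #
+    #     jd.file_transfer = ['input.dat  > input.dat',
+    #                         'output.dat < output.dat']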
+
+
     # --------------------------------------------------------------------------
     #
     #
@@ -428,7 +476,7 @@ def _job_run(self, jd):
         processes_per_host = jd.as_dict().get(c.PROCESSES_PER_HOST)
         output             = jd.as_dict().get(c.OUTPUT, "radical.saga.stdout")
         error              = jd.as_dict().get(c.ERROR,  "radical.saga.stderr")
-      # file_transfer      = jd.as_dict().get(c.FILE_TRANSFER)
+        file_transfer      = jd.as_dict().get(c.FILE_TRANSFER)
         wall_time          = jd.as_dict().get(c.WALL_TIME_LIMIT)
         queue              = jd.as_dict().get(c.QUEUE)
         project            = jd.as_dict().get(c.PROJECT)
@@ -451,6 +499,7 @@ def _job_run(self, jd):
             if ret:
                 raise rse.NoSuccess("Couldn't create workdir: %s" % out)
 
+        self._handle_file_transfers(file_transfer, mode='in')
 
         if isinstance(c_hosts, list):
             c_hosts = ','.join(c_hosts)
@@ -604,30 +653,36 @@ def _job_run(self, jd):
 
         # find out what our job ID is
         # TODO: Could make this more efficient
-        self.job_id = None
+        job_id = None
         for line in out.split("\n"):
             if "Submitted batch job" in line:
-                self.job_id = "[%s]-[%s]" % (self.rm, int(line.split()[-1:][0]))
+                job_id = "[%s]-[%s]" % (self.rm, int(line.split()[-1:][0]))
                 break
 
         # if we have no job ID, there's a failure...
-        if not self.job_id:
+        if not job_id:
             raise rse.NoSuccess._log(self._logger,
                                      "Couldn't get job id from submitted job!"
" sbatch output:\n%s" % out) - self._logger.debug("started job %s" % self.job_id) + self._logger.debug("started job %s" % job_id) self._logger.debug("Batch system output:\n%s" % out) # create local jobs dictionary entry - self.jobs[self.job_id] = {'state' : c.PENDING, - 'create_time': None, - 'start_time' : None, - 'end_time' : None, - 'comp_time' : None, - 'exec_hosts' : None, - 'gone' : False} - return self.job_id + self.jobs[job_id] = {'state' : c.PENDING, + 'create_time': None, + 'start_time' : None, + 'end_time' : None, + 'comp_time' : None, + 'exec_hosts' : None, + 'gone' : False, + 'output' : output, + 'error' : error, + 'stdout' : None, + 'stderr' : None, + 'ft' : file_transfer, + } + return job_id # -------------------------------------------------------------------------- @@ -954,6 +1009,11 @@ def _job_get_info(self): curr_info['comp_time' ] = prev_info.get('comp_time' ) curr_info['exec_hosts' ] = prev_info.get('exec_hosts' ) curr_info['gone' ] = prev_info.get('gone' ) + curr_info['output' ] = prev_info.get('output' ) + curr_info['error' ] = prev_info.get('error' ) + curr_info['stdout' ] = prev_info.get('stdout' ) + curr_info['stderr' ] = prev_info.get('stderr' ) + curr_info['ft' ] = prev_info.get('ft' ) else: curr_info['job_id' ] = None curr_info['job_name' ] = None @@ -964,6 +1024,11 @@ def _job_get_info(self): curr_info['comp_time' ] = None curr_info['exec_hosts' ] = None curr_info['gone' ] = None + curr_info['output' ] = None + curr_info['error' ] = None + curr_info['stdout' ] = None + curr_info['stderr' ] = None + curr_info['ft' ] = None rm, pid = self._adaptor.parse_id(self._id) @@ -1001,7 +1066,7 @@ def _job_get_info(self): elems = out.split() data = dict() - for elem in elems: + for elem in sorted(elems): parts = elem.split('=', 1) @@ -1013,6 +1078,7 @@ def _job_get_info(self): key, val = parts if val in ['', '(null)']: val = None + self._logger.info('%-20s := %s', key, val) data[key] = val if data.get('JobState'): @@ -1042,8 +1108,35 @@ def _job_get_info(self): if not curr_info['start_time' ]: curr_info['start_time' ] = now if curr_info['state'] in c.FINAL: + if not curr_info['end_time' ]: curr_info['end_time' ] = now + if curr_info['stdout'] is None: + + if curr_info['output'] is None: + curr_info['output'] = data.get('StdOut') + + ret, out, err = self.js.shell.run_sync( + 'cat %s' % curr_info['output']) + if ret: curr_info['stdout'] = None + else : curr_info['stdout'] = out + + if curr_info['stderr'] is None: + + if curr_info['error'] is None: + curr_info['error'] = data.get('StdErr') + + ret, out, err = self.js.shell.run_sync( + 'cat %s' % curr_info['error']) + if ret: curr_info['stderr'] = None + else : curr_info['stderr'] = out + + self.js._handle_file_transfers(curr_info['ft'], mode='out') + + curr_info['gone'] = True + + self.js.jobs[self._id] = curr_info + return curr_info @@ -1132,6 +1225,30 @@ def get_state(self): return self._state + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def get_stdout(self): + + out = self._job_get_info()['stdout'] + if out is None: + out = '' + # raise rse.NoSuccess("Couldn't fetch stdout (js reconnected?)") + return out + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def get_stderr(self): + + err = self._job_get_info()['stderr'] + if err is None: + err = '' + # raise rse.NoSuccess("Couldn't fetch stderr (js reconnected?)") + return err + + # -------------------------------------------------------------------------- # @SYNC_CALL @@ 
             raise rse.IncorrectState("cannot get job state")
 
         if state in c.FINAL:
+            self._job_get_info()
             return True
 
         # check if we hit timeout
diff --git a/src/radical/saga/configs/registry_default.json b/src/radical/saga/configs/registry_default.json
index 2b4deab35..6c50079fa 100644
--- a/src/radical/saga/configs/registry_default.json
+++ b/src/radical/saga/configs/registry_default.json
@@ -1,12 +1,13 @@
 {
     "adaptor_registry" : [
 
-        # this should be auto-generated by the registry,
+        # this should be auto-generated by the registry,
         # based on the adaptor search path
 
         "radical.saga.adaptors.context.myproxy",
         "radical.saga.adaptors.context.x509",
         "radical.saga.adaptors.context.ssh",
         "radical.saga.adaptors.context.userpass",
+        "radical.saga.adaptors.noop.noop_job",
         "radical.saga.adaptors.shell.shell_job",
         "radical.saga.adaptors.shell.shell_file",
         "radical.saga.adaptors.shell.shell_resource",
diff --git a/src/radical/saga/session.py b/src/radical/saga/session.py
index 7857c0f02..44f6fa8b1 100644
--- a/src/radical/saga/session.py
+++ b/src/radical/saga/session.py
@@ -275,12 +275,12 @@ def __init__ (self, uid=None):
 
         _engine = engine.Engine()
 
-        if not 'saga.Context' in _engine._adaptor_registry :
+        if 'radical.saga.Context' not in _engine._adaptor_registry :
             self._logger.warning ("no context adaptors found")
             return
 
-        for schema in _engine._adaptor_registry['saga.Context'] :
-            for info in _engine._adaptor_registry['saga.Context'][schema] :
+        for schema in _engine._adaptor_registry['radical.saga.Context'] :
+            for info in _engine._adaptor_registry['radical.saga.Context'][schema] :
 
                 default_ctxs = []
 
diff --git a/src/radical/saga/utils/job/transfer_directives.py b/src/radical/saga/utils/job/transfer_directives.py
index 89d1d199c..b17001f7f 100644
--- a/src/radical/saga/utils/job/transfer_directives.py
+++ b/src/radical/saga/utils/job/transfer_directives.py
@@ -46,7 +46,7 @@ def __init__(self, directives=None):
         self._out_append = list()
 
         if not directives:
-            directives = {}
+            directives = []
 
         for d in directives:
 
diff --git a/src/radical/saga/utils/pty_shell.py b/src/radical/saga/utils/pty_shell.py
index d9a910040..247fafc79 100644
--- a/src/radical/saga/utils/pty_shell.py
+++ b/src/radical/saga/utils/pty_shell.py
@@ -938,8 +938,8 @@ def run_copy_to (self, src, tgt, cp_flags=None) :
         info = self.pty_info
         repl = dict (list({'src'      : src,
-                           'tgt'      : tgt,
-                           'cp_flags' : cp_flags
+                           'tgt'      : tgt,
+                           'cp_flags' : cp_flags
                           }.items ()) + list(info.items ()))
 
         # at this point, we do have a valid, living master
 
diff --git a/tests/unittests/api/filesystem/test_file.py b/tests/unittests/api/filesystem/test_file.py
index 0517dc7ba..a8aa652f4 100644
--- a/tests/unittests/api/filesystem/test_file.py
+++ b/tests/unittests/api/filesystem/test_file.py
@@ -55,10 +55,10 @@ def test_nonexisting_host_file_open(self):
         try:
             tc = config()
             invalid_url      = deepcopy(rs.Url(tc.filesystem_url))
-            invalid_url.host = "does.not.exist"
+            invalid_url.host = "does/not/exist"
             _ = rs.filesystem.File(invalid_url)
             assert False, "Expected BadParameter exception but got none."
-        except rs.BadParameter:
+        except rs.DoesNotExist:
             assert True
         except rs.SagaException as ex:
             assert False, "Expected BadParameter exception, but got %s" % ex
diff --git a/tests/unittests/api/job/test_job.py b/tests/unittests/api/job/test_job.py
index 7b28c1d2c..302851070 100755
--- a/tests/unittests/api/job/test_job.py
+++ b/tests/unittests/api/job/test_job.py
@@ -98,8 +98,9 @@ def test_job_service_invalid_url(self):
         tmp_js = None
 
         try:
-            invalid_url      = rs.Url(self.cfg['job_service_url'])
-            invalid_url.host = "does.not.exist"
+            invalid_url        = rs.Url(self.cfg['job_service_url'])
+            invalid_url.schema = "ssh"
+            invalid_url.host   = "does.not.exist"
             tmp_js = rs.job.Service(invalid_url, self.session)
 
             assert False, "Expected BadParameter exception but got none."
 
diff --git a/tests/unittests/utils/test_pty_shell.py b/tests/unittests/utils/test_pty_shell.py
index 0941633ef..a507cfdbb 100755
--- a/tests/unittests/utils/test_pty_shell.py
+++ b/tests/unittests/utils/test_pty_shell.py
@@ -126,10 +126,10 @@ def test_ptyshell_file_stage () :
 if __name__ == '__main__':
 
     test_ptyshell_ok()
-    test_ptyshell_nok()
-    test_ptyshell_async()
-    test_ptyshell_prompt()
-    test_ptyshell_file_stage()
+  # test_ptyshell_nok()
+  # test_ptyshell_async()
+  # test_ptyshell_prompt()
+  # test_ptyshell_file_stage()
 
 
# ------------------------------------------------------------------------------
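
Note: the noop adaptor documentation above points at examples/jobs/noopjob.py,
which this patch does not add. A minimal sketch of what such an example could
look like, derived only from the constraints the adaptor enforces (the
executable must end in 'sleep' and take a single integer argument; 'noop' is
the registered URL schema, and the host part is not used):

    #!/usr/bin/env python

    import sys
    import radical.saga as rs


    def main():

        try:
            # 'noop' is the URL schema registered by the new adaptor; no
            # remote connection is made, jobs only fake their execution
            js = rs.job.Service('noop://localhost/')

            jd = rs.job.Description()
            jd.executable = '/bin/sleep'  # adaptor requires '*sleep'
            jd.arguments  = ['3']         # single int arg: fake runtime (secs)

            job = js.create_job(jd)
            job.run()

            print('Job ID    : %s' % job.id)
            print('Job State : %s' % job.state)

            job.wait()

            print('Job State : %s' % job.state)
            print('Exitcode  : %s' % job.exit_code)

            js.close()
            return 0

        except rs.SagaException as ex:
            print('An exception occurred: %s' % ex)
            return -1


    if __name__ == '__main__':
        sys.exit(main())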