From 105be63c8492e564382e7d865c0d59cf11f9c5d4 Mon Sep 17 00:00:00 2001
From: Andre Merzky
Date: Tue, 7 Jan 2020 11:17:22 +0100
Subject: [PATCH 01/11] example cleanup, addition

---
 examples/jobs/localjob.py                    |   8 +-
 examples/jobs/slurmjob.py                    |  10 +-
 examples/jobs/slurmjobcontainer.py           | 117 +++++++++++++++++++
 src/radical/saga/adaptors/slurm/slurm_job.py |  24 ++--
 4 files changed, 139 insertions(+), 20 deletions(-)
 create mode 100755 examples/jobs/slurmjobcontainer.py

diff --git a/examples/jobs/localjob.py b/examples/jobs/localjob.py
index d312d6d61..551866513 100755
--- a/examples/jobs/localjob.py
+++ b/examples/jobs/localjob.py
@@ -5,10 +5,12 @@
 __license__   = "MIT"
 
 
-""" This examples shows how to run a job on the local machine
-    using the 'local' job adaptor.
+"""
+This example shows how to run a job on the local machine
+using the 'local' job adaptor.
 """
 
+import os
 import sys
 
 import radical.saga as saga
@@ -30,7 +32,7 @@ def main():
         jd.executable      = '/usr/bin/touch'
         jd.arguments       = ['$FILENAME']
 
-        jd.working_directory = "/tmp/A/B/C"
+        jd.working_directory = "/tmp/%d/A/B/C" % os.getuid()
         jd.output          = "examplejob.out"
         jd.error           = "examplejob.err"
 
diff --git a/examples/jobs/slurmjob.py b/examples/jobs/slurmjob.py
index 7697589ea..de6456dfa 100755
--- a/examples/jobs/slurmjob.py
+++ b/examples/jobs/slurmjob.py
@@ -17,7 +17,7 @@
 import radical.saga as rs
 
 
-js_url = "slurm+gsissh://stampede2.tacc.xsede.org:2222/"
+js_url = "slurm://localhost/"
 
 
 # ------------------------------------------------------------------------------
@@ -40,8 +40,8 @@ def start():
         jd.arguments        = ['$FILENAME']
 
         jd.name             = "examplejob"
-        jd.queue            = "normal"
-        jd.project          = "TG-MCB090174"
+      # jd.queue            = "normal"
+      # jd.project          = "TG-MCB090174"
 
         jd.working_directory = ".saga/test"
         jd.output           = "examplejob.out"
@@ -52,13 +52,13 @@ def start():
         job = js.create_job(jd)
 
         # Check our job's id and state
-        print("Job ID    : %s" % (job.id))
-        print("Job State : %s" % (job.state))
+        print("Job State : %s" % (job.state))
 
         # Now we can start our job.
         print("starting job")
         job.run()
 
+        print("Job ID      : %s" % (job.id))
         print("Job State   : %s" % job.state)
         print("Exitcode    : %s" % job.exit_code)
         print("Exec. hosts : %s" % job.execution_hosts)
 
diff --git a/examples/jobs/slurmjobcontainer.py b/examples/jobs/slurmjobcontainer.py
new file mode 100755
index 000000000..beac4c39e
--- /dev/null
+++ b/examples/jobs/slurmjobcontainer.py
@@ -0,0 +1,117 @@
+#!/usr/bin/env python
+
+__author__    = 'RADICAL @ Rutgers'
+__copyright__ = 'Copyright 2012-2013, The SAGA Project'
+__license__   = 'MIT'
+
+
+'''
+This example shows how to run groups of jobs using the 'slurm' job adaptor.
+This example uses job containers for simplified and optimized bulk job handling.
+
+Job containers can be used to easily model dependencies between groups of
+different jobs, e.g., in workflow scenarios. In this example, we execute
+'num_job_groups' containers of 'jobs_per_group' number of parallel jobs
+sequentially::
+
+    C1[j1,j2,j3,j4,...] -> C2[j1,j2,j3,j4,...] -> C3[j1,j2,j3,j4,...] -> ...
+
+Depending on the adaptor implementation, using job containers can be quite
+advantageous in terms of call latency. Some adaptors implement special bulk
+operations for container management, which makes them generally much faster than
+iterating over and operating on individual jobs.
+'''
+
+import sys
+import random
+
+import radical.saga as rs
+
+
+URL = 'slurm://localhost/'
+
+
+# ------------------------------------------------------------------------------
+def main():
+
+    # number of job 'groups' (containers) and of jobs per group
+    num_job_groups = 10
+    jobs_per_group = 2   # check slurm limits!
+
+    current = None
+
+    try:
+        # all jobs in this example are running on the same job service
+        service = rs.job.Service(URL)
+
+        # create and populate our containers
+        containers = list()
+        for c in range(0, num_job_groups):
+
+            # create containers
+            containers.append(rs.job.Container())
+
+            # add jobs to container.
+            for j in range(0, jobs_per_group):
+                jd = rs.job.Description()
+                jd.executable = '/bin/sleep'
+                jd.arguments  = ['10']
+                jd.name       = 'job.%02d.%03d' % (c, j)
+                j = service.create_job(jd)
+                containers[c].add(j)
+
+        # execute the containers sequentially
+        for c in range(0, num_job_groups):
+            print('run container %s ' % c)
+            containers[c].run()
+
+            current = containers[c]   # see exception handling
+
+            for j in containers[c].get_tasks():
+                print('%s: %s: %s' % (j.name, j.id, j.state))
+
+            print(containers[c].get_states ())
+            containers[c].wait()
+            print(containers[c].get_states ())
+
+            # all jobs in the container finished - print some job infos
+            for job in containers[c].jobs:
+                print(' * %s: %s (%s) @%s %s - %s' \
+                    % (job.id, job.state, job.exit_code, job.execution_hosts,
+                       job.started, job.finished))
+
+            print()
+
+        service.close()
+        return 0
+
+    except rs.SagaException as ex:
+
+        print('An exception occurred: %s ' % str(ex))
+        # get the whole traceback in case of an exception -
+        # this can be helpful for debugging the problem
+        print(' *** %s' % ex.traceback)
+        return -1
+
+
+    finally:
+
+        # make sure we leave no jobs behind
+        if current is not None:
+            print('\ncancel current container: %s' % current)
+            current.cancel()
+            for job in current.jobs:
+                print(' * %s: %s (%s) @%s %s - %s' \
+                    % (job.id, job.state, job.exit_code, job.execution_hosts,
+                       job.started, job.finished))
+            print()
+
+
+# ------------------------------------------------------------------------------
+if __name__ == '__main__':
+
+    sys.exit(main())
+
+
+# ------------------------------------------------------------------------------
+
diff --git a/src/radical/saga/adaptors/slurm/slurm_job.py b/src/radical/saga/adaptors/slurm/slurm_job.py
index 2bc4208f2..888030250 100644
--- a/src/radical/saga/adaptors/slurm/slurm_job.py
+++ b/src/radical/saga/adaptors/slurm/slurm_job.py
@@ -604,30 +604,30 @@ def _job_run(self, jd):
 
         # find out what our job ID is
         # TODO: Could make this more efficient
-        self.job_id = None
+        job_id = None
         for line in out.split("\n"):
             if "Submitted batch job" in line:
-                self.job_id = "[%s]-[%s]" % (self.rm, int(line.split()[-1:][0]))
+                job_id = "[%s]-[%s]" % (self.rm, int(line.split()[-1:][0]))
                 break
 
         # if we have no job ID, there's a failure...
-        if not self.job_id:
+        if not job_id:
             raise rse.NoSuccess._log(self._logger,
                        "Couldn't get job id from submitted job!"
" sbatch output:\n%s" % out) - self._logger.debug("started job %s" % self.job_id) + self._logger.debug("started job %s" % job_id) self._logger.debug("Batch system output:\n%s" % out) # create local jobs dictionary entry - self.jobs[self.job_id] = {'state' : c.PENDING, - 'create_time': None, - 'start_time' : None, - 'end_time' : None, - 'comp_time' : None, - 'exec_hosts' : None, - 'gone' : False} - return self.job_id + self.jobs[job_id] = {'state' : c.PENDING, + 'create_time': None, + 'start_time' : None, + 'end_time' : None, + 'comp_time' : None, + 'exec_hosts' : None, + 'gone' : False} + return job_id # -------------------------------------------------------------------------- From d74cd6193496be71099580ab56b382e1b77deb9e Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Wed, 15 Jan 2020 10:43:19 +0100 Subject: [PATCH 02/11] add noop job adaptor for tests --- src/radical/saga/adaptors/noop/__init__.py | 0 src/radical/saga/adaptors/noop/noop_job.py | 683 ++++++++++++++++++ .../saga/configs/registry_default.json | 3 +- 3 files changed, 685 insertions(+), 1 deletion(-) create mode 100644 src/radical/saga/adaptors/noop/__init__.py create mode 100644 src/radical/saga/adaptors/noop/noop_job.py diff --git a/src/radical/saga/adaptors/noop/__init__.py b/src/radical/saga/adaptors/noop/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/radical/saga/adaptors/noop/noop_job.py b/src/radical/saga/adaptors/noop/noop_job.py new file mode 100644 index 000000000..6383ae167 --- /dev/null +++ b/src/radical/saga/adaptors/noop/noop_job.py @@ -0,0 +1,683 @@ + +__author__ = "Andre Merzky" +__copyright__ = "Copyright 2020, The SAGA Project" +__license__ = "MIT" + + +''' noop based job adaptor implementation ''' + +import re +import time +import threading + +from ... import exceptions as rse +from .. import base +from ..cpi import SYNC_CALL +from ..cpi import job as cpi +from ... 
import job as api + + +# ------------------------------------------------------------------------------ +# +class _job_state_monitor(threading.Thread): + + # -------------------------------------------------------------------------- + # + def __init__(self, log): + + self._log = log + self._lock = threading.Lock() + self._term = threading.Event() + self._jobs = dict() + self._cnt = 0 + + super(_job_state_monitor, self).__init__() + + self.setDaemon(True) + + + # -------------------------------------------------------------------------- + # + def stop(self): + + self._term.set() + + + # -------------------------------------------------------------------------- + # + def add_job(self, job): + + job._id = 'job.%06d' % self._cnt + self._cnt += 1 + + assert(job._id not in self._jobs) + job._set_state(api.RUNNING) + job._started = time.time() + + with self._lock: + self._jobs[job._id] = job + + + # -------------------------------------------------------------------------- + # + def get_job(self, jid): + + assert(jid in self._jobs) + with self._lock: + return self._jobs[jid] + + + # -------------------------------------------------------------------------- + # + def list_jobs(self): + + with self._lock: + return list(self._jobs.keys()) + + + # -------------------------------------------------------------------------- + # + def run(self): + + try: + + while not self._term.is_set(): + + now = time.time() + keep = dict() + + with self._lock: + + for job in self._jobs.values(): + + if job.get_state() == api.CANCELED: + continue + + if job.tgt < now: + job._finished = now + job._exit_code = 0 + job._set_state(api.DONE) + + else: + keep[job._id] = job + + self._jobs = keep + + time.sleep(0.1) + + except Exception: + + self._log.exception("Exception in job monitoring thread") + + +# ------------------------------------------------------------------------------ +# the adaptor name +# +_ADAPTOR_NAME = "radical.saga.adaptors.noop_job" +_ADAPTOR_SCHEMAS = ["noop"] + + + +# ------------------------------------------------------------------------------ +# the adaptor capabilities & supported attributes +# +_ADAPTOR_CAPABILITIES = { + "jdes_attributes" : [api.NAME, + api.EXECUTABLE, + api.PRE_EXEC, + api.POST_EXEC, + api.ARGUMENTS, + api.ENVIRONMENT, + api.WORKING_DIRECTORY, + api.FILE_TRANSFER, + api.INPUT, + api.OUTPUT, + api.ERROR, + api.NAME, + api.WALL_TIME_LIMIT, + api.TOTAL_CPU_COUNT, + api.TOTAL_GPU_COUNT, + api.PROCESSES_PER_HOST, + api.SPMD_VARIATION, + ], + "job_attributes" : [api.EXIT_CODE, + api.EXECUTION_HOSTS, + api.CREATED, + api.STARTED, + api.FINISHED], + "metrics" : [api.STATE, + api.STATE_DETAIL], + "contexts" : {} +} + +# ------------------------------------------------------------------------------ +# the adaptor documentation +# +_ADAPTOR_DOC = { + "name" : _ADAPTOR_NAME, + "capabilities" : _ADAPTOR_CAPABILITIES, + "description" : ''' + The Noop job adaptor, which fakes job execution and is used for testing + and benchmarking purposes.''', + "example" : "examples/jobs/noopjob.py", + "schemas" : {"noop" : "fake job execution"} +} + +# ------------------------------------------------------------------------------ +# the adaptor info is used to register the adaptor with SAGA + +_ADAPTOR_INFO = { + "name" : _ADAPTOR_NAME, + "version" : "v0.1", + "schemas" : _ADAPTOR_SCHEMAS, + "capabilities" : _ADAPTOR_CAPABILITIES, + "cpis" : [{ + "type" : "radical.saga.job.Service", + "class": "NoopJobService" + },{ + "type" : "radical.saga.job.Job", + "class": "NoopJob" + }] +} + + +# 
------------------------------------------------------------------------------ +# The adaptor class +class Adaptor(base.Base): + ''' + This is the actual adaptor class, which gets loaded by SAGA (i.e. by the + SAGA engine), and which registers the CPI implementation classes which + provide the adaptor's functionality. + ''' + + + # -------------------------------------------------------------------------- + # + def __init__(self): + + base.Base.__init__(self, _ADAPTOR_INFO, expand_env=False) + + + # -------------------------------------------------------------------------- + # + def sanity_check(self): + + pass + + +# ------------------------------------------------------------------------------ +# +class NoopJobService(cpi.Service): + + # -------------------------------------------------------------------------- + # + def __init__(self, api, adaptor): + + _cpi_base = super (NoopJobService, self) + _cpi_base.__init__(api, adaptor) + + + # -------------------------------------------------------------------------- + # + def __del__(self): + + try : self.close() + except: pass + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def init_instance(self, adaptor_state, rm_url, session): + ''' + Service instance constructor + ''' + + self.rm = rm_url + self.session = session + self.jobs = dict() + + # Use `_set_session` method of the base class to set the session object. + # `_set_session` and `get_session` methods are provided by `CPIBase`. + self._set_session(session) + + # the monitoring thread - one per service instance. + self.monitor = _job_state_monitor(self._logger) + self.monitor.start() + + return self.get_api() + + + # -------------------------------------------------------------------------- + # + def close(self): + + if self.monitor: + self.monitor.stop() + + + # -------------------------------------------------------------------------- + # + # + def _job_run(self, job): + ''' + runs a job on the wrapper via pty, and returns the job id + ''' + + if not job.jd.executable.endswith('sleep'): + raise ValueError('expected "sleep", not %s' % job.jd.executable) + + + if len(job.jd.arguments) != 1: + raise ValueError('expected int argument, not %s' % job.jd.arguments) + + job.tgt = time.time() + int(job.jd.arguments[0]) + self.monitor.add_job(job) + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def run_job(self, cmd, host): + ''' + Implements adaptors.cpi.job.Service.run_job() + ''' + + if not cmd: + raise rse.BadParameter._log(self._logger, + "run_job needs a command to run") + + if host and host != self.rm.host: + raise rse.BadParameter._log(self._logger, + "Can only run jobs on %s, not on %s" % (self.rm.host, host)) + + jd = api.Description() + + exe, arg = cmd.split() + + jd.executable = exe + jd.arguments = [arg] + + job = self.create_job(jd) + job.run() + + return job + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def create_job(self, jd): + + # this dict is passed on to the job adaptor class -- use it to pass any + # state information you need there. 
+ adaptor_state = {"job_service" : self, + "job_description": jd, + "job_schema" : self.rm.schema } + + return api.Job(_adaptor=self._adaptor, _adaptor_state=adaptor_state) + + + # -------------------------------------------------------------------------- + @SYNC_CALL + def get_url(self): + + return self.rm + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def list(self): + + return self.monitor.list_jobs() + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def get_job(self, job_id, no_reconnect=False): + + return self.monitor.get_job(job_id) + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def container_run(self, jobs): + ''' + From all the job descriptions in the container, build a bulk, and submit + as async. The read whaterver the wrapper returns, and sort through the + messages, assigning job IDs etc. + ''' + + self._logger.debug("container run: %s" % str(jobs)) + + for job in jobs: + job.run() + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def container_wait(self, jobs, mode, timeout): + + for job in jobs: + + job.wait() + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def container_cancel(self, jobs, timeout): + + for job in jobs: + + job.cancel() + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def container_get_states(self, jobs): + + return [job.get_state() for job in jobs] + + +# ------------------------------------------------------------------------------ +# +class NoopJob(cpi.Job): + + # -------------------------------------------------------------------------- + # + def __init__(self, api, adaptor): + + _cpi_base = super (NoopJob, self) + _cpi_base.__init__(api, adaptor) + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def init_instance(self, job_info): + + if 'job_description' in job_info: + # comes from job.service.create_job() + self.js = job_info["job_service"] + self.jd = job_info["job_description"] + + # initialize job attribute values + self._id = None + self._name = self.jd.get(api.NAME) + self._log = list() + self._state = None + self._exit_code = None + self._exception = None + self._created = time.time() + self._name = self.jd.name + self._started = None + self._finished = None + + self._set_state(api.NEW) + + elif 'job_id' in job_info: + # initialize job attribute values + self.js = job_info["job_service"] + self.jd = None + self._id = job_info['job_id'] + self._name = job_info.get('job_name') + self._log = list() + self._state = None + self._exit_code = None + self._exception = None + self._created = None + self._name = None + self._started = None + self._finished = None + + else: + # don't know what to do... 
+ raise rse.BadParameter("insufficient info for job creation") + + if self._created: self._created = float(self._created) + + return self.get_api() + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def get_description(self): + + return self.jd + + + # -------------------------------------------------------------------------- + # + def _update_state(self, state): + + # update state, and report to application + self._state = state + self._api()._attributes_i_set('state', self._state, self._api()._UP) + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def get_state(self): + ''' + Implements adaptors.cpi.job.Job.get_state() + ''' + + # may not yet have backend representation, state is 'NEW' + if self._id is None: + return self._state + + return self._state + + + # -------------------------------------------------------------------------- + # + def _set_state(self, state): + + old_state = self._state + + # on state changes, trigger notifications + if old_state != state: + self._state = state + self._api()._attributes_i_set('state', state, self._api()._UP) + + return self._state + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def get_created(self): + + return self._created + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def get_started(self): + + return self._started + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def get_finished(self): + + return self._finished + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def get_stdout(self): + + return '' + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def get_stderr(self): + + return '' + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def get_log(self): + + return '' + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def get_service_url(self): + + if not self.js: + raise rse.IncorrectState("Job Service URL unknown") + else: + return self.js.get_url() + + + # -------------------------------------------------------------------------- + # + # TODO: this should also fetch the(final) state, to safe a hop + # TODO: implement via notifications + # + @SYNC_CALL + def wait(self, timeout): + ''' + A call to the noop to do the WAIT would block the noop for any + other interactions. In particular, it would practically kill it if the + Wait waits forever... + + So we implement the wait via a state pull. The *real* solution is, of + course, to implement state notifications, and wait for such + a notification to arrive within timeout seconds... 
+ ''' + + time_start = time.time() + time_now = time_start + + while True: + + state = self.get_state() + + if state in [api.DONE, api.FAILED, api.CANCELED]: + return True + + # avoid busy poll + # FIXME: self-tune by checking call latency + time.sleep(0.1) + + # check if we hit timeout + if timeout >= 0: + time_now = time.time() + if time_now - time_start > timeout: + return False + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def get_id(self): + + return self._id + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def get_name(self): + + return self._name + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def get_exit_code(self): + + return self._exit_code + + + # -------------------------------------------------------------------------- + # + # TODO: the values below should be fetched with every get_state... + # + @SYNC_CALL + def get_execution_hosts(self): + + return [self.js.get_url().host] + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def run(self): + + self.js._job_run(self) + self.js.jobs[self._id] = self._api() + + self._set_state(api.RUNNING) + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def suspend(self): + + if self.get_state() != api.RUNNING: + raise rse.IncorrectState("Cannot suspend, job is not RUNNING") + + self._old_state = self.get_state() + self._adaptor._update_state(api.SUSPENDED) + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def resume(self): + + if self.get_state() != api.SUSPENDED: + raise rse.IncorrectState("Cannot resume, job is not SUSPENDED") + + self._adaptor._update_state(self._old_state) + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def cancel(self, timeout): + + if self.get_state() not in [api.RUNNING, api.SUSPENDED, + api.CANCELED, api.DONE, + api.FAILED]: + raise rse.IncorrectState("Cannot cancel, job is not running") + + if self._state in [api.CANCELED, api.DONE, api.FAILED]: + self._set_state(api.CANCELED) + return + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def re_raise(self): + # nothing to do here actually, as run() is synchronous... 
        return self._exception
+
+
+# ------------------------------------------------------------------------------
+
diff --git a/src/radical/saga/configs/registry_default.json b/src/radical/saga/configs/registry_default.json
index 2b4deab35..6c50079fa 100644
--- a/src/radical/saga/configs/registry_default.json
+++ b/src/radical/saga/configs/registry_default.json
@@ -1,12 +1,13 @@
 {
     "adaptor_registry" : [
 
-        # this should be auto-generated by the registry, 
+        # this should be auto-generated by the registry,
         # based on the adaptor search path
 
         "radical.saga.adaptors.context.myproxy",
         "radical.saga.adaptors.context.x509",
         "radical.saga.adaptors.context.ssh",
         "radical.saga.adaptors.context.userpass",
 
+        "radical.saga.adaptors.noop.noop_job",
         "radical.saga.adaptors.shell.shell_job",
         "radical.saga.adaptors.shell.shell_file",
         "radical.saga.adaptors.shell.shell_resource",
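
The noop adaptor above registers the ``noop://`` URL schema with the engine via
the registry entry below. Note that the ``examples/jobs/noopjob.py`` file
referenced in ``_ADAPTOR_DOC`` is not part of this series; the following is
a hypothetical, minimal driver sketch, based only on the adaptor code above
(the adaptor accepts a ``sleep`` executable with exactly one integer argument,
and the monitor thread completes the job after that many seconds)::

    import radical.saga as rs

    js = rs.job.Service('noop://localhost/')

    jd = rs.job.Description()
    jd.executable = '/bin/sleep'     # the adaptor only accepts a 'sleep' executable
    jd.arguments  = ['3']            # exactly one int argument: the fake runtime

    job = js.create_job(jd)
    job.run()                        # job is set to RUNNING and registered
    job.wait()                       # polls until the monitor thread sets DONE
    print(job.state, job.exit_code)  # expected: 'Done' 0


From 0ecdb4565b4393eb7eb3b4c1fb5d532dff90b48f Mon Sep 17 00:00:00 2001
From: Andre Merzky
Date: Wed, 22 Jan 2020 19:18:37 +0100
Subject: [PATCH 03/11] doc snapshot

---
 docs/source/_themes/armstrong/theme.conf | 6 ++----
 src/radical/saga/session.py              | 6 +++---
 2 files changed, 5 insertions(+), 7 deletions(-)

diff --git a/docs/source/_themes/armstrong/theme.conf b/docs/source/_themes/armstrong/theme.conf
index 450780c8a..448d201c0 100644
--- a/docs/source/_themes/armstrong/theme.conf
+++ b/docs/source/_themes/armstrong/theme.conf
@@ -25,9 +25,11 @@ black = #111111
 light_color = #C5D2DD
 light_medium_color = #DDEAF0
 medium_color = #6588A5
+medium_color_hover = rgba(255, 255, 255, 0.25)
 medium_color_link = #634320
 medium_color_link_hover = #261a0c
 dark_color = #293947
+green_highlight = #8ecc4c
 
 h1 = #5a5a5a
 h2 = #5a5a5a
@@ -36,10 +38,6 @@ h3 = #5a5a5a
 link_color = #0088cc
 link_color_decoration = #0088cc
 
-medium_color_hover = rgba(255, 255, 255, 0.25)
-medium_color = rgba(255, 255, 255, 0.5)
-green_highlight = #8ecc4c
-
 positive_dark = rgba(51, 77, 0, 1.0)
 positive_medium = rgba(102, 153, 0, 1.0)
 
diff --git a/src/radical/saga/session.py b/src/radical/saga/session.py
index 7857c0f02..44f6fa8b1 100644
--- a/src/radical/saga/session.py
+++ b/src/radical/saga/session.py
@@ -275,12 +275,12 @@ def __init__ (self, uid=None):
 
         _engine = engine.Engine()
 
-        if not 'saga.Context' in _engine._adaptor_registry :
+        if 'radical.saga.Context' not in _engine._adaptor_registry :
             self._logger.warning ("no context adaptors found")
             return
 
-        for schema in _engine._adaptor_registry['saga.Context'] :
-            for info in _engine._adaptor_registry['saga.Context'][schema] :
+        for schema in _engine._adaptor_registry['radical.saga.Context'] :
+            for info in _engine._adaptor_registry['radical.saga.Context'][schema] :
 
                 default_ctxs = []

From f2208a0e0d10c45508eeeba410b55e1e0b3d0165 Mon Sep 17 00:00:00 2001
From: Andre Merzky
Date: Wed, 22 Jan 2020 19:33:07 +0100
Subject: [PATCH 04/11] doc snapshot

---
 docs/source/usage/config.rst  | 19 ++++---
 docs/source/usage/install.rst | 17 +++---
 docs/source/usage/logger.rst  | 97 +++++++++++++----------------------
 3 files changed, 57 insertions(+), 76 deletions(-)

diff --git a/docs/source/usage/config.rst b/docs/source/usage/config.rst
index c1261e24f..9b0417381 100644
--- a/docs/source/usage/config.rst
+++ b/docs/source/usage/config.rst
@@ -5,8 +5,13 @@ Configuration
 
 .. note::
 
-   SAGA has been designed as a zero-configuration library. 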
Unless you are + experiencing problems with one of the default configuration settings, there's really no need to create a configuration file for SAGA. SAGA and its individual middleware adaptors provide various optional @@ -22,8 +27,8 @@ Configuration Files ------------------- If you need to make persistent changes to any of SAGA's :ref:`conf_options`, the -simplest option is to create a configuration file. During startup, SAGA checks -for the existence of a configuration file in `$HOME/.saga.conf`. If that +simplest option is to create a configuration file. During startup, SAGA checks +for the existence of a configuration file in `$HOME/.saga.conf`. If that configuration file is found, it is parsed by SAGA's configuration system. SAGA configuration files use a structure that looks like this:: @@ -32,7 +37,7 @@ SAGA configuration files use a structure that looks like this:: [saga.logger] option = value - + [saga.adaptor.name] option = value @@ -47,10 +52,10 @@ Module saga.utils.config The config module provides classes and functions to introspect and modify SAGA's configuration. The :func:`getConfig` function is used to get the -:class:`GlobalConfig` object which represents the current configuration +:class:`GlobalConfig` object which represents the current configuration of SAGA:: - from saga.utils.config import getConfig + from saga.utils.config import getConfig sagaconf = getConfig() print sagaconf.get_category('saga.utils.logger') diff --git a/docs/source/usage/install.rst b/docs/source/usage/install.rst index 122044beb..3c9e4f144 100644 --- a/docs/source/usage/install.rst +++ b/docs/source/usage/install.rst @@ -9,14 +9,14 @@ Requirements radical.saga has the following requirements: -* Python 2.7 or newer +* Python 3.5 or newer Installation via PyPi --------------------- -radical.saga is available for download via PyPI and may be installed using -easy_install or pip (preferred). Both automatically download and install all +radical.saga is available for download via PyPI and may be installed using +easy_install or pip (preferred). Both automatically download and install all dependencies required by radical.saga if they can't be found on your system: .. code-block:: bash @@ -28,20 +28,20 @@ dependencies required by radical.saga if they can't be found on your system: Using Virtualenv ---------------- -If you don't want to (or can't) install RADICAL-SAGA into your system's Python -environment, there's a simple (and often preferred) way to create an -alternative Python environment (e.g., in your home directory): +If you don't want to (or can't) install RADICAL-SAGA into your system's Python +environment, there's a simple (and often preferred) way to create an +alternative Python environment (e.g., in your home directory): .. code-block:: bash virtualenv --no-site-package $HOME/sagaenv/ . $HOME/sagaenv/bin/activate - pip install radical.saga + pip install radical.saga **What if my system Doesn't come With virtualenv, pip or easy_install?** -There's a simple workaround for that using the 'instant' version of virtualenv. +There's a simple workaround for that using the 'instant' version of virtualenv. It also installs easy_install and pip: .. code-block:: bash @@ -62,3 +62,4 @@ You can install the latest development version of RADICAL-SAGA directly from our .. 
code-block:: bash pip install -e git://github.com/radical-cybertools/radical.saga.git@devel#egg=radical.saga + diff --git a/docs/source/usage/logger.rst b/docs/source/usage/logger.rst index 1eee4768c..71358a78f 100644 --- a/docs/source/usage/logger.rst +++ b/docs/source/usage/logger.rst @@ -7,7 +7,7 @@ In a distributed environment unified error logging and reporting is a crucial capability for debugging and monitoring. SAGA has a configurable logging system that captures debug, info, warning and error messages across all of its middelware adaptors. The logging system can be controlled in two different -ways: via :ref:`env_vars` variables, which should be sufficient in most +ways: via :ref:`env_vars` variables, which should be sufficient in most scenarios, and via the :ref:`log_api`, which provides programmatic access to the logging system for advanced use-cases. @@ -17,101 +17,76 @@ the logging system for advanced use-cases. Environment Variables --------------------- -Several environment variables can be used to control SAGA's logging behavior from +Several environment variables can be used to control SAGA's logging behavior from the command line. Obviously, this can come in handy when debugging a problem -with an existing SAGA application. Environment variables are set in the -executing shell and evaluated by SAGA at program startup. +with an existing SAGA application. Environment variables are set in the +executing shell and evaluated by SAGA at program startup. -.. envvar:: SAGA_VERBOSE +.. envvar:: RADICAL_SAGA_LOG_LVL Controls the log level. This controls the amount of output generated by the - logging system. ``SAGA_VERBOSE`` expects either a numeric (0-4) value or a + logging system. ``SAGA_VERBOSE`` expects either a numeric (0-4) value or a string (case insensitive) representing the log level: +---------------+---------------------+------------------------------------+ | Numeric Value | Log Level | Type of Messages Displayed | +===============+=====================+====================================+ | 0 | ``CRITICAL`` | Only fatal events that will cause | - | (default) | | SAGA to abort. | + | (default) | | SAGA to abort. | | | | | +---------------+---------------------+------------------------------------+ | 1 | ``ERROR`` | Errors that will not necessarily | - | | | cause SAGA to abort. | + | | | cause SAGA to abort. | | | | | +---------------+---------------------+------------------------------------+ | 2 | ``WARNING`` | Warnings that are generated by | - | | | SAGA and its middleware adaptors. | + | | | SAGA and its middleware adaptors. | | | | | +---------------+---------------------+------------------------------------+ - | 3 | ``INFO`` | Useful (?) runtime information | - | | | that is generated by SAGA and its | + | 3 | ``INFO`` | Useful (?) runtime information | + | | | that is generated by SAGA and its | | | | middleware adaptors. | +---------------+---------------------+------------------------------------+ | 4 | ``DEBUG`` | Debug message added to the code | - | | | by the developers. (Lots of output)| + | | | by the developers. 
(Lots of output)| | | | | +---------------+---------------------+------------------------------------+ - For example, if you want to see the debug messages that SAGA generates during - program execution, you would set :envvar:`SAGA_VERBOSE` to ``DEBUG`` before + For example, if you want to see the debug messages that SAGA generates during + program execution, you would set :envvar:`SAGA_VERBOSE` to ``DEBUG`` before you run your program:: - SAGA_VERBOSE=DEBUG python mysagaprog.py + RADICAL_SAGA_LOG_LVL=DEBUG python mysagaprog.py -.. envvar:: SAGA_LOG_FILTERS +# .. envvar:: RADICAL_SAGA_LOG_LVL +# +# Controls the message sources displayed. SAGA uses a +# hierarchal structure for its log sources. Starting with the root logger +# ``saga``, several sub loggers are defined for SAGA-internal logging events +# (``saga.engine``) and individual middleware adaptors ``saga.adaptor.name``. +# ``SAGA_LOG_FILTERS`` expects either a single source name or a +# comma-separated list of source names. Non-existing source names are ignored. +# +# For example, if you want to see only the debug messages generated by +# ``saga.engine`` and a specific middleware adaptor called ``xyz`` you would +# set the following environment variables:: +# +# SAGA_VERBOSE=DEBUG SAGA_LOG_FILTERS=saga.engine,saga.adaptor.xyz python mysagaprog.py - Controls the message sources displayed. SAGA uses a - hierarchal structure for its log sources. Starting with the root logger - ``saga``, several sub loggers are defined for SAGA-internal logging events - (``saga.engine``) and individual middleware adaptors ``saga.adaptor.name``. - ``SAGA_LOG_FILTERS`` expects either a single source name or a - comma-separated list of source names. Non-existing source names are ignored. - For example, if you want to see only the debug messages generated by - ``saga.engine`` and a specific middleware adaptor called ``xyz`` you would - set the following environment variables:: +.. envvar:: RADICAL_SAGA_LOG_TGT - SAGA_VERBOSE=DEBUG SAGA_LOG_FILTERS=saga.engine,saga.adaptor.xyz python mysagaprog.py - - -.. envvar:: SAGA_LOG_TARGETS - - Controls where the log messages go. Multiple concurrent locations are - supported. - ``SAGA_LOG_TARGETS`` expects either a single location or a comma-separated + Controls where the log messages go. Multiple concurrent locations are + supported. + ``SAGA_LOG_TARGETS`` expects either a single location or a comma-separated list of locations, where a location can either be a path/filename or - the ``STDOUT`` keyword (case insensitive) for logging to the console. + the ``stdout``/``stderr`` keyword (case sensitive) for logging to the console. For example, if you want to see debug messages on the console but also - want to log them in a file for further analysis, you would set the the + want to log them in a file for further analysis, you would set the the following environment variables:: - SAGA_VERBOSE=DEBUG SAGA_LOG_TARGETS=STDOUT,/tmp/mysaga.log python mysagaprog.py - - -.. _log_api: - -Application Level Logging -------------------------- - -The RADICAL-SAGA logging utilities are a thin wrapper around Python's logging -facilities, integrated into the RADICAL-SAGA configuration facilities. -To support the seamless integration of application level logging needs, the -:func:`saga.utils.logger.getLogger` allows to produce additional logger -facilities, which are again native Python :class:`logging.Logger` instances, but -preconfigured according to the RADICAL-SAGA logging configuration. 
-Those instances can then be further customized as needed:: - - from saga.utils.logger import getLogger, INFO - - app_logger = getLogger ('application.test') - app_logger.level = INFO - - app_logger.info ('application level log message on INFO level') - - -.. automodule:: saga.utils.logger - :members: + SAGA_VERBOSE=DEBUG SAGA_LOG_TARGETS=stdout,./rs.log python mysagaprog.py From e1d1dbe765c76cb9a1e41c54fa80aafdfa765466 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Thu, 23 Jan 2020 00:09:10 +0100 Subject: [PATCH 05/11] slurm stdio --- src/radical/saga/adaptors/slurm/slurm_job.py | 64 +++++++++++++++++++- 1 file changed, 63 insertions(+), 1 deletion(-) diff --git a/src/radical/saga/adaptors/slurm/slurm_job.py b/src/radical/saga/adaptors/slurm/slurm_job.py index 888030250..1b94b01e5 100644 --- a/src/radical/saga/adaptors/slurm/slurm_job.py +++ b/src/radical/saga/adaptors/slurm/slurm_job.py @@ -626,7 +626,12 @@ def _job_run(self, jd): 'end_time' : None, 'comp_time' : None, 'exec_hosts' : None, - 'gone' : False} + 'gone' : False, + 'output' : output, + 'error' : error, + 'stdout' : None, + 'stderr' : None, + } return job_id @@ -954,6 +959,10 @@ def _job_get_info(self): curr_info['comp_time' ] = prev_info.get('comp_time' ) curr_info['exec_hosts' ] = prev_info.get('exec_hosts' ) curr_info['gone' ] = prev_info.get('gone' ) + curr_info['output' ] = prev_info.get('output' ) + curr_info['error' ] = prev_info.get('error' ) + curr_info['stdout' ] = prev_info.get('stdout' ) + curr_info['stderr' ] = prev_info.get('stderr' ) else: curr_info['job_id' ] = None curr_info['job_name' ] = None @@ -964,6 +973,10 @@ def _job_get_info(self): curr_info['comp_time' ] = None curr_info['exec_hosts' ] = None curr_info['gone' ] = None + curr_info['output' ] = None + curr_info['error' ] = None + curr_info['stdout' ] = None + curr_info['stderr' ] = None rm, pid = self._adaptor.parse_id(self._id) @@ -1013,6 +1026,7 @@ def _job_get_info(self): key, val = parts if val in ['', '(null)']: val = None + self._logger.info('=== %s := %s', key, val) data[key] = val if data.get('JobState'): @@ -1042,8 +1056,31 @@ def _job_get_info(self): if not curr_info['start_time' ]: curr_info['start_time' ] = now if curr_info['state'] in c.FINAL: + + self._logger.info('=== %s', data.get('StdErr')) + if not curr_info['end_time' ]: curr_info['end_time' ] = now + if curr_info['stdout'] is None: + + if curr_info['output'] is None: + curr_info['output'] = data.get('StdOut') + + ret, out, err = self.js.shell.run_sync( + 'cat %s' % curr_info['output']) + if ret: curr_info['stdout'] = None + else : curr_info['stdout'] = out + + if curr_info['stderr'] is None: + + if curr_info['error'] is None: + curr_info['error'] = data.get('StdErr') + + ret, out, err = self.js.shell.run_sync( + 'cat %s' % curr_info['error']) + if ret: curr_info['stderr'] = None + else : curr_info['stderr'] = out + return curr_info @@ -1132,6 +1169,30 @@ def get_state(self): return self._state + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def get_stdout(self): + + out = self._job_get_info()['stdout'] + if out is None: + out = '' + # raise rse.NoSuccess("Couldn't fetch stdout (js reconnected?)") + return out + + + # -------------------------------------------------------------------------- + # + @SYNC_CALL + def get_stderr(self): + + err = self._job_get_info()['stderr'] + if err is None: + err = '' + # raise rse.NoSuccess("Couldn't fetch stderr (js reconnected?)") + return err + + # 
--------------------------------------------------------------------------
     #
     @SYNC_CALL
@@ -1165,6 +1226,7 @@ def wait(self, timeout):
             raise rse.IncorrectState("cannot get job state")
 
         if state in c.FINAL:
+            self._job_get_info()
             return True
 
         # check if we hit timeout
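
With the accessors added in the patch above, stdout and stderr of a completed
SLURM job are read from the job's output and error files once a final state is
reached (see the ``_job_get_info`` changes earlier in the patch). A hedged
usage sketch, assuming the API level forwards to the new CPI calls and with
illustrative file names::

    job = js.create_job(jd)      # jd.output / jd.error as set in the description
    job.run()
    job.wait()                   # reaching a final state caches both files
    print(job.get_stdout())      # contents of the job's output file, or ''
    print(job.get_stderr())      # contents of the job's error  file, or ''


From 3769f2222252773351cf93276cf494766589b048 Mon Sep 17 00:00:00 2001
From: Andre Merzky
Date: Thu, 23 Jan 2020 00:53:55 +0100
Subject: [PATCH 06/11] slurm staging

---
 src/radical/saga/adaptors/slurm/slurm_job.py | 64 +++++++++++++++++--
 .../saga/utils/job/transfer_directives.py    |  2 +-
 src/radical/saga/utils/pty_shell.py          |  4 +-
 3 files changed, 62 insertions(+), 8 deletions(-)

diff --git a/src/radical/saga/adaptors/slurm/slurm_job.py b/src/radical/saga/adaptors/slurm/slurm_job.py
index 1b94b01e5..90e6fc2cc 100644
--- a/src/radical/saga/adaptors/slurm/slurm_job.py
+++ b/src/radical/saga/adaptors/slurm/slurm_job.py
@@ -21,10 +21,13 @@
 from ...utils import pty_shell  as rsups
 from ...      import job        as api_job
 from ...      import exceptions as rse
+from ...      import filesystem as sfs
 from ..       import base       as a_base
 from ..cpi    import job        as cpi_job
 from ..cpi    import decorators as cpi_decs
 
+from ...utils.job import TransferDirectives
+
 SYNC_CALL  = cpi_decs.SYNC_CALL
 ASYNC_CALL = cpi_decs.ASYNC_CALL
 
@@ -391,7 +394,7 @@ def _open(self):
         if len(ppn_vals) >= 1: self._ppn = int(ppn_vals[0])
         else                 : self._ppn = None
 
-        self._logger.info(" === ppn: %s", self._ppn)
+        self._logger.info("ppn: %s", self._ppn)
 
 
     # --------------------------------------------------------------------------
@@ -405,6 +408,51 @@ def _close(self):
             self.shell = None
 
 
+    # --------------------------------------------------------------------------
+    #
+    def _handle_file_transfers(self, ft, mode):
+        """
+        if mode == 'in' : perform sanity checks on all staging directives. 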
+
+        if mode == 'in' : stage files to the slurm submission site
+        if mode == 'out': stage files from the slurm submission site
+        """
+
+        td = TransferDirectives(ft)
+
+        assert(mode in ['in', 'out'])
+
+        if mode == 'in':
+
+            if td.in_append:
+                raise Exception('File append (>>) not supported')
+
+            if td.out_append:
+                raise Exception('File append (<<) not supported')
+
+            if td.in_overwrite:
+
+                for (local, remote) in td.in_overwrite:
+
+                    source = local
+                    target = remote
+
+                    self._logger.info("Transferring in %s to %s", source, target)
+                    self.shell.stage_to_remote(source, target)
+
+        elif mode == 'out':
+
+            if td.out_overwrite:
+
+                for (local, remote) in td.out_overwrite:
+
+                    source = remote
+                    target = local
+
+                    self._logger.info("Transferring out %s to %s", source, target)
+                    self.shell.stage_from_remote(source, target)
+
+
     # --------------------------------------------------------------------------
     #
     #
     def _job_run(self, jd):
@@ -428,7 +476,7 @@ def _job_run(self, jd):
         processes_per_host = jd.as_dict().get(c.PROCESSES_PER_HOST)
         output             = jd.as_dict().get(c.OUTPUT, "radical.saga.stdout")
         error              = jd.as_dict().get(c.ERROR,  "radical.saga.stderr")
-      # file_transfer      = jd.as_dict().get(c.FILE_TRANSFER)
+        file_transfer      = jd.as_dict().get(c.FILE_TRANSFER)
         wall_time          = jd.as_dict().get(c.WALL_TIME_LIMIT)
         queue              = jd.as_dict().get(c.QUEUE)
         project            = jd.as_dict().get(c.PROJECT)
@@ -451,6 +499,7 @@ def _job_run(self, jd):
         if ret:
             raise rse.NoSuccess("Couldn't create workdir: %s" % out)
 
+        self._handle_file_transfers(file_transfer, mode='in')
 
         if isinstance(c_hosts, list):
             c_hosts = ','.join(c_hosts)
@@ -631,6 +680,7 @@ def _job_run(self, jd):
                              'error'      : error,
                              'stdout'     : None,
                              'stderr'     : None,
+                             'ft'         : file_transfer,
                              }
         return job_id
 
@@ -963,6 +1013,7 @@ def _job_get_info(self):
             curr_info['error'      ] = prev_info.get('error'      )
             curr_info['stdout'     ] = prev_info.get('stdout'     )
             curr_info['stderr'     ] = prev_info.get('stderr'     )
+            curr_info['ft'         ] = prev_info.get('ft'         )
         else:
             curr_info['job_id'     ] = None
             curr_info['job_name'   ] = None
@@ -977,6 +1028,7 @@ def _job_get_info(self):
             curr_info['error'      ] = None
             curr_info['stdout'     ] = None
             curr_info['stderr'     ] = None
+            curr_info['ft'         ] = None
 
         rm, pid = self._adaptor.parse_id(self._id)
 
@@ -1014,7 +1066,7 @@ def _job_get_info(self):
         elems = out.split()
         data  = dict()
 
-        for elem in elems:
+        for elem in sorted(elems):
 
             parts = elem.split('=', 1)
 
@@ -1057,8 +1109,6 @@ def _job_get_info(self):
 
         if curr_info['state'] in c.FINAL:
 
-            self._logger.info('=== %s', data.get('StdErr'))
-
             if not curr_info['end_time'  ]: curr_info['end_time'  ] = now
 
             if curr_info['stdout'] is None:
@@ -1081,6 +1131,10 @@ def _job_get_info(self):
                 if ret: curr_info['stderr'] = None
                 else  : curr_info['stderr'] = out
 
+            self.js._handle_file_transfers(curr_info['ft'], mode='out')
+
+            curr_info['gone'] = True
+
         return curr_info
 
 
diff --git a/src/radical/saga/utils/job/transfer_directives.py b/src/radical/saga/utils/job/transfer_directives.py
index 89d1d199c..b17001f7f 100644
--- a/src/radical/saga/utils/job/transfer_directives.py
+++ b/src/radical/saga/utils/job/transfer_directives.py
@@ -46,7 +46,7 @@ def __init__(self, directives=None):
         self._out_append = list()
 
         if not directives:
-            directives = {}
+            directives = []
 
         for d in directives:
 
diff --git a/src/radical/saga/utils/pty_shell.py b/src/radical/saga/utils/pty_shell.py
index d9a910040..247fafc79 100644
--- a/src/radical/saga/utils/pty_shell.py
+++ b/src/radical/saga/utils/pty_shell.py
@@ -938,8 +938,8 @@ def run_copy_to (self, src, tgt, cp_flags=None) :
 
         info = self.pty_info
         repl = dict (list({'src'      : src,
-                           'tgt'      : tgt,
-                           'cp_flags' : cp_flags
+                           'tgt'      : tgt,
+                           'cp_flags' : cp_flags
                            }.items ()) + list(info.items ()))
 
         # at this point, we do have a valid, living master
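
The staging hooks of this patch consume the ``FILE_TRANSFER`` attribute of the
job description: ``TransferDirectives`` sorts ``'>'`` directives into
``in_overwrite`` pairs (staged to the execution site before the job starts) and
``'<'`` directives into ``out_overwrite`` pairs (staged back after completion);
the append operators ``'>>'``/``'<<'`` are rejected. A hedged sketch of
a description using such directives, with ``rs`` the ``radical.saga`` module as
in the examples above and purely illustrative file names::

    jd = rs.job.Description()
    jd.executable    = '/bin/cat'
    jd.arguments     = ['input.dat']
    jd.output        = 'output.dat'
    jd.file_transfer = ['input.dat  > input.dat',   # stage in before execution
                        'output.dat < output.dat']  # stage out after completion


From 3ff52fa17191d8aaf38b37308943f1c05fa30926 Mon Sep 17 00:00:00 2001
From: Andre Merzky
Date: Thu, 23 Jan 2020 10:34:17 +0100
Subject: [PATCH 07/11] slurm cleanup

---
 .../saga/adaptors/shell/shell_wrapper.sh     | 17 +++++++++--------
 src/radical/saga/adaptors/slurm/slurm_job.py |  4 ++++
 2 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/src/radical/saga/adaptors/shell/shell_wrapper.sh b/src/radical/saga/adaptors/shell/shell_wrapper.sh
index fe9d9d449..d409c8d1a 100644
--- a/src/radical/saga/adaptors/shell/shell_wrapper.sh
+++ b/src/radical/saga/adaptors/shell/shell_wrapper.sh
@@ -1,6 +1,6 @@
 #!/bin/sh
 
-# be friendly to bash users (and yes, the leading space is on purpose)
+# be friendly to bash users
 HISTIGNORE='*'
 export HISTIGNORE
 
@@ -8,8 +8,8 @@ export HISTIGNORE
 # other shell extensions. It expects /bin/sh to be a POSIX compliant shell
 # thought.
 #
-# The invokation passes one (optional) parameter, the base workdir. That
-# directory will be used to keep job state data. It' default value is set to
+# The invocation passes one (optional) parameter, the base workdir. That
+# directory will be used to keep job state data. Its default value is set to
 # $HOME/.radical/saga/adaptors/shell_job/
 
@@ -25,12 +25,13 @@ export HISTIGNORE
 # }
 #
 # on tracing:
-# http://www.unix.com/shell-programming-and-scripting/165648-set-x-within-script-capture-file.html
+# http://www.unix.com/shell-programming-and-scripting/ \
+#        165648-set-x-within-script-capture-file.html
 
 
 # --------------------------------------------------------------------
 #
-# Fucking /bin/kill by Ubuntu sometimes understands --, sometimes does not :-P
+# Fucking /bin/kill on Ubuntu sometimes understands --, sometimes does not :-P
 # We need to check the version, and assume that prior to 3.3.0 it is not
 # understood
 KILL_DASHES="--"
 
@@ -47,7 +48,7 @@ fi
 # --------------------------------------------------------------------
 #
 # POSIX echo does not understand '\n'. For multiline strings we thus use printf
-# -- but printf will interprete every single '%' in the string, which we don't
+# -- but printf will interpret every single '%' in the string, which we don't
 # want. We thus escape it to '%%'
 qprintf(){
   \printf "%b\n" "$*"
 
@@ -203,7 +204,7 @@ decode () {
 # --------------------------------------------------------------------
 #
-# it is suprisingly difficult to get seconds since epoch in POSIX --
+# it is surprisingly difficult to get seconds since epoch in POSIX --
 # 'date +%%s' is a GNU extension... Anyway, awk to the rescue!
 #
 timestamp () {
 
@@ -863,7 +864,7 @@ cmd_quit () {
     \stty echonl >/dev/null 2>&1
     \printf "cmd_quit called ($EXIT_VAL)"
-    
+
     # avoid running circles
     \trap - EXIT
 
diff --git a/src/radical/saga/adaptors/slurm/slurm_job.py b/src/radical/saga/adaptors/slurm/slurm_job.py
index 90e6fc2cc..8d7d52932 100644
--- a/src/radical/saga/adaptors/slurm/slurm_job.py
+++ b/src/radical/saga/adaptors/slurm/slurm_job.py
@@ -989,7 +989,9 @@ def _job_get_info(self):
 
         # if the 'gone' flag is set, there's no need to query the job
         # state again. 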
it's gone forever + self._logger.debug("=== prev: %s", prev_info) if prev_info: + self._logger.debug("=== gone: %s", prev_info.get('gone')) if prev_info.get('gone', False): self._logger.debug("Job is gone.") return prev_info @@ -1135,6 +1137,8 @@ def _job_get_info(self): curr_info['gone'] = True + self.js.jobs[self._id] = curr_info + return curr_info From 6123861ea2c6b8767b5c9498e726bfa12289e77d Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Thu, 23 Jan 2020 12:44:16 +0100 Subject: [PATCH 08/11] cleanup --- src/radical/saga/adaptors/slurm/slurm_job.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/radical/saga/adaptors/slurm/slurm_job.py b/src/radical/saga/adaptors/slurm/slurm_job.py index 8d7d52932..20358137a 100644 --- a/src/radical/saga/adaptors/slurm/slurm_job.py +++ b/src/radical/saga/adaptors/slurm/slurm_job.py @@ -989,9 +989,7 @@ def _job_get_info(self): # if the 'gone' flag is set, there's no need to query the job # state again. it's gone forever - self._logger.debug("=== prev: %s", prev_info) if prev_info: - self._logger.debug("=== gone: %s", prev_info.get('gone')) if prev_info.get('gone', False): self._logger.debug("Job is gone.") return prev_info @@ -1080,7 +1078,7 @@ def _job_get_info(self): key, val = parts if val in ['', '(null)']: val = None - self._logger.info('=== %s := %s', key, val) + self._logger.info('%-20s := %s', key, val) data[key] = val if data.get('JobState'): From adacef8f19f50a7984987839e7191eef5c549fd7 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Thu, 23 Jan 2020 12:45:42 +0100 Subject: [PATCH 09/11] version bump --- CHANGES.md | 6 ++++++ VERSION | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index b67d3b8c5..ef66ff53d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -6,6 +6,12 @@ https://github.com/radical-cybertools/radical.saga/issues?q=is%3Aissue+is%3Aopen+ +Version 1.0.1 Release 2020-01-23 +-------------------------------------------------------------------------------- + + - small fixes and cleanup for slurm and docs (tutorial prep) + + Version 1.0.0 Release 2019-12-24 -------------------------------------------------------------------------------- diff --git a/VERSION b/VERSION index 3eefcb9dd..7dea76edb 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.0.0 +1.0.1 From 4b2d203ff78f4ada2d13b871ec176b79deae3fc9 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Mon, 27 Jan 2020 11:19:40 +0100 Subject: [PATCH 10/11] fixes per PR comments --- docs/source/usage/logger.rst | 48 ++++++++++++---------- examples/jobs/slurmjobcontainer.py | 9 ++-- src/radical/saga/adaptors/noop/noop_job.py | 14 +++---- 3 files changed, 36 insertions(+), 35 deletions(-) diff --git a/docs/source/usage/logger.rst b/docs/source/usage/logger.rst index 71358a78f..81ea45975 100644 --- a/docs/source/usage/logger.rst +++ b/docs/source/usage/logger.rst @@ -25,7 +25,7 @@ executing shell and evaluated by SAGA at program startup. .. envvar:: RADICAL_SAGA_LOG_LVL Controls the log level. This controls the amount of output generated by the - logging system. ``SAGA_VERBOSE`` expects either a numeric (0-4) value or a + logging system. ``RADICAL_LOG_LVL`` expects either a numeric (0-4) value or a string (case insensitive) representing the log level: +---------------+---------------------+------------------------------------+ @@ -53,40 +53,44 @@ executing shell and evaluated by SAGA at program startup. 
+---------------+---------------------+------------------------------------+
 
    For example, if you want to see the debug messages that SAGA generates during
-   program execution, you would set :envvar:`SAGA_VERBOSE` to ``DEBUG`` before
-   you run your program::
+   program execution, you would set :envvar:`RADICAL_LOG_LVL` to ``DEBUG``
+   before you run your program::
 
       RADICAL_SAGA_LOG_LVL=DEBUG python mysagaprog.py
 
 
-# .. envvar:: RADICAL_SAGA_LOG_LVL
-#
-#    Controls the message sources displayed. SAGA uses a
-#    hierarchal structure for its log sources. Starting with the root logger
-#    ``saga``, several sub loggers are defined for SAGA-internal logging events
-#    (``saga.engine``) and individual middleware adaptors ``saga.adaptor.name``.
-#    ``SAGA_LOG_FILTERS`` expects either a single source name or a
-#    comma-separated list of source names. Non-existing source names are ignored.
-#
-#    For example, if you want to see only the debug messages generated by
-#    ``saga.engine`` and a specific middleware adaptor called ``xyz`` you would
-#    set the following environment variables::
-#
-#       SAGA_VERBOSE=DEBUG SAGA_LOG_FILTERS=saga.engine,saga.adaptor.xyz python mysagaprog.py
+.. envvar:: RADICAL_LOG_LVL
+
+   Controls the message sources displayed. RCT uses a hierarchical structure for
+   its log sources. Starting with the root logger ``RADICAL``, sub loggers are
+   defined for internal logging events (``RADICAL_SAGA``,
+   ``RADICAL_SAGA_ENGINE`` etc.) and individual middleware adaptors, e.g.,
+   ``RADICAL_SAGA_ADAPTORS_NAME``. ``LOG_LVL`` and ``LOG_TGT`` can be set
+   individually for those loggers.
+
+   For example, if you want to see only the debug messages generated by
+   ``saga.engine`` and a specific middleware adaptor called ``xyz`` you would
+   set the following environment variables::
+
+       RADICAL_LOG_LVL=ERROR \                    # mute everything
+       RADICAL_SAGA_ENGINE_LOG_LVL=DEBUG \        # enable engine logger
+       RADICAL_SAGA_ADAPTORS_XYZ_LOG_LVL=DEBUG \  # enable XYZ logger
+       python mysagaprog.py
 
 
 .. envvar:: RADICAL_SAGA_LOG_TGT
 
-   Controls where the log messages go. Multiple concurrent locations are
-   supported.
-   ``SAGA_LOG_TARGETS`` expects either a single location or a comma-separated
-   list of locations, where a location can either be a path/filename or
-   the ``STDOUT`` keyword (case insensitive) for logging to the console.
+   Controls where the log messages go. Multiple concurrent locations are
+   supported. ``RADICAL_LOG_TGT`` expects either a single location or
+   a comma-separated list of locations, where a location can either be
+   a path/filename or the ``stdout``/``stderr`` keyword (case sensitive) for
+   logging to the console.
 
    For example, if you want to see debug messages on the console but also
-   want to log them in a file for further analysis, you would set the the
+   want to log them in a file for further analysis, you would set the
    following environment variables::
 
-      SAGA_VERBOSE=DEBUG SAGA_LOG_TARGETS=STDOUT,/tmp/mysaga.log python mysagaprog.py
+      RADICAL_SAGA_LOG_LVL=DEBUG RADICAL_SAGA_LOG_TGT=stdout,./rs.log \
+          python mysagaprog.py
 
diff --git a/examples/jobs/slurmjobcontainer.py b/examples/jobs/slurmjobcontainer.py
index beac4c39e..83b57bf65 100755
--- a/examples/jobs/slurmjobcontainer.py
+++ b/examples/jobs/slurmjobcontainer.py
@@ -9,17 +9,16 @@
 '''
 This example shows how to run groups of jobs using the 'slurm' job adaptor.
 This example uses job containers for simplified and optimized bulk job handling.
 
-Job containers can be used to easily model dependencies between groups of
-different jobs, e.g., in workflow scenarios. In this example, we execute
-'num_job_groups' containers of 'jobs_per_group' number of parallel jobs
-sequentially::
+Job containers can be used to model dependencies between groups of different
+jobs, e.g., in workflow scenarios. In this example, we execute 'num_job_groups'
+containers of 'jobs_per_group' number of parallel jobs sequentially::
 
     C1[j1,j2,j3,j4,...] -> C2[j1,j2,j3,j4,...] -> C3[j1,j2,j3,j4,...] -> ...
 
 Depending on the adaptor implementation, using job containers can be quite
 advantageous in terms of call latency. Some adaptors implement special bulk
 operations for container management, which makes them generally much faster than
-iterating over and operating on individual jobs. 
+iterating over and operating on individual jobs.
 '''
 
diff --git a/src/radical/saga/adaptors/noop/noop_job.py b/src/radical/saga/adaptors/noop/noop_job.py
index 6383ae167..a4b6401c3 100644
--- a/src/radical/saga/adaptors/noop/noop_job.py
+++ b/src/radical/saga/adaptors/noop/noop_job.py
@@ -4,9 +4,8 @@
 __license__   = "MIT"
 
 
-''' noop based job adaptor implementation '''
+''' no operation (noop) based job adaptor implementation '''
 
-import re
 import time
 import threading
 
@@ -185,7 +184,7 @@ def run(self):
 # The adaptor class
 class Adaptor(base.Base):
     '''
-    This is the actual adaptor class, which gets loaded by SAGA (i.e. by the
+    This is the actual adaptor class, which gets loaded by SAGA (i.e., by the
     SAGA engine), and which registers the CPI implementation classes which
     provide the adaptor's functionality.
     '''
 
@@ -347,8 +346,7 @@ def get_job(self, job_id, no_reconnect=False):
     def container_run(self, jobs):
         '''
         From all the job descriptions in the container, build a bulk, and submit
-        as async. The read whaterver the wrapper returns, and sort through the
-        messages, assigning job IDs etc.
+        as async.
         '''
 
         self._logger.debug("container run: %s" % str(jobs))
 
@@ -562,9 +560,9 @@ def wait(self, timeout):
         other interactions. In particular, it would practically kill it if the
         Wait waits forever...
 
-        So we implement the wait via a state pull. The *real* solution is, of
-        course, to implement state notifications, and wait for such
-        a notification to arrive within timeout seconds...
+        So we implement the wait via a state pull. The *real* solution is to
+        implement state notifications, and wait for such a notification to
+        arrive within timeout seconds...
         '''

From 3c934ab8ed5bb76a26d3b605c7c65e0344c38eae Mon Sep 17 00:00:00 2001
From: Andre Merzky
Date: Mon, 27 Jan 2020 14:48:30 +0100
Subject: [PATCH 11/11] test fixes

---
 tests/unittests/api/filesystem/test_file.py | 4 ++--
 tests/unittests/api/job/test_job.py         | 5 +++--
 tests/unittests/utils/test_pty_shell.py     | 8 ++++----
 3 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/tests/unittests/api/filesystem/test_file.py b/tests/unittests/api/filesystem/test_file.py
index 0517dc7ba..a8aa652f4 100644
--- a/tests/unittests/api/filesystem/test_file.py
+++ b/tests/unittests/api/filesystem/test_file.py
@@ -55,10 +55,10 @@ def test_nonexisting_host_file_open(self):
         try:
             tc = config()
             invalid_url = deepcopy(rs.Url(tc.filesystem_url))
-            invalid_url.host = "does.not.exist"
+            invalid_url.host = "does/not/exist"
             _ = rs.filesystem.File(invalid_url)
             assert False, "Expected BadParameter exception but got none."
- except rs.BadParameter: + except rs.DoesNotExist: assert True except rs.SagaException as ex: assert False, "Expected BadParameter exception, but got %s" % ex diff --git a/tests/unittests/api/job/test_job.py b/tests/unittests/api/job/test_job.py index 7b28c1d2c..302851070 100755 --- a/tests/unittests/api/job/test_job.py +++ b/tests/unittests/api/job/test_job.py @@ -98,8 +98,9 @@ def test_job_service_invalid_url(self): tmp_js = None try: - invalid_url = rs.Url(self.cfg['job_service_url']) - invalid_url.host = "does.not.exist" + invalid_url = rs.Url(self.cfg['job_service_url']) + invalid_url.schema = "ssh" + invalid_url.host = "does.not.exist" tmp_js = rs.job.Service(invalid_url, self.session) assert False, "Expected BadParameter exception but got none." diff --git a/tests/unittests/utils/test_pty_shell.py b/tests/unittests/utils/test_pty_shell.py index 0941633ef..a507cfdbb 100755 --- a/tests/unittests/utils/test_pty_shell.py +++ b/tests/unittests/utils/test_pty_shell.py @@ -126,10 +126,10 @@ def test_ptyshell_file_stage () : if __name__ == '__main__': test_ptyshell_ok() - test_ptyshell_nok() - test_ptyshell_async() - test_ptyshell_prompt() - test_ptyshell_file_stage() + # test_ptyshell_nok() + # test_ptyshell_async() + # test_ptyshell_prompt() + # test_ptyshell_file_stage() # ------------------------------------------------------------------------------