Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactored docstrings to conform to PEP conventions. Patch for #56 #57

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 19 additions & 14 deletions sciluigi/audit.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'''
"""
This module contains the audit-trail logging functionality
'''
"""

import logging
import luigi
@@ -15,20 +15,22 @@

# ==============================================================================


class AuditTrailHelpers(object):
'''
"""
Mixin for luigi.Task subclasses, with functionality for writing audit logs of running tasks
'''
"""
def add_auditinfo(self, infotype, infoval):
'''
"""
Alias to _add_auditinfo(), that can be overridden.
'''
"""
return self._add_auditinfo(self.instance_name, infotype, infoval)


def _add_auditinfo(self, instance_name, infotype, infoval):
'''
"""
Save audit information in a designated file, specific for this task.
'''
"""
dirpath = self.workflow_task.get_auditdirpath()
if not os.path.isdir(dirpath):
time.sleep(random.random())
@@ -42,32 +44,35 @@ def _add_auditinfo(self, instance_name, infotype, infoval):
with open(auditfile, 'a') as afile:
afile.write('%s: %s\n' % (infotype, infoval))


def get_instance_name(self):
'''
"""
Return the luigi instance_name
'''
"""
instance_name = None
if self.instance_name is not None:
instance_name = self.instance_name
else:
instance_name = self.task_id
return instance_name


@luigi.Task.event_handler(luigi.Event.START)
def save_start_time(self):
'''
"""
Log start of execution of task.
'''
"""
if hasattr(self, 'workflow_task') and self.workflow_task is not None:
msg = 'Task {task} started'.format(
task=self.get_instance_name())
log.info(msg)


@luigi.Task.event_handler(luigi.Event.PROCESSING_TIME)
def save_end_time(self, task_exectime_sec):
'''
"""
Log end of execution of task, with execution time.
'''
"""
if hasattr(self, 'workflow_task') and self.workflow_task is not None:
msg = 'Task {task} finished after {proctime:.3f}s'.format(
task=self.get_instance_name(),
46 changes: 26 additions & 20 deletions sciluigi/dependencies.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'''
"""
This module contains functionality for dependency resolution for constructing
the dependency graph of workflows.
'''
"""

import luigi
from luigi.contrib.postgres import PostgresTarget
@@ -10,11 +10,12 @@

# ==============================================================================


class TargetInfo(object):
'''
"""
Class to be used for sending specification of which target, from which
task, to use, when stitching workflow tasks' outputs and inputs together.
'''
"""
task = None
path = None
target = None
@@ -25,21 +26,24 @@ def __init__(self, task, path, format=None, is_tmp=False):
self.target = luigi.LocalTarget(path, format, is_tmp)

def open(self, *args, **kwargs):
'''
"""
Forward open method, from luigi's target class
'''
"""
return self.target.open(*args, **kwargs)

# ==============================================================================


class S3TargetInfo(TargetInfo):

def __init__(self, task, path, format=None, client=None):
self.task = task
self.path = path
self.target = S3Target(path, format=format, client=client)

# ==============================================================================


class PostgresTargetInfo(TargetInfo):
def __init__(self, task, host, database, user, password, update_id, table=None, port=None):
self.task = task
@@ -54,28 +58,30 @@ def __init__(self, task, host, database, user, password, update_id, table=None,

# ==============================================================================


class DependencyHelpers(object):
'''

"""
Mixin implementing methods for supporting dynamic, and target-based
workflow definition, as opposed to the task-based one in vanilla luigi.
'''
"""

# --------------------------------------------------------
# Handle inputs
# --------------------------------------------------------

def requires(self):
'''
"""
Implement luigi API method by returning upstream tasks
'''
"""
return self._upstream_tasks()

def _upstream_tasks(self):
'''
"""
Extract upstream tasks from the TargetInfo objects
or functions returning those (or lists of either)
for use in luigi's requires() method.
'''
"""
upstream_tasks = []
for attrname, attrval in iteritems(self.__dict__):
if 'in_' == attrname[0:3]:
@@ -84,11 +90,11 @@ def _upstream_tasks(self):
return upstream_tasks

def _parse_inputitem(self, val, tasks):
'''
"""
Recursively loop through lists of TargetInfos, or
callables returning TargetInfos, or lists of ...
(repeat recursively) ... and return all tasks.
'''
"""
if callable(val):
val = val()
if isinstance(val, TargetInfo):
@@ -108,17 +114,17 @@ def _parse_inputitem(self, val, tasks):
# --------------------------------------------------------

def output(self):
'''
"""
Implement luigi API method
'''
"""
return self._output_targets()

def _output_targets(self):
'''
"""
Extract output targets from the TargetInfo objects
or functions returning those (or lists of either)
for use in luigi's output() method.
'''
"""
output_targets = []
for attrname in dir(self):
attrval = getattr(self, attrname)
@@ -128,11 +134,11 @@ def _output_targets(self):
return output_targets

def _parse_outputitem(self, val, targets):
'''
"""
Recursively loop through lists of TargetInfos, or
callables returning TargetInfos, or lists of ...
(repeat recursively) ... and return all targets.
'''
"""
if callable(val):
val = val()
if isinstance(val, TargetInfo):
20 changes: 12 additions & 8 deletions sciluigi/interface.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'''
"""
This module contains mappings of methods that are part of the sciluigi API
'''
"""

import luigi
import logging
@@ -11,10 +11,11 @@
LOGFMT_SCILUIGI = '%(asctime)s %(levelname)8s SCILUIGI %(message)s'
DATEFMT = '%Y-%m-%d %H:%M:%S'


def setup_logging():
'''
"""
Set up SciLuigi specific logging
'''
"""
sciluigi.util.ensuredir('log')
log_path = 'log/sciluigi_run_%s_detailed.log' % sciluigi.util.timepath()

@@ -49,16 +50,19 @@ def setup_logging():
sciluigi_logger.addHandler(sciluigi_file_handler)
sciluigi_logger.setLevel(logging.DEBUG)


setup_logging()


def run(*args, **kwargs):
'''
"""
Forwarding luigi's run method
'''
"""
luigi.run(*args, **kwargs)


def run_local(*args, **kwargs):
'''
"""
Forwarding luigi's run method, with local scheduler
'''
"""
run(local_scheduler=True, *args, **kwargs)
9 changes: 5 additions & 4 deletions sciluigi/parameter.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
'''
"""
This module contains a sciluigi subclass of luigi's Parameter, where
custom functionality might be added in the future.
'''
"""

import luigi


class Parameter(luigi.Parameter):
'''
"""
Subclass of luigi's Parameter, where custom functionality might be added in the future.
'''
"""
pass
108 changes: 65 additions & 43 deletions sciluigi/slurm.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'''
"""
This module contains functionality related to integration with the SLURM HPC
resource manager.
'''
"""

import datetime
import logging
@@ -10,6 +10,7 @@
import sciluigi.parameter
import sciluigi.task
import subprocess as sub
from multiprocessing import cpu_count

# ================================================================================

@@ -23,10 +24,11 @@

# ================================================================================

class SlurmInfo():
'''

class SlurmInfo:
"""
A data object for keeping slurm run parameters.
'''
"""
runmode = None # One of RUNMODE_LOCAL|RUNMODE_HPC|RUNMODE_MPI
project = None
partition = None
@@ -36,10 +38,10 @@ class SlurmInfo():
threads = None

def __init__(self, runmode, project, partition, cores, time, jobname, threads):
'''
"""
Init a SlurmInfo object, from string data.
Time is in the format: [[[d-]HH:]MM:]SS
'''
"""
self.runmode = runmode
self.project = project
self.partition = partition
@@ -49,9 +51,9 @@ def __init__(self, runmode, project, partition, cores, time, jobname, threads):
self.threads = threads

def __str__(self):
'''
"""
Return a readable string representation of the info stored
'''
"""
strrepr = ('(time: {t}, '
'partition: {pt}, '
'cores: {c}, '
@@ -67,12 +69,16 @@ def __str__(self):
return strrepr

def get_argstr_hpc(self):
'''
"""
Return a formatted string with arguments and option flags to SLURM
commands such as salloc and sbatch, for non-MPI, HPC jobs.
'''
argstr = ' -A {pr} -p {pt} -n {c} -t {t} -J {j} srun -n 1 -c {thr} '.format(
pr=self.project,
"""

argstr = " "
if self.project is not None:
argstr += "-A {pr} ".format(pr=self.project)

argstr += '-p {pt} -n {c} -t {t} -J {j} srun -n 1 -c {thr} '.format(
pt=self.partition,
c=self.cores,
t=self.time,
@@ -81,12 +87,16 @@ def get_argstr_hpc(self):
return argstr

def get_argstr_mpi(self):
'''
"""
Return a formatted string with arguments and option flags to SLURM
commands such as salloc and sbatch, for MPI jobs.
'''
argstr = ' -A {pr} -p {pt} -n {c1} -t {t} -J {j} mpirun -v -np {c2} '.format(
pr=self.project,
"""

argstr = " "
if self.project is not None:
argstr += "-A {pr} ".format(pr=self.project)

argstr += '-p {pt} -n {c1} -t {t} -J {j} mpirun -v -np {c2} '.format(
pt=self.partition,
c1=self.cores,
t=self.time,
@@ -96,10 +106,16 @@ def get_argstr_mpi(self):

# ================================================================================


default_info = SlurmInfo(RUNMODE_LOCAL,
"sciluigi", None, cpu_count(),
"1-00:00:00", "sciluigi", cpu_count())


class SlurmInfoParameter(sciluigi.parameter.Parameter):
'''
"""
A specialized luigi parameter, taking SlurmInfo objects.
'''
"""
def parse(self, x):
if isinstance(x, SlurmInfo):
return x
@@ -108,18 +124,19 @@ def parse(self, x):

# ================================================================================


class SlurmHelpers():
'''
"""
Mixin with various convenience methods for executing jobs via SLURM
'''
"""
# Other class-fields
slurminfo = SlurmInfoParameter(default=None) # Class: SlurmInfo
slurminfo = SlurmInfoParameter(default=default_info) # Class: SlurmInfo

# Main Execution methods
def ex(self, command):
'''
"""
Execute either locally or via SLURM, depending on config
'''
"""
if isinstance(command, list):
command = ' '.join(command)

@@ -133,25 +150,23 @@ def ex(self, command):
log.info('Executing command in MPI mode: %s', command)
self.ex_mpi(command)


def ex_hpc(self, command):
'''
"""
Execute command in HPC mode
'''
"""
if isinstance(command, list):
command = sub.list2cmdline(command)

fullcommand = 'salloc %s %s' % (self.slurminfo.get_argstr_hpc(), command)
fullcommand = 'salloc {} {}'.format(self.slurminfo.get_argstr_hpc(), command)
(retcode, stdout, stderr) = self.ex_local(fullcommand)

self.log_slurm_info(stderr)
return (retcode, stdout, stderr)


def ex_mpi(self, command):
'''
"""
Execute command in HPC mode with MPI support (multi-node, message passing interface).
'''
"""
if isinstance(command, list):
command = sub.list2cmdline(command)

@@ -161,35 +176,39 @@ def ex_mpi(self, command):
self.log_slurm_info(stderr)
return (retcode, stdout, stderr)


# Various convenience methods

def assert_matches_character_class(self, char_class, a_string):
'''
@staticmethod
def assert_matches_character_class(char_class, a_string):
"""
Helper method, that tests whether a string matches a regex character class.
'''
"""
if not bool(re.match('^{c}+$'.format(c=char_class), a_string)):
raise Exception('String {s} does not match character class {cc}'.format(
s=a_string, cc=char_class))

def clean_filename(self, filename):
'''
@staticmethod
def clean_filename(filename):
"""
Clean up a string to make it suitable for use as file name.
'''
"""
return re.sub('[^A-Za-z0-9\_\ ]', '_', str(filename)).replace(' ', '_')

#def get_task_config(self, name):
# return luigi.configuration.get_config().get(self.task_family, name)

def log_slurm_info(self, slurm_stderr):
'''
"""
Parse information of the following example form:
salloc: Granted job allocation 5836263
srun: Job step created
salloc: Relinquishing job allocation 5836263
salloc: Job allocation 5836263 has been revoked.
'''
"""

if isinstance(slurm_stderr, bytes):
slurm_stderr = slurm_stderr.decode()

matches = re.search('[0-9]+', str(slurm_stderr))
if matches:
@@ -198,10 +217,12 @@ def log_slurm_info(self, slurm_stderr):
# Write slurm execution time to audit log
cmd = 'sacct -j {jobid} --noheader --format=elapsed'.format(jobid=jobid)
(_, jobinfo_stdout, _) = self.ex_local(cmd)
if isinstance(jobinfo_stdout, bytes):
jobinfo_stdout = jobinfo_stdout.decode()
sacct_matches = re.findall('([0-9\:\-]+)', str(jobinfo_stdout))

if len(sacct_matches) < 2:
log.warn('Not enough matches from sacct for task %s: %s',
log.warning('Not enough matches from sacct for task %s: %s',
self.instance_name, ', '.join(['Match: %s' % m for m in sacct_matches])
)
else:
@@ -237,8 +258,9 @@ def log_slurm_info(self, slurm_stderr):

# ================================================================================


class SlurmTask(SlurmHelpers, sciluigi.task.Task):
'''
"""
luigi task that includes the SlurmHelpers mixin.
'''
"""
pass
28 changes: 16 additions & 12 deletions sciluigi/task.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'''
"""
This module contains sciluigi's subclasses of luigi's Task class.
'''
"""
import json
import luigi
from luigi.six import iteritems, string_types
@@ -15,11 +15,12 @@

# ==============================================================================


def new_task(name, cls, workflow_task, **kwargs):
'''
"""
Instantiate a new task. Not supposed to be used by the end-user
(use WorkflowTask.new_task() instead).
'''
"""
slurminfo = None
for key, val in [(key, val) for key, val in iteritems(kwargs)]:
# Handle non-string keys
@@ -42,18 +43,19 @@ def new_task(name, cls, workflow_task, **kwargs):
newtask.slurminfo = slurminfo
return newtask


class Task(sciluigi.audit.AuditTrailHelpers, sciluigi.dependencies.DependencyHelpers, luigi.Task):
'''
"""
SciLuigi Task, implementing SciLuigi specific functionality for dependency resolution
and audit trail logging.
'''
"""
workflow_task = luigi.Parameter()
instance_name = luigi.Parameter()

def ex_local(self, command):
'''
"""
Execute command locally (not through resource manager).
'''
"""
# If list, convert to string
if isinstance(command, list):
command = sub.list2cmdline(command)
@@ -80,21 +82,23 @@ def ex_local(self, command):
return (retcode, stdout, stderr)

def ex(self, command):
'''
"""
Execute command. This is a short-hand function, to be overridden e.g. if supporting
execution via SLURM
'''
"""
return self.ex_local(command)


# ==============================================================================


class ExternalTask(
sciluigi.audit.AuditTrailHelpers,
sciluigi.dependencies.DependencyHelpers,
luigi.ExternalTask):
'''
"""
SciLuigi-specific implementation of luigi.ExternalTask, representing existing
files.
'''
"""
workflow_task = luigi.Parameter()
instance_name = luigi.Parameter()
35 changes: 21 additions & 14 deletions sciluigi/util.py
Original file line number Diff line number Diff line change
@@ -1,54 +1,61 @@
'''
"""
This module contains utility methods that are used in various places across the
sciluigi library
'''
"""

import csv
import os
import time
from luigi.six import iteritems


def timestamp(datefmt='%Y-%m-%d, %H:%M:%S'):
'''
"""
Create timestamp as a formatted string.
'''
"""
return time.strftime(datefmt, time.localtime())


def timepath(sep='_'):
'''
"""
Create timestamp, formatted for use in file names.
'''
"""
return timestamp('%Y%m%d{sep}%H%M%S'.format(sep=sep))


def timelog():
'''
"""
Create time stamp for use in log files.
'''
"""
return timestamp('[%Y-%m-%d %H:%M:%S]')


def ensuredir(dirpath):
'''
"""
Ensure directory exists.
'''
"""
if not os.path.exists(dirpath):
os.makedirs(dirpath)


RECORDFILE_DELIMITER = ':'


def recordfile_to_dict(filehandle):
'''
"""
Convert a record file to a dictionary.
'''
"""
csvrd = csv.reader(filehandle, delimiter=RECORDFILE_DELIMITER, skipinitialspace=True)
records = {}
for row in csvrd:
records[row[0]] = row[1]
return records


def dict_to_recordfile(filehandle, records):
'''
"""
Convert a dictionary to a recordfile.
'''
"""
csvwt = csv.writer(filehandle, delimiter=RECORDFILE_DELIMITER, skipinitialspace=True)
rows = []
for key, val in iteritems(records):
69 changes: 37 additions & 32 deletions sciluigi/workflow.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'''
"""
This module contains sciluigi's subclasses of luigi's Task class.
'''
"""

import datetime
import luigi
@@ -12,15 +12,28 @@
import sciluigi.dependencies
import sciluigi.slurm


log = logging.getLogger('sciluigi-interface')


# ==============================================================================


class WorkflowNotImplementedException(Exception):
"""
Exception to throw if the workflow() SciLuigi API method is not implemented.
"""
pass


# ================================================================================


class WorkflowTask(sciluigi.audit.AuditTrailHelpers, luigi.Task):
'''
"""
SciLuigi-specific task, that has a method for implementing a (dynamic) workflow
definition (workflow()).
'''
"""

instance_name = luigi.Parameter(default='sciluigi_workflow')

@@ -32,16 +45,16 @@ class WorkflowTask(sciluigi.audit.AuditTrailHelpers, luigi.Task):
_hasaddedhandler = False

def _ensure_timestamp(self):
'''
"""
Make sure that there is a time stamp for when the workflow started.
'''
"""
if self._wfstart == '':
self._wfstart = datetime.datetime.utcnow().strftime('%Y%m%d_%H%M%S_%f')

def get_wflogpath(self):
'''
"""
Get the path to the workflow-specific log file.
'''
"""
if self._wflogpath == '':
self._ensure_timestamp()
clsname = self.__class__.__name__.lower()
@@ -50,41 +63,41 @@ def get_wflogpath(self):
return self._wflogpath

def get_auditdirpath(self):
'''
"""
Get the path to the workflow-specific audit trail directory.
'''
"""
self._ensure_timestamp()
clsname = self.__class__.__name__.lower()
audit_dirpath = 'audit/.audit_%s_%s' % (clsname, self._wfstart)
return audit_dirpath

def get_auditlogpath(self):
'''
"""
Get the path to the workflow-specific audit trail file.
'''
"""
self._ensure_timestamp()
clsname = self.__class__.__name__.lower()
audit_dirpath = 'audit/workflow_%s_started_%s.audit' % (clsname, self._wfstart)
return audit_dirpath

def add_auditinfo(self, infotype, infolog):
'''
"""
Add audit information to the audit log.
'''
"""
return self._add_auditinfo(self.__class__.__name__.lower(), infotype, infolog)

def workflow(self):
'''
"""
SciLuigi API method. Implement your workflow here, and return the last task(s)
of the dependency graph.
'''
"""
raise WorkflowNotImplementedException(
'workflow() method is not implemented, for ' + str(self))

def requires(self):
'''
"""
Implementation of Luigi API method.
'''
"""
if not self._hasaddedhandler:
wflog_formatter = logging.Formatter(
sciluigi.interface.LOGFMT_STREAM,
@@ -110,16 +123,16 @@ def requires(self):
return workflow_output

def output(self):
'''
"""
Implementation of Luigi API method
'''
"""
return {'log': luigi.LocalTarget(self.get_wflogpath()),
'audit': luigi.LocalTarget(self.get_auditlogpath())}

def run(self):
'''
"""
Implementation of Luigi API method
'''
"""
if self.output()['audit'].exists():
errmsg = ('Audit file already exists, '
'when trying to create it: %s') % self.output()['audit'].path
@@ -139,17 +152,9 @@ def run(self):
self._hasloggedfinish = True

def new_task(self, instance_name, cls, **kwargs):
'''
"""
Create new task instance, and link it to the current workflow.
'''
"""
newtask = sciluigi.new_task(instance_name, cls, self, **kwargs)
self._tasks[instance_name] = newtask
return newtask

# ================================================================================

class WorkflowNotImplementedException(Exception):
'''
Exception to throw if the workflow() SciLuigi API method is not implemented.
'''
pass
57 changes: 32 additions & 25 deletions tools/init_projdir.py
Original file line number Diff line number Diff line change
@@ -2,49 +2,54 @@
import os
import shutil
import luigi
import six


projdir_struct = {
'bin':None,
'conf':None,
'doc' :
{ 'paper': None },
'experiments' :
{ '2000-01-01-example' :
{ 'audit':None,
'bin':None,
'conf':None,
'data':None,
'doc':None,
'lib':None,
'log':None,
'raw':None,
'results':None,
'run':None,
'tmp':None }},
'lib':None,
'raw':None,
'results':None,
'src':None }
'bin': None,
'conf': None,
'doc': {'paper': None},
'experiments':
{'2000-01-01-example':
{'audit': None,
'bin': None,
'conf': None,
'data': None,
'doc': None,
'lib': None,
'log': None,
'raw': None,
'results': None,
'run': None,
'tmp': None}},
'lib': None,
'raw': None,
'results': None,
'src': None}


def get_file_dir():
return os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))


def print_dirs(dir_structure, padding, padstep):
if type(dir_structure) is dict:
for k,v in dir_structure.iteritems():
print str(' ' * padding) + k
print_dirs(v, padding+padstep, padstep)
for k,v in six.iteritems(dir_structure):
six.print_(str(' ' * padding) + k)
six.print_(print_dirs(v, padding+padstep, padstep))


def create_dirs(dirtree):
if type(dirtree) is dict:
for dir,subtree in dirtree.iteritems():
for dir,subtree in six.iteritems(dirtree):
print('Creating ' + dir + ' ...')
os.makedirs(dir)
if subtree is not None:
os.chdir(dir)
create_dirs(subtree)
os.chdir('..')


def print_and_create_projdirs():
print('Now creating the following directory structure:')
print('-'*80)
@@ -55,6 +60,7 @@ def print_and_create_projdirs():


class InitProj(luigi.Task):

projname = luigi.Parameter()

def output(self):
@@ -63,6 +69,7 @@ def output(self):
def run(self):
shutil.copytree(get_file_dir() + '/../.projtpl', self.projname)


if __name__ == '__main__':
luigi.run()
#print get_file_dir()