Job scheduling adjustments #83

Merged (6 commits) on Apr 16, 2024
deploy/templates/container_env_vars.j2 (7 changes: 0 additions & 7 deletions)
@@ -14,15 +14,8 @@ LOCAL_PTYPE_ID={{ local_ptype_id }}
## external datasets
PX_PROJECT_ID={{ px_proj_id }}
EXTERNAL_PRODUCER_IDS={{ external_producer_ids }}
RAW_SF_GROUP_ID={{ raw_sfgroup_id }}

# long QC IDs in DB, user_id corefac user, also for transfer etc
QC_USER_ID={{ qc_user_id }}
QC_DATATYPE={{ qc_dtype_id }}
QC_ORGANISM={{ qc_organism_id }}
INSTRUMENT_QC_PROJECT={{ qc_proj_id }}
INSTRUMENT_QC_EXP={{ qc_exp_id }}
INSTRUMENT_QC_RUNNAME={{ qc_run_id }}
LONGQC_DBID={{ qc_dbfa_id }}

## nextflow DB IDs
src/backend/analysis/jobs.py (53 changes: 27 additions & 26 deletions)
@@ -10,9 +10,8 @@
from analysis import tasks, models
from rawstatus import models as rm
from rawstatus import tasks as filetasks
from datasets import models as dsmodels
from datasets.jobs import get_or_create_mzmlentry
from jobs.jobs import DatasetJob, MultiDatasetJob, SingleFileJob, BaseJob
from jobs.jobs import DatasetJob, SingleFileJob, BaseJob, MultiFileJob

# FIXME
# Need to work with analysis components!
@@ -52,27 +51,29 @@ def process(self, **kwargs):
existing_refined = mzmlfiles.filter(mzmlfile__refined=True)
mzml_nonrefined = mzmlfiles.exclude(rawfile__storedfile__in=existing_refined).select_related('mzmlfile__pwiz')
dstshare = rm.ServerShare.objects.get(pk=kwargs['dstshare_id'])
timestamp = datetime.strftime(analysis.date, '%Y%m%d_%H.%M')
runpath = f'{analysis.id}_{analysis.name}_{timestamp}'
mzmls = []
for x in mzml_nonrefined:
ref_sf = get_or_create_mzmlentry(x, x.mzmlfile.pwiz, refined=True, servershare_id=dstshare.pk)
# FIXME In task: ln -s {dbid}___{fn}_refined.mzML name as input, leave out that part
# from NF pipeline, process that name in task also, keep out of job and NF
ref_mzml_fname = f'{os.path.splitext(x.filename)[0]}_refined.mzML'
ref_sf = get_or_create_mzmlentry(x, x.mzmlfile.pwiz, refined=True,
servershare_id=dstshare.pk, path=runpath, mzmlfilename=ref_mzml_fname)
mzmls.append({'servershare': x.servershare.name, 'path': x.path, 'fn': x.filename,
'sfid': ref_sf.id})
if ref_sf.purged:
ref_sf.checked = False
ref_sf.purged = False
if not mzmls:
return
mzml_ins = mzmlfiles.distinct('rawfile__producer__msinstrument__instrumenttype__name').get()
params = ['--instrument', mzml_ins.rawfile.producer.msinstrument.instrumenttype.name]
if kwargs['qtype'] != 'labelfree':
params.extend(['--isobaric', kwargs['qtype']])
timestamp = datetime.strftime(analysis.date, '%Y%m%d_%H.%M')
run = {'timestamp': timestamp,
'analysis_id': analysis.id,
'wf_commit': nfwf.commit,
'nxf_wf_fn': nfwf.filename,
'repo': nfwf.nfworkflow.repo,
'runname': f'{analysis.id}_{analysis.name}_{timestamp}',
'runname': runpath,
'outdir': analysis.user.username,
'dstsharename': dstshare.name,
}
@@ -179,7 +180,7 @@ def recurse_nrdsets_baseanalysis(aba):
return old_mzmls, old_dsets


class RunNextflowWorkflow(BaseJob):
class RunNextflowWorkflow(MultiFileJob):
refname = 'run_nf_search_workflow'
task = tasks.run_nextflow_workflow
revokable = True
@@ -193,10 +194,9 @@ class RunNextflowWorkflow(BaseJob):
"""

def getfiles_query(self, **kwargs):
return rm.StoredFile.objects.filter(pk__in=kwargs['infiles'].keys()).select_related(
'servershare', 'rawfile__producer__msinstrument__instrumenttype',
'rawfile__datasetrawfile__quantfilechannel__channel__channel',
)
return super().values('servershare__name', 'path', 'filename', 'pk',
'rawfile__producer__msinstrument__instrumenttype__name',
'rawfile__datasetrawfile__quantfilechannel__channel__channel__name')

def process(self, **kwargs):
analysis = models.Analysis.objects.select_related('user', 'nextflowsearch__workflow').get(pk=kwargs['analysis_id'])
@@ -245,7 +245,7 @@ def process(self, **kwargs):
# FIXME setnames/frac is specific
kwargs['setnames'].pop(str(del_sf.pk))
kwargs['infiles'].pop(str(del_sf.pk))
if obsolete:
if obsolete.exists():
job.kwargs = kwargs
job = job.save()

@@ -271,25 +271,26 @@
# INPUTDEF is either False or [fn, set, fraction, etc]
if inputdef_fields := run['components']['INPUTDEF']:
for fn in sfiles_passed:
infile = {'servershare': fn.servershare.name, 'path': fn.path, 'fn': fn.filename}
infile = {'servershare': fn['servershare__name'], 'path': fn['path'],
'fn': fn['filename']}
if 'setname' in inputdef_fields:
infile['setname'] = kwargs['filesamples'].get(str(fn.id), '')
infile['setname'] = kwargs['filesamples'].get(str(fn['pk']), '')
if 'plate' in inputdef_fields:
infile['plate'] = kwargs['platenames'].get(str(fn.rawfile.datasetrawfile.dataset_id), '')
if 'sampleID' in inputdef_fields:
# sampleID is for pgt / dbgenerator
# No fallback, is required if in header
infile['sampleID'] = kwargs['filesamples'][str(fn.id)]
infile['sampleID'] = kwargs['filesamples'][str(fn['pk'])]
if 'fraction' in inputdef_fields:
infile['fraction'] = kwargs['infiles'].get(str(fn.id), {}).get('fr')
infile['fraction'] = kwargs['infiles'].get(str(fn['pk']), {}).get('fr')
if 'instrument' in inputdef_fields:
# No fallback, instrument in header cannot be ''
infile['instrument'] = fn.rawfile.producer.msinstrument.instrumenttype.name
infile['instrument'] = fn['rawfile__producer__msinstrument__instrumenttype__name']
if 'channel' in inputdef_fields:
# For non-pooled labelcheck, cannot be ''
infile['channel'] = fn.rawfile.datasetrawfile.quantfilechannel.channel.channel.name
infile['channel'] = fn['rawfile__datasetrawfile__quantfilechannel__channel__channel__name']
# Dynamic fields
infile.update(kwargs['filefields'].get(str(fn.pk), {}))
infile.update(kwargs['filefields'].get(str(fn['pk']), {}))
infiles.append(infile)
# FIXME bigrun not hardcode, probably need to remove when new infra
shortname = models.UserWorkflow.WFTypeChoices(analysis.nextflowsearch.workflow.wftype).name
@@ -328,20 +329,20 @@ def process(self, **kwargs):
analysis.save()


class PurgeAnalysis(BaseJob):
class PurgeAnalysis(MultiFileJob):
refname = 'purge_analysis'
task = filetasks.delete_file
"""Queues tasks for deleting files from analysis from disk, then queues
job for directory removal"""

def getfiles_query(self, **kwargs):
return rm.StoredFile.objects.filter(analysisresultfile__analysis__id=kwargs['analysis_id'])
return super().values('path', 'filename', 'servershare', 'servershare__name', 'pk')

def process(self, **kwargs):
webshare = rm.ServerShare.objects.get(name=settings.WEBSHARENAME)
for fn in self.getfiles_query(**kwargs):
fullpath = os.path.join(fn.path, fn.filename)
fullpath = os.path.join(fn['path'], fn['filename'])
print('Purging {} from analysis {}'.format(fullpath, kwargs['analysis_id']))
if fn.servershare_id != webshare.pk:
if fn['servershare'] != webshare.pk:
# Files on web share live locally, deleted by the purge view itself
self.run_tasks.append(((fn.servershare.name, fullpath, fn.id), {}))
self.run_tasks.append(((fn['servershare__name'], fullpath, fn['pk']), {}))
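
Both RunNextflowWorkflow and PurgeAnalysis now derive from MultiFileJob and read plain dicts from `.values(...)` instead of model instances (`fn['path']` rather than `fn.path`). The base class itself lives in jobs/jobs.py and is not part of this diff; the sketch below is only an assumption of its shape, inferred from the `sf_ids` kwarg these jobs now receive.

```python
# Hypothetical sketch of MultiFileJob -- the real class is in jobs/jobs.py and
# is not shown in this PR; written as if defined next to BaseJob in that module.
from rawstatus import models as rm


class MultiFileJob(BaseJob):  # BaseJob is defined earlier in jobs/jobs.py
    """A job that operates on an explicit list of stored files, given as sf_ids."""

    def getfiles_query(self, **kwargs):
        # Subclasses narrow this queryset down with .values(...), which is why
        # the job code above indexes fn['path'] instead of fn.path
        return rm.StoredFile.objects.filter(pk__in=kwargs['sf_ids'])

    def get_sf_ids_jobrunner(self, **kwargs):
        # Tells the job runner which stored file ids this job touches
        return kwargs['sf_ids']
```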
src/backend/analysis/tasks.py (3 changes: 2 additions & 1 deletion)
@@ -220,8 +220,9 @@ def refine_mzmls(self, run, params, mzmls, stagefiles, profiles, nf_version):
params.extend(['--mzmldef', os.path.join(rundir, 'mzmldef.txt')])
outfiles = execute_normal_nf(run, params, rundir, gitwfdir, self.request.id, nf_version, profiles)
# TODO ideally do this:
# stage mzML with {dbid}___{filename}.mzML
# ln -s stage mzML with {dbid}___{filename}.mzML
# This keeps dbid / ___ split out of the NF workflow
# We dont necessarily need outfiles, we can read from infiles
outfiles_db = {}
fileurl = urljoin(settings.KANTELEHOST, reverse('jobs:mzmlfiledone'))
outpath = os.path.join(run['outdir'], os.path.split(rundir)[-1])
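
The updated TODO describes staging each mzML for the pipeline through a symlink named `{dbid}___{filename}`, so the dbid/`___` split never leaks into the Nextflow workflow. A minimal illustration of that naming scheme (hypothetical helper, not code from this PR):

```python
import os

def stage_mzml_symlink(rundir, dbid, filename):
    '''Hypothetical helper: expose an mzML under the {dbid}___{filename}
    name described in the TODO above, via a symlink in the run directory.'''
    src = os.path.join(rundir, filename)
    dst = os.path.join(rundir, f'{dbid}___{filename}')
    if not os.path.islink(dst):
        os.symlink(src, dst)
    return dst
```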
src/backend/analysis/tests.py (1 change: 1 addition & 0 deletions)
@@ -711,6 +711,7 @@ def test_existing_analysis(self):
'filesamples': {self.f3sfmz.pk: 'setA'},
'fullname': fullname,
'infiles': {self.f3sfmz.pk: 1},
'sf_ids': [self.f3sfmz.pk],
'inputs': {'components': {c_ch.INPUTDEF.name: self.inputdef.value,
c_ch.PREFRAC.name: '.*fr([0-9]+).*mzML$', c_ch.ISOQUANT.name: {},
c_ch.ISOQUANT_SAMPLETABLE.name: [[self.qch.name, 'setA', 'samplename', 'groupname']],
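
The test change reflects that job kwargs built by the views now carry `sf_ids` alongside `infiles`. Roughly, the expected shape is as follows (values are made-up placeholders, not taken from the test fixtures):

```python
# Illustrative only: sf_ids repeats the stored-file pks that also key infiles,
# so the job framework knows up front which files the analysis job touches.
kwargs = {
    'analysis_id': 1,               # made-up id
    'infiles': {42: 1},             # storedfile pk -> per-file info (e.g. fraction)
    'filesamples': {'42': 'setA'},  # per-file setname, keyed by str(pk)
    'sf_ids': [42],                 # same pks as a plain list of ints
}
```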
src/backend/analysis/views.py (12 changes: 7 additions & 5 deletions)
@@ -834,8 +834,8 @@ def parse_isoquant(quants):

in_components = {k: v for k, v in req['components'].items() if v}
jobinputs = {'components': wf_components, 'singlefiles': {}, 'multifiles': {}, 'params': {}}
data_args = {'filesamples': {}, 'platenames': {}, 'filefields': defaultdict(dict)}
data_args['infiles'] = req['infiles']
data_args = {'filesamples': {}, 'platenames': {}, 'filefields': defaultdict(dict),
'infiles': req['infiles'], 'sf_ids': [int(x) for x in req['infiles'].keys()]}

# Input file definition
if 'INPUTDEF' in wf_components:
@@ -1115,9 +1115,11 @@ def purge_analysis(request):
for webfile in rm.StoredFile.objects.filter(analysisresultfile__analysis__id=analysis.pk, servershare_id=webshare.pk):
fpath = os.path.join(settings.WEBSHARE, webfile.path, webfile.filename)
os.unlink(fpath)
jj.create_job('purge_analysis', analysis_id=analysis.id)
jj.create_job('delete_empty_directory',
sf_ids=[x.sfile_id for x in analysis.analysisresultfile_set.all()])
sfiles = rm.StoredFile.objects.filter(analysisresultfile__analysis__id=analysis.pk)
sfiles.update(deleted=True)
sf_ids = [x['pk'] for x in sfiles.values('pk')]
jj.create_job('purge_analysis', analysis_id=analysis.pk, sf_ids=sf_ids)
jj.create_job('delete_empty_directory', sf_ids=sf_ids)
return JsonResponse({})


src/backend/datasets/jobs.py (93 changes: 29 additions & 64 deletions)
@@ -9,11 +9,11 @@
from analysis.models import Proteowizard, MzmlFile, NextflowWfVersionParamset
from datasets import tasks
from rawstatus import tasks as filetasks
from jobs.jobs import BaseJob, DatasetJob, create_job
from jobs.jobs import DatasetJob, ProjectJob
from rawstatus import jobs as rsjobs


class RenameProject(BaseJob):
class RenameProject(ProjectJob):
'''Uses task that does os.rename on project lvl dir. Needs also to update
dsets / storedfiles to new path post job'''
refname = 'rename_top_lvl_projectdir'
@@ -22,21 +22,14 @@ class RenameProject(BaseJob):

def getfiles_query(self, **kwargs):
'''Get all files with same path as project_dsets.storage_locs, used to update
path of those files post-job'''
dsets = Dataset.objects.filter(runname__experiment__project_id=kwargs['proj_id'])
return StoredFile.objects.filter(
path of those files post-job. NB this is not ALL sfids in this project, but only
those confirmed to be in correct location'''
dsets = Dataset.objects.filter(runname__experiment__project_id=kwargs['proj_id'],
deleted=False, purged=False)
return StoredFile.objects.filter(deleted=False, purged=False,
servershare__in=[x.storageshare for x in dsets.distinct('storageshare')],
path__in=[x.storage_loc for x in dsets.distinct('storage_loc')])

def get_sf_ids_jobrunner(self, **kwargs):
"""Get all sf ids in project to mark them as not using pre-this-job"""
projfiles = StoredFile.objects.filter(
rawfile__datasetrawfile__dataset__runname__experiment__project_id=kwargs['proj_id'])
dsets = Dataset.objects.filter(runname__experiment__project_id=kwargs['proj_id'])
allfiles = StoredFile.objects.filter(servershare__in=[x.storageshare for x in dsets.distinct('storageshare')],
path__in=[x.storage_loc for x in dsets.distinct('storage_loc')]).union(projfiles)
return [x.pk for x in allfiles]

def process(self, **kwargs):
"""Fetch fresh project name here, then queue for move from there"""
new_is_oldname = True
@@ -109,6 +102,7 @@ def process(self, **kwargs):
dset_registered_files = DatasetRawFile.objects.filter(
dataset_id=kwargs['dset_id'], rawfile_id__in=kwargs['rawfn_ids'])
if dset_files.count() != dset_registered_files.count():
# DEPRECATE, this should no longer happen?
raise RuntimeError(
'Not all files to move have been transferred or '
'registered as transferred yet, or have non-matching MD5 sums '
@@ -166,29 +160,21 @@ def process(self, **kwargs):
pwiz = Proteowizard.objects.get(pk=kwargs['pwiz_id'])
res_share = ServerShare.objects.get(pk=kwargs['dstshare_id'])
# First create jobs to delete old files
# TODO problem may arise if eg storage worker is down and hasnt finished processing the
# and old batch of files. Then the new files will come in before the worker is restarted.
# The old files, which will at that point be lying around in their inbox:
# analysis/mzml_in folder, will then be 1.moved, 2.deleted, 3. new file move job will error
nf_raws = []
runpath = f'{dset.id}_convert_mzml_{kwargs["timestamp"]}'
for fn in self.getfiles_query(**kwargs):
mzsf = get_or_create_mzmlentry(fn, pwiz=pwiz, servershare_id=res_share.pk)
mzmlfilename = os.path.splitext(fn.filename)[0] + '.mzML'
mzsf = get_or_create_mzmlentry(fn, pwiz=pwiz, refined=False,
servershare_id=res_share.pk, path=runpath, mzmlfilename=mzmlfilename)
if mzsf.checked and not mzsf.purged:
continue
# refresh file status for previously purged (deleted from disk) mzmls,
# set servershare in case it is not analysis
if mzsf.purged:
mzsf.checked = False
mzsf.purged = False
mzsf.servershare = res_share
mzsf.save()
nf_raws.append((fn.servershare.name, fn.path, fn.filename, mzsf.id))
nf_raws.append((fn.servershare.name, fn.path, fn.filename, mzsf.id, mzmlfilename))
if not nf_raws:
return
# FIXME last file filetype decides mzml input filetype, we should enforce
# same filetype files in a dataset if possible
ftype = mzsf.filetype.name
print('Queuing {} raw files for conversion'.format(len(nf_raws)))
print(f'Queuing {len(nf_raws)} raw files for conversion')
nfwf = NextflowWfVersionParamset.objects.select_related('nfworkflow').get(
pk=pwiz.nf_version_id)
run = {'timestamp': kwargs['timestamp'],
@@ -198,7 +184,7 @@
'repo': nfwf.nfworkflow.repo,
'nfrundirname': 'small' if len(nf_raws) < 500 else 'larger',
'dstsharename': res_share.name,
'runname': f'{dset.id}_convert_mzml_{kwargs["timestamp"]}',
'runname': runpath,
}
params = ['--container', pwiz.container_version]
for pname in ['options', 'filters']:
@@ -209,32 +195,6 @@
self.run_tasks.append(((run, params, nf_raws, ftype, nfwf.nfversion, profiles), {'pwiz_id': pwiz.id}))


class ConvertFileMzml(ConvertDatasetMzml):
# FIXME deprecate, no longer using this
refname = 'convert_single_mzml'

def getfiles_query(self, **kwargs):
return StoredFile.objects.select_related('rawfile__datasetrawfile__dataset').filter(pk=kwargs['sf_id'])

def process(self, **kwargs):
queue = kwargs.get('queue', settings.QUEUES_PWIZ[0])
fn = self.getfiles_query(**kwargs).get()
storageloc = fn.rawfile.datasetrawfile.dataset.storage_loc
pwiz = Proteowizard.objects.get(pk=kwargs['pwiz_id'])
mzsf = get_or_create_mzmlentry(fn, pwiz=pwiz)
if mzsf.servershare_id != fn.servershare_id:
# change servershare, in case of bugs the raw sf is set to tmp servershare
# then after it wont be changed when rerunning the job
mzsf.servershare_id = fn.servershare_id
mzsf.save()
if mzsf.checked:
pass
else:
options = ['--{}'.format(x) for x in kwargs.get('options', [])]
filters = [y for x in kwargs.get('filters', []) for y in ['--filter', x]]
self.run_tasks.append(((fn, mzsf, storageloc, options + filters, queue, settings.QUEUES_PWIZOUT[queue]), {}))


class DeleteDatasetMzml(DatasetJob):
"""Removes dataset mzml files from active storage"""
refname = 'delete_mzmls_dataset'
@@ -296,15 +256,20 @@ class DeleteDatasetPDCBackup(DatasetJob):
# this for e.g empty or active-only dsets


def get_or_create_mzmlentry(fn, pwiz, refined=False, servershare_id=False):
'''This also resets the path of the mzML file'''
if not servershare_id:
servershare_id = fn.servershare_id
mzmlfilename = os.path.splitext(fn.filename)[0] + '.mzML'
mzsf, cr = StoredFile.objects.update_or_create(mzmlfile__pwiz=pwiz, mzmlfile__refined=refined,
rawfile_id=fn.rawfile_id, filetype_id=fn.filetype_id, defaults={
'md5': f'mzml_{fn.rawfile.source_md5[5:]}', 'servershare_id': servershare_id,
'filename': mzmlfilename, 'path': fn.rawfile.datasetrawfile.dataset.storage_loc})
def get_or_create_mzmlentry(fn, pwiz, refined, servershare_id, path, mzmlfilename):
'''This also resets the path of the mzML file in case it's deleted'''
new_md5 = f'mzml_{fn.rawfile.source_md5[5:]}'
mzsf, cr = StoredFile.objects.get_or_create(mzmlfile__pwiz=pwiz, mzmlfile__refined=refined,
rawfile_id=fn.rawfile_id, filetype_id=fn.filetype_id, defaults={'md5': new_md5,
'servershare_id': servershare_id, 'filename': mzmlfilename, 'path': path})
if cr:
MzmlFile.objects.create(sfile=mzsf, pwiz=pwiz, refined=refined)
elif mzsf.purged or not mzsf.checked:
# Any previous mzML files which are deleted or otherwise odd need resetting
mzsf.purged = False
mzsf.checked = False
mzsf.servershare_id = servershare_id
mzsf.path = path
mzsf.md5 = new_md5
mzsf.save()
return mzsf
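
With the reworked helper, callers now decide the target path and mzML filename up front, as both ConvertDatasetMzml and RefineMzmls above do. A small sketch of the call pattern (the wrapper name and arguments are illustrative, not part of this PR):

```python
import os

def make_mzml_entry(fn, pwiz, res_share, dset_id, timestamp):
    '''Illustrative wrapper: fn is the raw StoredFile, pwiz the Proteowizard
    version, res_share the destination ServerShare.'''
    runpath = f'{dset_id}_convert_mzml_{timestamp}'
    mzmlfilename = os.path.splitext(fn.filename)[0] + '.mzML'
    # A previously purged or unchecked entry comes back already reset to the
    # new path/share/md5, so callers no longer flip purged/checked themselves.
    return get_or_create_mzmlentry(fn, pwiz=pwiz, refined=False,
            servershare_id=res_share.pk, path=runpath, mzmlfilename=mzmlfilename)
```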