More automatic raw file production treatment #109

Merged: merged 16 commits on Nov 12, 2024
1 change: 0 additions & 1 deletion data/test/newstorage/p1/e1/dtype1/run1/raw3.raw

This file was deleted.

Binary file not shown.
2 changes: 1 addition & 1 deletion deploy/default_inventory/group_vars/all
@@ -1,5 +1,5 @@
---
kantele_repo: https://github.com/glormph/kantele
kantele_repo: https://github.com/lehtiolab/kantele
qc_queue: qc_nextflow
analysis_queue: nextflow
mq_vhost: kantele_vhost
48 changes: 24 additions & 24 deletions src/backend/analysis/tasks.py
@@ -45,8 +45,8 @@ def check_ensembl_uniprot_fasta_download(self, dbname, version, organism, dbtype
if regresp:
fa_data.update({'desc': desc, 'dbtype': dbtype})
transfer_resultfile(dstpath, settings.LIBRARY_FILE_PATH, wfp.name,
settings.PRIMARY_STORAGESHARENAME, regresp, doneurl, token, self.request.id,
is_fasta=fa_data)
settings.PRIMARY_STORAGESHARENAME, doneurl, token, self.request.id,
regresp['fnid'], regresp['md5'], regresp['newname'], is_fasta=fa_data)
else:
print('File was already downloaded')

@@ -71,8 +71,8 @@ def check_ensembl_uniprot_fasta_download(self, dbname, version, organism, dbtype
if regresp:
fa_data['desc'] = desc
transfer_resultfile(dstpath, settings.LIBRARY_FILE_PATH, wfp.name,
settings.PRIMARY_STORAGESHARENAME, regresp, doneurl, token,
self.request.id, is_fasta=fa_data)
settings.PRIMARY_STORAGESHARENAME, doneurl, token, self.request.id,
regresp['fnid'], regresp['md5'], regresp['newname'], is_fasta=fa_data)
else:
print('File was already downloaded')
task_finished(self.request.id)
@@ -192,8 +192,9 @@ def run_nextflow_workflow(self, run, params, stagefiles, profiles, nf_version):
regfile = register_resultfile(os.path.basename(ofile), ofile, token)
if not regfile:
continue
transfer_resultfile(outdir, outpath, ofile, run['dstsharename'], regfile, fileurl, token,
self.request.id, analysis_id=run['analysis_id'], checksrvurl=checksrvurl)
transfer_resultfile(outdir, outpath, ofile, run['dstsharename'], fileurl, token,
self.request.id, regfile['fnid'], regfile['md5'], regfile['newname'],
analysis_id=run['analysis_id'], checksrvurl=checksrvurl)
report_finished_run(reporturl, postdata, stagedir_to_rm, rundir, run['analysis_id'])
return run

@@ -255,7 +256,7 @@ def refine_mzmls(self, run, params, mzmls, stagefiles, profiles, nf_version):
sf_id, newname = os.path.basename(fn).split('___')
fdata = {'file_id': sf_id, 'newname': newname, 'md5': calc_md5(fn)}
transfer_resultfile(outfullpath, run['runname'], fn, run['dstsharename'], fdata, fileurl, token,
self.request.id)
self.request.id, fdata['fnid'], fdata['md5'], fdata['newname'])
reporturl = urljoin(settings.KANTELEHOST, reverse('jobs:analysisdone'))
postdata = {'client_id': settings.APIKEY, 'analysis_id': run['analysis_id'],
'task': self.request.id, 'name': run['runname'], 'user': run['user'], 'state': 'ok'}
@@ -483,7 +484,7 @@ def check_in_transfer_client(task_id, token, filetype):


def register_resultfile(fname, fpath, token):
reg_url = urljoin(settings.KANTELEHOST, reverse('files:register'))
reg_url = urljoin(settings.KANTELEHOST, reverse('files:reg_trfstate'))
postdata = {'fn': fname,
'client_id': settings.APIKEY,
'md5': calc_md5(fpath),
@@ -493,51 +494,50 @@ def register_resultfile(fname, fpath, token):
'token': token,
}
resp = requests.post(url=reg_url, json=postdata)
if resp.status != 500:
rj = resp.json()
else:
rj = False
resp.raise_for_status()
rj = resp.json()
if not rj['stored']:
outfile = resp.json()
outfile.update({'newname': fname, 'md5': postdata['md5']})
return outfile
if rj['transferstate'] == 'transfer':
return {'fnid': rj['fn_id'], 'newname': fname, 'md5': postdata['md5']}
else:
return False


def transfer_resultfile(outfullpath, outpath, fn, dstsharename, regfile, url, token, task_id,
analysis_id=False, checksrvurl=False, is_fasta=False):
def transfer_resultfile(outfullpath, outpath, fn, dstsharename, url, token, task_id,
fn_id, reg_md5, newname, analysis_id=False, checksrvurl=False, is_fasta=False):
'''Copies files from analyses to outdir on result storage.
outfullpath is absolute destination dir for file
outpath is the path stored in Kantele DB (for users on the share of outfullpath)
fn is absolute path to src file
regfile is dict of registered file with md5, newname, fn_id
'''
fname = regfile['newname']
dst = os.path.join(outfullpath, fname)
dst = os.path.join(outfullpath, newname)
try:
shutil.copy(fn, dst)
except:
taskfail_update_db(task_id, 'Errored when trying to copy files to analysis result destination')
raise
os.chmod(dst, 0o640)
postdata = {'client_id': settings.APIKEY, 'fn_id': regfile['file_id'],
'outdir': outpath, 'filename': fname, 'token': token, 'dstsharename': dstsharename,
postdata = {'client_id': settings.APIKEY, 'fn_id': fn_id, 'outdir': outpath,
'filename': newname, 'token': token, 'dstsharename': dstsharename,
'analysis_id': analysis_id, 'is_fasta': is_fasta}
if calc_md5(dst) != regfile['md5']:
if calc_md5(dst) != reg_md5:
msg = 'Copying error, MD5 of src and dst are different'
taskfail_update_db(task_id, msg)
raise RuntimeError(msg)
else:
postdata['md5'] = regfile['md5']
postdata['md5'] = reg_md5
if analysis_id:
# Not for refine mzMLs
postdata.update({'ftype': settings.ANALYSIS_FT_NAME, 'analysis_id': analysis_id})
# first check if upload file is OK:
resp = requests.post(checksrvurl, json={'fname': fname, 'client_id': settings.APIKEY})
resp = requests.post(checksrvurl, json={'fname': newname, 'client_id': settings.APIKEY})
if resp.status_code == 200:
# Servable file found, upload also to web server
# Somewhat complex POST to get JSON and files in same request
response = requests.post(url, files={
'ana_file': (fname, open(fn, 'rb'), 'application/octet-stream'),
'ana_file': (newname, open(fn, 'rb'), 'application/octet-stream'),
'json': (None, json.dumps(postdata), 'application/json')})
else:
response = update_db(url, files={
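Editor's note for reviewers: a minimal sketch of the calling convention this refactor introduces. register_resultfile() now returns a flat dict with fnid, md5 and newname (or False when the backend does not report a 'transfer' state), and transfer_resultfile() takes those three values as explicit arguments instead of a regfile dict. All paths, URLs, ids and the token below are placeholders, not values from the codebase.

    # Sketch only; assumes the Celery task context (token, task id, URLs) is already set up.
    token = 'transfer-token'                                  # placeholder
    fileurl = 'https://kantele.example/analysis/upload/'      # hypothetical URL
    checksrvurl = 'https://kantele.example/analysis/check/'   # hypothetical URL

    regfile = register_resultfile('report.html', '/work/out/report.html', token)
    if regfile:  # False unless the file's transferstate is 'transfer'
        transfer_resultfile(
            '/storage/results/run1',      # outfullpath: absolute destination dir
            'run1',                       # outpath: path stored in the Kantele DB
            '/work/out/report.html',      # fn: absolute path to the source file
            'analysisshare',              # dstsharename
            fileurl, token, 'task-uuid',
            regfile['fnid'], regfile['md5'], regfile['newname'],
            analysis_id=123, checksrvurl=checksrvurl)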
13 changes: 13 additions & 0 deletions src/backend/analysis/tests.py
@@ -18,6 +18,19 @@
class AnalysisTest(BaseTest):
def setUp(self):
super().setUp()
self.usrfraw = rm.RawFile.objects.create(name='usrfiledone', producer=self.prod,
source_md5='usrfmd5', size=100, claimed=True, date=timezone.now())
self.sfusr, _ = rm.StoredFile.objects.update_or_create(rawfile=self.usrfraw,
md5=self.usrfraw.source_md5, filetype=self.uft,
defaults={'filename': self.usrfraw.name, 'servershare': self.sstmp,
'path': '', 'checked': True})
self.usedtoken, _ = rm.UploadToken.objects.update_or_create(user=self.user, token='usrffailtoken',
expired=False, producer=self.prod, filetype=self.uft,
uploadtype=rm.UploadToken.UploadFileType.USERFILE, defaults={
'expires': timezone.now() + timedelta(1)})
self.userfile, _ = rm.UserFile.objects.get_or_create(sfile=self.sfusr,
description='This is a userfile', upload=self.usedtoken)

self.pset = am.ParameterSet.objects.create(name='ps1')
self.param1 = am.Param.objects.create(name='a flag', nfparam='--flag', ptype=am.Param.PTypes.FLAG, help='flag help')
self.param2 = am.Param.objects.create(name='a chbox', nfparam='--multi', ptype=am.Param.PTypes.MULTI, help='help')
3 changes: 1 addition & 2 deletions src/backend/datasets/tasks.py
@@ -41,9 +41,8 @@ def run_convert_mzml_nf(self, run, params, raws, ftype_name, nf_version, profile
for raw in raws:
token = check_in_transfer_client(self.request.id, token, ftype_name)
srcpath = os.path.join(run_outdir, raw[4])
fdata = {'md5': calc_md5(srcpath), 'file_id': raw[3], 'newname': raw[4]}
transfer_resultfile(outfullpath, outpath, srcpath, run['dstsharename'],
fdata, transfer_url, token, self.request.id)
raw[3], calc_md5(srcpath), raw[4], transfer_url, token, self.request.id)
# FIXME first check tstate so no dup transfers used?
# TODO we're only reporting task finished in this POST call, but there is no specific route
# for that.
82 changes: 78 additions & 4 deletions src/backend/datasets/tests.py
@@ -259,6 +259,80 @@ def test_dset_is_filename_job_error(self):
self.assertEqual(self.tmpsf.path, '')


class AcceptRejectPreassocFiles(BaseIntegrationTest):
url = '/datasets/save/files/pending/'

def setUp(self):
super().setUp()
self.tmpraw.claimed = True
self.tmpraw.save()
jm.Job.objects.create(funcname='move_files_storage', state=Jobstates.HOLD, kwargs={
'dset_id': self.ds.pk, 'rawfn_ids': [self.tmpraw.pk]}, timestamp=timezone.now())

def test_accept_all_files(self):
newdsr = dm.DatasetRawFile.objects.filter(dataset=self.ds, rawfile=self.tmpraw)
self.assertEqual(newdsr.count(), 0)
resp = self.post_json({'dataset_id': self.ds.pk, 'accepted_files': [self.tmpraw.pk],
'rejected_files': []})
self.assertEqual(resp.status_code, 200)
self.assertEqual(newdsr.count(), 1)
self.assertEqual(self.tmpsf.servershare, self.sstmp)
self.assertEqual(self.tmpsf.path, '')
self.tmpraw.refresh_from_db()
self.assertTrue(self.tmpraw.claimed)
self.run_job()
self.tmpsf.refresh_from_db()
self.assertEqual(self.tmpsf.servershare, self.ssnewstore)
self.assertEqual(self.tmpsf.path, self.ds.storage_loc)
self.assertTrue(os.path.exists(os.path.join(settings.SHAREMAP[self.ssnewstore.name],
self.ds.storage_loc, self.tmpsf.filename)))

def test_reject_all_files(self):
newdsr = dm.DatasetRawFile.objects.filter(dataset=self.ds, rawfile=self.tmpraw)
self.assertEqual(newdsr.count(), 0)
resp = self.post_json({'dataset_id': self.ds.pk, 'rejected_files': [self.tmpraw.pk],
'accepted_files': []})
self.assertEqual(resp.status_code, 200)
newdsr = dm.DatasetRawFile.objects.filter(dataset=self.ds, rawfile=self.tmpraw)
self.assertEqual(newdsr.count(), 0)
self.assertEqual(self.tmpsf.servershare, self.sstmp)
self.assertEqual(self.tmpsf.path, '')
self.tmpraw.refresh_from_db()
self.assertFalse(self.tmpraw.claimed)
self.run_job()
self.tmpsf.refresh_from_db()
self.assertEqual(self.tmpsf.servershare, self.sstmp)
self.assertEqual(self.tmpsf.path, '')
self.assertTrue(os.path.exists(os.path.join(settings.SHAREMAP[self.sstmp.name],
self.tmpsf.filename)))


def test_accept_some_files(self):
rejectraw = rm.RawFile.objects.create(name='reject.raw', producer=self.prod,
source_md5='rejectit_fakemd5', size=123, date=timezone.now(),
claimed=True)
jm.Job.objects.create(funcname='move_files_storage', state=Jobstates.HOLD, kwargs={
'dset_id': self.ds.pk, 'rawfn_ids': [rejectraw.pk]}, timestamp=timezone.now())
newdsr = dm.DatasetRawFile.objects.filter(dataset=self.ds, rawfile=self.tmpraw)
self.assertEqual(newdsr.count(), 0)
resp = self.post_json({'dataset_id': self.ds.pk, 'accepted_files': [self.tmpraw.pk],
'rejected_files': [rejectraw.pk]})
self.assertEqual(resp.status_code, 200)
self.assertEqual(newdsr.count(), 1)
rejectdsr = dm.DatasetRawFile.objects.filter(dataset=self.ds, rawfile=rejectraw)
self.assertEqual(rejectdsr.count(), 0)
self.assertEqual(self.tmpsf.servershare, self.sstmp)
self.assertEqual(self.tmpsf.path, '')
self.tmpraw.refresh_from_db()
self.assertTrue(self.tmpraw.claimed)
self.run_job()
self.tmpsf.refresh_from_db()
self.assertEqual(self.tmpsf.servershare, self.ssnewstore)
self.assertEqual(self.tmpsf.path, self.ds.storage_loc)
self.assertTrue(os.path.exists(os.path.join(settings.SHAREMAP[self.ssnewstore.name],
self.ds.storage_loc, self.tmpsf.filename)))


class RenameProjectTest(BaseIntegrationTest):
url = '/datasets/rename/project/'

@@ -827,7 +901,9 @@ class TestDeleteDataset(ProcessJobTest):
jobclass = dj.DeleteActiveDataset

def test_files(self):
# Delete both raw and mzML file
# Delete both raw and mzML file, pretend they are files
self.ft.is_folder = False
self.ft.save()
kwargs = {'dset_id': self.ds.pk}
self.job.process(**kwargs)
exp_t = [
Expand All @@ -840,13 +916,11 @@ def test_files(self):

def test_is_dir(self):
# Delete both raw and mzML file, where raw is a folder
self.ft.is_folder = True
self.ft.save()
kwargs = {'dset_id': self.ds.pk}
self.job.process(**kwargs)
exp_t = [
((self.f3sf.servershare.name, os.path.join(self.f3sf.path, self.f3sf.filename),
self.f3sf.pk, True), {}),
self.f3sf.pk, self.f3sf.filetype.is_folder), {}),
((self.f3sfmz.servershare.name, os.path.join(self.f3sfmz.path, self.f3sfmz.filename),
self.f3sfmz.pk, False), {})
]
1 change: 1 addition & 0 deletions src/backend/datasets/urls.py
@@ -21,6 +21,7 @@
path('show/species/', views.get_species),
path('save/dataset/', views.save_dataset, name="savedset"),
path('save/files/', views.save_files, name="savefiles"),
path('save/files/pending/', views.accept_or_reject_dset_preassoc_files),
path('save/mssamples/', views.save_mssamples),
path('save/samples/', views.save_samples),
path('save/labelcheck/', views.save_labelcheck),
45 changes: 45 additions & 0 deletions src/backend/datasets/views.py
@@ -100,8 +100,17 @@ def dataset_files(request, dataset_id=False):
return HttpResponseNotFound()
ds_files = models.DatasetRawFile.objects.select_related(
'rawfile__producer').filter(dataset_id=dataset_id).order_by('rawfile__date')
if penfileq := jm.Job.objects.filter(funcname='move_files_storage', kwargs__dset_id=dataset_id,
state=jj.Jobstates.HOLD):
pending_ids = [x for y in penfileq for x in y.kwargs['rawfn_ids']]
print(penfileq, pending_ids)
pending_files = [(x['pk'], x['name']) for x in
filemodels.RawFile.objects.filter(pk__in=pending_ids).values('name', 'pk')]
else:
pending_files = []
response_json.update({
'dsfn_order': [x.rawfile_id for x in ds_files],
'pendingFiles': pending_files,
'datasetFiles':
{x.rawfile_id:
{'id': x.rawfile_id, 'name': x.rawfile.name, 'associd': x.id,
@@ -1284,6 +1293,42 @@ def save_or_update_files(data):
return {'error': False}, 200


@login_required
def accept_or_reject_dset_preassoc_files(request):
data = json.loads(request.body.decode('utf-8'))
user_denied = check_save_permission(data['dataset_id'], request.user)
if user_denied:
return user_denied
# First remove jobs on rejected files and un-claim rawfiles
if len(data['rejected_files']):
deleted = jm.Job.objects.filter(funcname='move_files_storage', state=jj.Jobstates.HOLD,
kwargs__dset_id=data['dataset_id'],
kwargs__rawfn_ids__in=[[x] for x in data['rejected_files']]).delete()
if deleted:
filemodels.RawFile.objects.filter(pk__in=data['rejected_files']).update(claimed=False)
# Now start jobs for accepted files
if len(data['accepted_files']):
jm.Job.objects.filter(funcname='move_files_storage', state=jj.Jobstates.HOLD,
kwargs__dset_id=data['dataset_id'],
kwargs__rawfn_ids__in=[[x] for x in data['accepted_files']]).update(
state=jj.Jobstates.PENDING)
models.DatasetRawFile.objects.bulk_create([
models.DatasetRawFile(dataset_id=data['dataset_id'], rawfile_id=fnid)
for fnid in data['accepted_files']])
# If files changed and labelfree, set sampleprep component status
# to not good. Which should update the tab colour (green to red)
try:
qtype = models.Dataset.objects.select_related(
'quantdataset__quanttype').get(pk=data['dataset_id']).quantdataset.quanttype
except models.QuantDataset.DoesNotExist:
pass
else:
set_component_state(data['dataset_id'], models.DatasetUIComponent.SAMPLES,
models.DCStates.INCOMPLETE)
return JsonResponse({'error': False})
# FIXME switch fileserver should happen - in that case on the classify start!


@login_required
def save_files(request):
"""Updates and saves files"""
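Editor's note for reviewers: a minimal sketch of how a client might exercise the new accept/reject route, assuming an already authenticated session. The host and primary keys are placeholders; only the payload keys and the response shape come from accept_or_reject_dset_preassoc_files() above.

    import requests

    session = requests.Session()       # assumed to carry a logged-in session cookie
    payload = {
        'dataset_id': 55,              # dataset the files were pre-associated with
        'accepted_files': [101, 102],  # RawFile pks: their HOLD move_files_storage jobs go to
                                       # PENDING and DatasetRawFile rows are created
        'rejected_files': [103],       # RawFile pks: their HOLD jobs are deleted, files un-claimed
    }
    resp = session.post('https://kantele.example/datasets/save/files/pending/', json=payload)
    resp.raise_for_status()
    print(resp.json())                 # {'error': False} on success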
14 changes: 7 additions & 7 deletions src/backend/home/tests.py
@@ -25,15 +25,15 @@ def setUp(self):
self.ssmzml, _ = rm.ServerShare.objects.get_or_create(name=settings.MZMLINSHARENAME,
server=self.newfserver, share='/home/mzmls')
self.ft, _ = rm.StoredFileType.objects.get_or_create(name='Thermo raw', filetype='raw')
self.prodqe, _ = rm.Producer.objects.get_or_create(name='qe_prod', client_id='abcdefg',
shortname='p1')
self.prodtims, _ = rm.Producer.objects.get_or_create(name='tims_prod', client_id='hijklm',
self.prodqe = rm.Producer.objects.create(name='qe_prod', client_id='qe_abcdefg',
shortname='qep1')
self.prodtims = rm.Producer.objects.create(name='tims_prod', client_id='hijklm',
shortname='p2')
self.tims, _ = rm.MSInstrumentType.objects.get_or_create(name='timstof')
self.qe, _ = rm.MSInstrumentType.objects.get_or_create(name='qe')
instqe, _ = rm.MSInstrument.objects.get_or_create(producer=self.prodqe,
self.tims = rm.MSInstrumentType.objects.create(name='timstof')
self.qe = rm.MSInstrumentType.objects.create(name='qe')
instqe = rm.MSInstrument.objects.create(producer=self.prodqe,
instrumenttype=self.qe, filetype=self.ft)
insttims, _ = rm.MSInstrument.objects.get_or_create(producer=self.prodtims,
insttims = rm.MSInstrument.objects.create(producer=self.prodtims,
instrumenttype=self.tims, filetype=self.ft)
own1, _ = dm.DatasetOwner.objects.get_or_create(dataset=self.ds, user=self.user)
self.run = dm.RunName.objects.create(name=self.id(), experiment=self.exp1)