diff --git a/scripts/config.py b/scripts/config.py index 2d58b4f..f483dfb 100644 --- a/scripts/config.py +++ b/scripts/config.py @@ -1,8 +1,12 @@ import re import json +import traceback +import logging from pathlib import Path from os import access as check_access, R_OK +from os.path import expandvars from socket import gethostname +from uuid import uuid4 from collections import defaultdict @@ -91,13 +95,14 @@ def get_resource_config(): return json.load(open(resource_json)) -def base_config(keys=None, qc=True): +def base_config(keys=None, qc=True, slurm_id=None): base_keys = ('runs', 'run_ids', 'project', 'rnums', 'bcl_files', \ 'sample_sheet', 'samples', 'sids', 'out_to', 'demux_input_dir', \ 'bclconvert', 'demux_data') this_config = {k: [] for k in base_keys} this_config['resources'] = get_resource_config() this_config['runqc'] = qc + this_config['use_scratch'] = True if slurm_id else False if keys: for elem_key in keys: @@ -147,6 +152,29 @@ def get_bigsky_seq_dirs(): return seq_dirs +def get_tmp_dir(host): + TMP_CONFIGS = { + 'skyline': {'user': '/data/scratch/$USER/$SLURM_JOBID', 'global': '/data/scratch/$USER/' + str(uuid4())}, + 'bigsky': {'user': '/gs1/Scratch/$USER/$SLURM_JOBID', 'global': '/gs1/Scratch/$USER/' + str(uuid4())}, + 'biowulf': {'user': '/lscratch/$SLURM_JOBID', 'global': '/tmp/$USER/' + str(uuid4())} + } + + this_tmp = TMP_CONFIGS[host]['user'] + + try: + Path(expandvars(this_tmp)).mkdir(parents=False, exist_ok=True) + except Exception as e: + logging.error("SLURM TMP DIR NOT FOUND: " + traceback.format_exc()) + this_tmp = TMP_CONFIGS[host]['global'] + + try: + Path(expandvars(this_tmp)).mkdir(parents=False, exist_ok=True) + except: + raise OSError(f'TMPDIR does exist and can not be created: "{this_tmp}"') + + return this_tmp + + DIRECTORY_CONFIGS = { "bigsky": { "seqroot": "/gs1/RTS/NextGen/SequencerRuns/", diff --git a/scripts/utils.py b/scripts/utils.py index a1d2917..0d97cd1 100644 --- a/scripts/utils.py +++ b/scripts/utils.py @@ -16,7 +16,8 @@ # ~~~ internals ~~~ from .files import parse_samplesheet, mk_or_pass_dirs -from .config import SNAKEFILE, DIRECTORY_CONFIGS, GENOME_CONFIGS, get_current_server, get_resource_config +from .config import SNAKEFILE, DIRECTORY_CONFIGS, \ + GENOME_CONFIGS, get_current_server, get_resource_config, get_tmp_dir host = get_current_server() @@ -140,10 +141,9 @@ def exec_snakemake(popen_cmd, local=False, dry_run=False, env=None, cwd=None): def mk_sbatch_script(wd, cmd): if not Path(wd, 'logs', 'masterjob').exists(): Path(wd, 'logs', 'masterjob').mkdir(mode=0o755, parents=True) - shebang = "#!/bin/bash --login" if host == 'skyline' else '#!/bin/bash' + tmp_dir = get_tmp_dir(get_current_server()) master_job_script = \ - f""" - {shebang} + f"""#!/bin/bash --login #SBATCH --job-name=weave_masterjob #SBATCH --output={wd}/logs/masterjob/%x_%j.out #SBATCH --error={wd}/logs/masterjob/%x_%j.err @@ -152,8 +152,10 @@ def mk_sbatch_script(wd, cmd): #SBATCH --time=05-00:00:00 #SBATCH --export=ALL #SBATCH --mem=16g + #SBATCH -vvv """.lstrip() master_job_script += get_mods(init=True) + "\n" + master_job_script += f"if [ ! -d \"{tmp_dir}\" ]; then mkdir -p \"{tmp_dir}\"; fi\n" master_job_script += cmd master_job_script = '\n'.join([x.lstrip() for x in master_job_script.split('\n')]) master_script_location = Path(wd, 'logs', 'masterjob', 'master_jobscript.sh').absolute() @@ -186,6 +188,13 @@ def get_mods(init=False): def get_mounts(*extras): mount_binds = [] resources = get_resource_config() + slurm_id = os.environ + + if os.environ.get("SLURM_JOB_ID", None): + tmpdir = get_tmp_dir(get_current_server()) + if not Path(os.path.expandvars(tmpdir)).exists(): + Path(tmpdir).mkdir(parents=True, exist_ok=True) + mount_binds.append(str(tmpdir) + ':/tmp:rw') if resources: for this_mount_label, this_mount_attrs in resources['mounts'].items(): @@ -219,7 +228,7 @@ def get_mounts(*extras): file_to, file_from, mode = str(bind), str(bind), 'rw' mounts.append(file_from + ':' + file_to + ':' + mode) - return "\'-B " + ','.join(mounts) + "\'" + return ','.join(mounts) def exec_pipeline(configs, dry_run=False, local=False): @@ -239,7 +248,7 @@ def exec_pipeline(configs, dry_run=False, local=False): top_config_dirs = [Path(c_dir, '.config').absolute() for c_dir in configs['out_to']] _dirs = top_singularity_dirs + top_config_dirs mk_or_pass_dirs(*_dirs) - skip_config_keys = ('resources', 'runqc') + skip_config_keys = ('resources', 'runqc', 'use_scratch') for i in range(0, len(configs['run_ids'])): this_config = {k: (v[i] if k not in skip_config_keys else v) for k, v in configs.items() if v} @@ -272,7 +281,7 @@ def exec_pipeline(configs, dry_run=False, local=False): "--profile", fastq_demux_profile ] if singularity_binds: - this_cmd.extend(["--singularity-args", singularity_binds]) + this_cmd.extend(["--singularity-args", f"\"--env 'TMPDIR=/tmp' -C -B '{singularity_binds}'\""]) if not local: this_cmd.extend(["--profile", f"{fastq_demux_profile}"]) diff --git a/utils b/utils index 1c34503..54258bf 160000 --- a/utils +++ b/utils @@ -1 +1 @@ -Subproject commit 1c34503ea9e2c607efebad436c5589352729dde5 +Subproject commit 54258bf30456dca44251846c068ac9caa90115e5 diff --git a/weave b/weave index e4d69f5..e3bb12d 100755 --- a/weave +++ b/weave @@ -2,6 +2,7 @@ # -*- coding: UTF-8 -*- import argparse import subprocess +import os from pathlib import Path from scripts import utils, files, config, cache @@ -11,7 +12,7 @@ def run(args): Main frontend for demultiplexing and QA/QC """ runs = files.get_run_directories(args.rundir, seq_dir=args.seq_dir, sheetname=args.sheetname) - exec_config = config.base_config(qc=args.noqc) + exec_config = config.base_config(qc=args.noqc, slurm_id=os.environ.get("SLURM_JOB_ID", None)) for (rundir, run_infos) in runs: sample_sheet = run_infos['samplesheet'] diff --git a/workflow/qc.smk b/workflow/qc.smk index 11632bb..50f35c6 100644 --- a/workflow/qc.smk +++ b/workflow/qc.smk @@ -26,7 +26,8 @@ rule fastqc_untrimmed: log: config['out_to'] + "/logs/" + config["project"] + "/fastqc_untrimmed/{sids}_R{rnums}.log" threads: 4 containerized: config["resources"]["sif"] + "weave_ngsqc_0.0.2.sif" - resources: mem_mb = 8096 + resources: + mem_mb = 8096 shell: """ mkdir -p {params.output_dir} @@ -41,15 +42,22 @@ rule fastqc_trimmed: html = config['out_to'] + "/" + config["project"] + "/{sids}/fastqc_trimmed/{sids}_trimmed_R{rnums}_fastqc.html", fqreport = config['out_to'] + "/" + config["project"] + "/{sids}/fastqc_trimmed/{sids}_trimmed_R{rnums}_fastqc.zip", params: - output_dir = lambda w: config['out_to'] + "/" + config["project"] + "/" + w.sids + "/fastqc_trimmed/" + output_dir = lambda w: config['out_to'] + "/" + config["project"] + "/" + w.sids + "/fastqc_trimmed/", + tmp_dir = lambda w: "/tmp/" + w.sids, containerized: config["resources"]["sif"] + "weave_ngsqc_0.0.2.sif" threads: 4 - resources: mem_mb = 8096 + resources: + mem_mb = 8096, + disk_mb = int(500e3) if config['use_scratch'] else 0, log: config['out_to'] + "/logs/" + config["project"] + "/fastqc_trimmed/{sids}_R{rnums}.log" shell: """ + if [ ! -d "{params.tmp_dir}" ]; then mkdir -p "{params.tmp_dir}"; fi + tmp=$(mktemp -d -p "{params.tmp_dir}") mkdir -p {params.output_dir} - fastqc -o {params.output_dir} -t {threads} {input.in_read} + fastqc -o {params.tmp_dir} -t {threads} {input.in_read} + find "{params.tmp_dir}" -type f \\( -name '*.html' -o -name '*.zip' \\) \\-exec cp {{}} {params.output_dir} \\; + rm -rf {params.tmp_dir}/* """