diff --git a/anvio/docs/__init__.py b/anvio/docs/__init__.py
index 2d780a4779..67a3e092f6 100644
--- a/anvio/docs/__init__.py
+++ b/anvio/docs/__init__.py
@@ -90,14 +90,14 @@
     "sra-download": {
         "authors": ['mschecht'],
         "artifacts_accepted": [],
-        "artifacts_produced": ['paired-end-fastq'],
+        "artifacts_produced": ['paired-end-fastq', 'samples-txt'],
         "anvio_workflows_inherited": [],
         "third_party_programs_used": [
             ('Downloads SRA accessions', ['prefetch']),
             ('Extracts FASTQ files from SRA accessions', ['fasterq-dump']),
             ('Compresses FASTQ files in parallel', ['pigz']),
         ],
-        "one_sentence_summary": "Download, extract, and gzip paired-end FASTQ files automatically from the NCBI short-read archive (SRA)",
+        "one_sentence_summary": "Download, verify (MD5 checksum), extract, and gzip paired-end FASTQ files automatically from the NCBI short-read archive (SRA)",
         "one_paragraph_summary": ("The sra-download workflow automatizes the process of downloading paired-end FASTQ files "
                                   "for a given list of SRA-accessions using [NCBI sra-tools wiki](https://github.com/ncbi/sra-tools/wiki/08.-prefetch-and-fasterq-dump) "
                                   "then gzips them using [pigz](https://zlib.net/pigz/).")
diff --git a/anvio/workflows/sra_download/Snakefile b/anvio/workflows/sra_download/Snakefile
index 696800389a..5ce7f1e863 100644
--- a/anvio/workflows/sra_download/Snakefile
+++ b/anvio/workflows/sra_download/Snakefile
@@ -1,7 +1,9 @@
 # -*- coding: utf-8
 import os
+import json
 import anvio
 import argparse
+import pandas as pd
 
 from anvio.errors import ConfigError
 from anvio.workflows.sra_download import SRADownloadWorkflow
@@ -69,7 +71,7 @@ rule prefetch:
     log: os.path.join(dirs_dict['LOGS_DIR'], "{accession}_prefetch.log")
     input:
     output:
-        SRA_TMP = temp(directory(os.path.join(dirs_dict['SRA_prefetch'], "{accession}")))
+        SRA_TMP = os.path.join(dirs_dict['SRA_prefetch'], "{accession}", "{accession}.sra")
     params:
         SRA_OUTPUT_DIR = os.path.join(dirs_dict['SRA_prefetch']),
         MAX_SIZE = M.get_param_value_from_config(['prefetch', '--max-size'])
@@ -77,6 +79,48 @@ rule prefetch:
     run:
         shell("prefetch {wildcards.accession} --output-directory {params.SRA_OUTPUT_DIR} --max-size {params.MAX_SIZE} >> {log} 2>&1")
 
+rule check_md5sum:
+    """Verify the MD5 checksum of the prefetched .sra file against metadata from the NCBI SDL API"""
+
+    version: 1.0
+    log: os.path.join(dirs_dict['LOGS_DIR'], "{accession}_check_md5sum.log")
+    input:
+        sra_file = os.path.join(dirs_dict['SRA_prefetch'], "{accession}", "{accession}.sra")
+    params:
+        md5sum = os.path.join(dirs_dict['SRA_prefetch'], "{accession}", "{accession}.json")
+    output:
+        md5sum_DONE = touch(os.path.join(dirs_dict['SRA_prefetch'], "{accession}", "{accession}.done"))
+    threads: M.T('check_md5sum')
+    run:
+        # Fetch run metadata (JSON, including the expected MD5) from the NCBI SDL locate API over HTTPS
+        shell("curl 'https://locate.ncbi.nlm.nih.gov/sdl/2/retrieve?filetype=run&location-type=forced&location=s3.us-east-1&accept-charges=aws&acc={wildcards.accession}' --output {params.md5sum} >> {log} 2>&1")
+
+        # Get the expected md5sum hash
+        with open(params.md5sum) as sra_metadata_file:
+            sra_metadata_dict = json.load(sra_metadata_file)
+        expected_md5 = sra_metadata_dict['result'][0]['files'][0]['md5']
+
+        def calculate_md5(file_path):
+            """Calculate the md5sum of a file"""
+            import hashlib
+            hash_md5 = hashlib.md5()
+            with open(file_path, "rb") as f:
+                for chunk in iter(lambda: f.read(4096), b""):
+                    hash_md5.update(chunk)
+            return hash_md5.hexdigest()
+
+        calculated_md5 = calculate_md5(input.sra_file)
+
+        # Compare expected and calculated md5 and log the result
+        log_path = str(log)
+        with open(log_path, 'a') as log_file:
+            if calculated_md5 == expected_md5:
+                log_file.write(f"Checksums match: {calculated_md5}\n")
+            else:
+                error_message = f"Checksums do not match: calculated {calculated_md5}, expected {expected_md5}"
+                log_file.write(f"{error_message}\n")
+                raise ValueError(error_message)
+
 rule fasterq_dump:
     """Use fasterq-dump to extract FASTQ file(s) from an SRA prefetch *.sra
 
@@ -99,7 +143,7 @@ rule fasterq_dump:
     version: 1.0
     log:
         os.path.join(dirs_dict['LOGS_DIR'], "{accession}_fasterq_dump.log")
-    input: ancient(rules.prefetch.output.SRA_TMP)
+    input: ancient(rules.check_md5sum.output.md5sum_DONE)
     output:
         FASTERQDUMP_DONE = touch(os.path.join(dirs_dict['FASTAS'], "{accession}-fasterq-dump.done")),
         FASTERQDUMP_TEMP = temp(directory("FASTERQDUMP_TEMP/{accession}"))