#!/usr/bin/env python3
#Version
#Freyja_parallelization_V6.1
#Script for analysis of FASTQ files via Freyja
import sys
import time
import multiprocessing as mp
import os
from datetime import date, timedelta
import argparse
#Updates in Version 4
#Automatically remove all *sam files as they become redundant after converting to bam
#Adds in the '-update' or '-fupdate' to decide whether the user would like to update the barcodes
#Updates in Version 5
# Convert iVar output to a snpEff annotated VCF
# Changed the way samples are handled in parallel, can now be modified with option -n (default 8)
# Cleaned up parameters with argparse
# Integrated File Checking
#Update in Version 6
# Added parallel file-checking
# Pipe sam files into samtools
# Fixed repository link for old Artic primers ( < V4)
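#Example invocation (illustrative placeholder paths; see --help for the full option list):
#  python3 Freyja_parallelization.py /path/to/workspace/ samples.txt -s /path/to/fastq/ -o /path/to/output/ -t 4 -n 8 --update -f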
#Time feedback
start_time = time.time()
#A function that executes the recurring per-sample steps of this pipeline
def run_core_iPMVC(wp_path, current_sample, nb_t_profiling, smpl_path, usherbarcodes, output, analyzed_list, primerbed):
    # Quality control: trimming, automatic adapter removal, and low-complexity read removal
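    # fastp flags: -l 70 drops reads shorter than 70 bp, -x trims polyX tails, --cut_tail trims tail bases whose window mean quality falls below Q20, and --detect_adapter_for_pe auto-detects paired-end adapters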
os.system("fastp -i {2}{0}_R1_001.fastq.gz -I {2}{0}_R2_001.fastq.gz -o {3}{0}_R1_001_trimmed_1.fastq.gz -O {3}{0}_R2_001_trimmed_2.fastq.gz -l 70 -x --cut_tail --cut_tail_mean_quality 20 --detect_adapter_for_pe --thread {1} --json {3}{0}.fastp.json".format(current_sample, nb_t_profiling, smpl_path, output))
    #Decontaminate: remove human reads by mapping against GRCh38 and keeping only read pairs where neither mate aligned
    #Reference one-liner: bwa mem Workspace/Homo_sapiens.GRCh38.fa output2/sample16_L00M_R1_001_trimmed_1.fastq.gz output2/sample16_L00M_R2_001_trimmed_2.fastq.gz -t 4 | samtools view -bS - | samtools view -f 12 -F 256 --threads 4 - | samtools sort -n -m 5G --threads 4 - | tee test_sorted.bam | samtools fastq -@ 4 - -1 test.fastq.gz -2 test2.fastq.gz
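    #samtools flags: -f 12 keeps read pairs where both the read and its mate are unmapped (0x4 + 0x8),
    #-F 256 excludes secondary alignments, and sort -n orders reads by name as samtools fastq expects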
os.system("bwa mem {3}Homo_sapiens.GRCh38.fa {0}{1}_R1_001_trimmed_1.fastq.gz {0}{1}_R2_001_trimmed_2.fastq.gz -t {2} | \
samtools view -bS --threads {2} - | \
samtools view -bS --threads {2} -f 12 -F 256 - | \
samtools sort -n -m 5G --threads {2} -o {0}{1}_human_decon_sorted.bam - \
".format(output, current_sample, nb_t_profiling, wp_path))
os.system("samtools fastq -@ {2} {0}{1}_human_decon_sorted.bam -1 {0}{1}_R1_001_decon_1.fastq.gz -2 {0}{1}_R2_001_decon_2.fastq.gz".format(output, current_sample, nb_t_profiling))
    #Align the decontaminated reads to the SARS-CoV-2 reference genome (MN908947.3)
    os.system("bwa mem {0}MN908947_3.fa {3}{1}_R1_001_decon_1.fastq.gz {3}{1}_R2_001_decon_2.fastq.gz -t {2} | \
              samtools view -bS --threads {2} - | \
              samtools sort -m 5G --threads {2} -o {3}{1}_preprocessed_sorted.bam - ".format(wp_path, current_sample, nb_t_profiling, output))
    #Trim amplicon primers with iVar; -m 70 keeps only reads of at least 70 bp after trimming
    #-e also includes reads with no primers, which occur with Nextera library prep
    os.system("ivar trim -e -m 70 -i {2}{1}_preprocessed_sorted.bam -b {3} -p {2}{1}_ivartrim".format(wp_path, current_sample, output, primerbed))
    os.system("samtools sort -o {0}{1}_ivartrim_sorted.bam {0}{1}_ivartrim.bam".format(output, current_sample))
    os.system("samtools index {0}{1}_ivartrim_sorted.bam".format(output, current_sample))
    # Freyja: call variants, then demix into lineages and relative abundances
    os.system("freyja variants {2}{1}_ivartrim_sorted.bam --variants {2}{1}_variantout --depths {2}{1}_depthout --ref {0}MN908947_3.fa".format(wp_path, current_sample, output))
    os.system("freyja demix {3}{1}_variantout.tsv {3}{1}_depthout --barcodes {0}{2} --output {3}results/{1}_output".format(wp_path, current_sample, usherbarcodes, output))
    # Annotate the iVar variant file
    #Convert to VCF
    os.system("python3 {0}ivar_variants_to_vcf.py --pass_only {1}{2}_variantout.tsv {1}{2}_variantout.vcf".format(wp_path, output, current_sample))
    #Annotate with snpEff
    os.system("java -Xmx8g -jar /opt/snpEff/snpEff.jar MN908947.3 {0}{1}_variantout.vcf > {0}{1}_variantout.snpEff.annotated.vcf".format(output, current_sample))
    #Make a human-readable output file
    os.system("python3 {0}convert_recurrent_snpeff_data.py -f {1}{2}_variantout.snpEff.annotated.vcf -s {2} -o {1}{2}_snpEff_converted".format(wp_path, output, current_sample))
    if os.path.exists("{1}results/{0}_output".format(current_sample, output)):
        #Record the finished sample so it is skipped on re-runs
        print("Recording {0} as analyzed in {1}".format(current_sample, analyzed_list))
        os.system("echo {1} >> {0}{2}".format(wp_path, current_sample, analyzed_list))
#A helper that tests whether a string is numeric
def is_numeric(the_str):
    try:
        float(the_str)
        return True
    except (ValueError, TypeError):
        return False
if __name__ == "__main__":
    #Initializing parameters and options
    parser = argparse.ArgumentParser(prog='Wastewater bioinformatics pipeline',
                                     description="This is a script that takes FASTQ files from tiled amplicon-sequencing of wastewater and then runs Freyja to assign lineage/relative-abundance measurements, as well as identify mutations in the sample")
    parser.add_argument("workspace_path",
                        help="Path to the directory holding the reference files, databases, sample list, and usher barcodes.") #Required
    parser.add_argument("samples_list_file",
                        help="Text file listing the samples to be run; used to identify input and output files, which follow the pattern <sample_name>_R[1-2]_001.fastq.gz") #Required
    parser.add_argument("-s", "--sample_path", default='.',
                        help="Directory containing the FASTQ files. Default: current working directory.") #Optional
    parser.add_argument("-t", "--threads", default=4,
                        help="Threads per sample (default: 4). Total threads available should exceed threads multiplied by the number of samples run in parallel.") #Optional
    parser.add_argument("-b", "--barcode",
                        help="Barcode file to use, in the format usher_barcodes_yyyy-mm-dd.csv. Default: download/build the most recent.") #Optional
    parser.add_argument("-u", "--update", action="store_true",
                        help="Update to the latest usher barcodes") #Optional
    parser.add_argument("-o", "--output", default='.',
                        help="Output directory, created if not found. Default: current working directory.") #Optional
    parser.add_argument("-n", "--parallel", default=4,
                        help="Number of samples run in parallel (default: 4).") #Optional
    parser.add_argument("-f", "--file_check", action="store_true",
                        help="Generate a file-checking report/figure to confirm that every expected output was created") #Optional
    parser.add_argument("-d", "--date",
                        help="Download the usher barcodes for a specific date, formatted yyyy-mm-dd") #Optional
    parser.add_argument("-V", "--arctic", default="V4.1",
                        help="ARTIC primer scheme used: V1, V2, V3, V4, V4.1") #Optional
    parser.add_argument("-a", "--analyzed", default="Analyzed_samples.txt",
                        help="Output file listing samples that have been completed") #Optional
    args = parser.parse_args()
    #### ARGUMENTS && MISC FILE CHECKING ####
    #Workspace (path) [required]
    #Workspace is the directory where all files should be located (references, databases, scripts, and barcodes)
    #This is the default location for all files unless stated otherwise
    workspace_path = args.workspace_path #commandline argument
    #Ensure "/" at the end of the workspace absolute path
    if not workspace_path.endswith("/"):
        workspace_path = "{0}/".format(workspace_path)
    if not os.path.isdir(workspace_path):
        print("Workspace directory does not exist")
        raise SystemExit(1)
    print("Chosen Workspace is : '{0}'".format(workspace_path))
    #FASTQ file directory of the samples [optional]: default current directory
    #Used when sequence data is stored in a different location
    sample_path = args.sample_path
    #Ensure "/" at the end of the sample path, and check that the specified path exists
    if not sample_path.endswith("/"):
        sample_path = "{0}/".format(sample_path)
    if not os.path.isdir(sample_path):
        print("Sample directory does not exist")
        raise SystemExit(1)
    print("Sample Path is : '{0}'".format(sample_path))
    #The sample list should be a simple text file, located in the workspace path, with one sample per line
    #Only paired reads are handled at this point, so each sample needs forward and reverse reads following the pattern
    # sample + _R1_001.fastq.gz and sample + _R2_001.fastq.gz
    samples_list_file = args.samples_list_file
    print("Looking for sample list and paired read FASTQ files")
    if not os.path.exists("{0}{1}".format(workspace_path, samples_list_file)):
        print("Sample list file could not be found in chosen workspace directory " + workspace_path)
        raise SystemExit(1)
    #Read the sample names, stripping newlines/whitespace and skipping empty lines
    with open("{0}{1}".format(workspace_path, samples_list_file)) as f:
        lst_samples = [line.strip() for line in f if line.strip()]
    num_samples = len(lst_samples)
    for sample_name in lst_samples:
        if not os.path.isfile("{0}{1}_R1_001.fastq.gz".format(sample_path, sample_name)):
            print("{0}{1}_R1_001.fastq.gz forward read FASTQ file could not be found. Make sure files are in the directory given with -s/--sample_path and that forward reads follow the format sample + _R1_001.fastq.gz".format(sample_path, sample_name))
            raise SystemExit(1)
        if not os.path.isfile("{0}{1}_R2_001.fastq.gz".format(sample_path, sample_name)):
            print("{0}{1}_R2_001.fastq.gz reverse read FASTQ file could not be found. Only paired reads are handled at the moment.".format(sample_path, sample_name))
            raise SystemExit(1)
    #Check whether the barcode file is up to date; barcodes can be updated with --update or pinned to a date with --date
    usherbarcodes = args.barcode
    today = date.today()
    down_date = today - timedelta(days=1) #usher barcode releases are always dated one day behind
    down_date = "usher_barcodes_{0}.csv".format(down_date)
    if not args.barcode or args.update or args.date:
        if args.update:
            print("Pulling the most recent usher barcodes: " + str(down_date))
            usherbarcodes = down_date
            os.system("python3 {0}usher_update.py {0}".format(workspace_path))
        elif args.date:
            down_date2 = "usher_barcodes_{0}.csv".format(args.date)
            print("Downloading the specified barcodes: " + str(down_date2) + " (reminder: the date should be formatted yyyy-mm-dd)")
            usherbarcodes = down_date2
            os.system("python3 {0}usher_update.py {0} -d {1}".format(workspace_path, args.date))
        elif not args.barcode:
            print("No barcode information provided. Downloading the most recent barcodes: " + str(down_date))
            usherbarcodes = down_date
            os.system("python3 {0}usher_update.py {0}".format(workspace_path))
    if usherbarcodes != down_date:
        print("Warning: Expected usher barcode " + down_date + ". To run with the most recent barcodes, use --update")
    print("Barcodes used in analysis: {0}".format(usherbarcodes))
    #Path for the pipeline's output directory
    output = args.output
    if not args.output:
        output = os.getcwd()
    #Make sure "/" is at the end of the output path
    if not output.endswith("/"):
        output = "{0}/".format(output)
    print("Output Path is : '{0}'".format(output))
    #Make the output directory if it does not exist
    if not os.path.exists(output + "results"):
        os.makedirs("{0}results/".format(output))
    #Making sure the human reference genome BWA index is downloaded
    if not os.path.exists("{0}Homo_sapiens.GRCh38.fa.bwt".format(workspace_path)):
        print("Downloading the human reference genome and building the index. This will take a long time; consider downloading a precompiled version instead")
        os.system("wget --quiet -O {0}Homo_sapiens.GRCh38.dna.alt.fa.gz http://ftp.ensembl.org/pub/release-107/fasta/homo_sapiens/dna/Homo_sapiens.GRCh38.dna.alt.fa.gz".format(workspace_path))
        os.system("gzip -d {0}Homo_sapiens.GRCh38.dna.alt.fa.gz".format(workspace_path))
        os.system("mv {0}Homo_sapiens.GRCh38.dna.alt.fa {0}Homo_sapiens.GRCh38.fa".format(workspace_path))
        #-a bwtsw selects the indexing algorithm that handles large genomes such as GRCh38
        os.system("bwa index -a bwtsw -p {0}Homo_sapiens.GRCh38.fa {0}Homo_sapiens.GRCh38.fa".format(workspace_path))
    #Making sure the Wuhan reference SARS-CoV-2 genome BWA index is downloaded
    #This version changes the reference fasta to the one on the artic-network repository and changes the chromosome name
    if not os.path.exists("{0}MN908947_3.fa.bwt".format(workspace_path)):
        os.system("wget --quiet -O {0}MN908947_3.fa https://raw.githubusercontent.com/artic-network/primer-schemes/master/nCoV-2019/V4.1/SARS-CoV-2.reference.fasta".format(workspace_path))
        os.system("bwa index -p {0}MN908947_3.fa {0}MN908947_3.fa".format(workspace_path))
    #Making sure the ARTIC primer bed file is downloaded
    if not os.path.exists("{0}SARS-CoV-2.{1}.primer.bed".format(workspace_path, args.arctic)):
        print("Downloading {0} primer-scheme".format(args.arctic))
        if args.arctic in ["V1", "V2", "V3"]:
            os.system("wget -O {0}SARS-CoV-2.{1}.primer.bed https://raw.githubusercontent.com/artic-network/primer-schemes/master/nCoV-2019/{1}/nCoV-2019.primer.bed".format(workspace_path, args.arctic))
        elif args.arctic in ["V4", "V4.1"]:
            os.system("wget -O {0}SARS-CoV-2.{1}.primer.bed https://raw.githubusercontent.com/artic-network/primer-schemes/master/nCoV-2019/{1}/SARS-CoV-2.primer.bed".format(workspace_path, args.arctic))
        else:
            sys.exit("Pipeline only supports artic-network primer-schemes (V1, V2, V3, V4, V4.1). Other schemes must be downloaded separately by the user and follow the SARS-CoV-2.<Version>.primer.bed naming")
    print("Using {0} primer-scheme".format(args.arctic))
    primerbed = "{0}SARS-CoV-2.{1}.primer.bed".format(workspace_path, args.arctic)
    #Create the file that records samples that have already been analyzed, for QC
    analyzed_list = args.analyzed
    os.system("/bin/touch {0}{1}".format(workspace_path, analyzed_list))
    #Number of samples analyzed in parallel (default: 4 at a time)
    nb_sim_process = int(args.parallel)
    print('Running ' + str(nb_sim_process) + ' samples simultaneously')
    #Number of threads to use for multithreaded tasks (fastp and bwa): default 4
    threads = int(args.threads)
    print('Using ' + str(threads) + ' threads for each of the ' + str(nb_sim_process) + ' samples analyzed simultaneously')
    #Theoretically the number of CPUs in use is threads * nb_sim_process, which is what we can request for the QC table script
    total_CPU = int(nb_sim_process) * int(threads)
    ### STARTING ANALYSIS ###
    print("File checking complete, running analysis")
    #Analyze samples in parallel (4 by default, or the number chosen with --parallel)
    liste_index_sample = range(0, num_samples, nb_sim_process)
    for i in liste_index_sample:
        #Refresh the list of samples that have already been analyzed
        with open("{0}{1}".format(workspace_path, analyzed_list)) as f:
            already_analyzed_samples_list = [line.strip() for line in f]
        processes = []
        #Launch up to nb_sim_process samples, stopping at the end of the sample list
        for t in range(i, min(i + nb_sim_process, num_samples)):
            if lst_samples[t] not in already_analyzed_samples_list:
                print("Starting analysis on sample : " + lst_samples[t])
                p = mp.Process(target=run_core_iPMVC, args=(workspace_path, lst_samples[t], threads, sample_path, usherbarcodes, output, analyzed_list, primerbed))
                processes.append(p)
                p.start()
            else:
                print(lst_samples[t] + " has already been analyzed; to re-analyze, remove it from " + analyzed_list)
        #Wait for the whole batch to finish before launching the next one
        for p in processes:
            p.join()
    #Run file checking
    if args.file_check:
        os.system("python3 {0}file_checking.py {0}{1} {2} {3} {4}".format(workspace_path, samples_list_file, output, sample_path, total_CPU))
        os.system("Rscript {1}file_check_visualize.R {0}File_Check_Output.txt {0}".format(output, workspace_path))