-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathloci_extract.py
280 lines (246 loc) · 10.2 KB
/
loci_extract.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
#!/usr/bin/env python3
##########################
# Author: B. Anderson
# Date: Sep 2021
# Modified: Nov 2021 (added a sample exclude option); Dec 2023 (increased summary and added consensus);
# Dec 2023 (extended to Stacks allelic output)
# Description: extract loci from an ipyrad .loci file (or Stacks populations.samples.fa) based on an input list
# Note: this script will convert the allelic data from Stacks (if run that way) to a single consensus per sample
##########################
import sys
import argparse
import statistics
from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
# create an ambiguity dictionary
amb_dict = {
'AC': 'M',
'AG': 'R',
'AT': 'W',
'CG': 'S',
'CT': 'Y',
'GT': 'K',
'ACG': 'V',
'ACT': 'H',
'AGT': 'D',
'CGT': 'B',
'ACGT': 'N'
}
# create a function for computing the consensus sequence of a list of aligned strings
def make_consensus(seq_list, locus_num):
con_list = []
for position in range(len(seq_list[0])):
align_slice = [sequence[position] for sequence in seq_list]
nbases = [nbase for nbase in align_slice if nbase in ['-', 'N']]
bases = [base for base in align_slice if base not in ['-', 'N']]
base_list = []
if len(bases) != 0: # a residue is there
if bases.count('A') > 0:
base_list.append('A')
if bases.count('C') > 0:
base_list.append('C')
if bases.count('G') > 0:
base_list.append('G')
if bases.count('T') > 0:
base_list.append('T')
if any([len(base_list) == 0, len(nbases) > len(bases)]): # no bases above threshold or more missing data than data
base = 'N'
elif len(base_list) == 1:
base = base_list[0]
else:
base = amb_dict[''.join(base_list)]
con_list.append(base)
else:
con_list.append('N')
return(SeqRecord(Seq(''.join(con_list)), id = str(locus_num), name = str(locus_num), description = str(locus_num)))
# instantiate the parser
parser = argparse.ArgumentParser(description = 'A script to extract loci from an ipyrad .loci file using an input list of locus numbers. ' +
'The loci will be saved as individual fasta files in the current directory with prefix "locus_". ' +
'Now also works for Stacks input (use arg --stacks)')
# add arguments to parse
parser.add_argument('loci_file', type = str, help = 'The .loci file to extract loci from')
parser.add_argument('-c', action = 'store_true', help = 'Flag for whether to output a file with consensus sequences of the loci (optional)')
parser.add_argument('-l', type = str, dest = 'loci_list', help = 'A text file with a list of desired loci numbers, one per line (optional). ' +
'If not provided, will extract all loci')
parser.add_argument('-s', type = str, dest = 'samp_list', help = 'A text file with a list of sample names to exclude, one per line (optional). ' +
'If not provided, will include all samples')
parser.add_argument('--stacks', action = 'store_true', help = 'Flag for whether the input file is from Stacks (optional) [default: ipyrad]')
# parse the command line
if len(sys.argv[1:]) == 0: # if there are no arguments
parser.print_help(sys.stderr)
sys.exit(1)
args = parser.parse_args()
consens = args.c
loci_file = args.loci_file
loci_list = args.loci_list
samp_list = args.samp_list
stacks = args.stacks
# read in the list of loci (if present) and assign to a list
if loci_list:
locus_nums = []
with open(loci_list, 'r') as loci:
for locus in loci:
locus_nums.append(locus.rstrip())
print('Will attempt to extract ' + str(len(locus_nums)) + ' loci')
# read in the list of samples (if present) and assign to a list
if samp_list:
samp_names = []
with open(samp_list, 'r') as samples:
for sample in samples:
samp_names.append(sample.rstrip())
print('Will exclude ' + str(len(samp_names)) + ' samples from extracted loci')
# check for consensus and create a list to hold them
if consens:
consensus_list = []
print('Will generate a consensus sequence for each locus')
# extract the loci from the .loci file
with open(loci_file, 'r') as locfile:
counter = 1
found_loci = 0
processed_loci = 0
empty_loci = 0
locus_lengths = []
###### ipyrad
if not stacks: # the ipyrad .loci file has special formatting
this_locus = []
for line in locfile:
if line.startswith('//'): # this indicates the final line of a locus
processed_loci = processed_loci + 1
if processed_loci > 100 * counter:
print('Processed ' + str(100 * counter) + ' loci')
counter = counter + 1
locus_num = line.rstrip().split('|')[1]
if loci_list:
if locus_num in locus_nums:
found_loci = found_loci + 1
if len(this_locus) > 0:
with open('locus_' + str(locus_num) + '.fasta', 'w') as outfile:
for entry in this_locus:
outfile.write('>' + entry.rstrip().split()[0] + '\n' + entry.rstrip().split()[1] + '\n')
locus_lengths.append(len(this_locus[0].rstrip().split()[1]))
if consens:
text_seqs = [entry.rstrip().split()[1] for entry in this_locus]
conseq = make_consensus(text_seqs, locus_num)
consensus_list.append(conseq)
else:
empty_loci = empty_loci + 1
else:
found_loci = found_loci + 1
if len(this_locus) > 0:
with open('locus_' + str(locus_num) + '.fasta', 'w') as outfile:
for entry in this_locus:
outfile.write('>' + entry.rstrip().split()[0] + '\n' + entry.rstrip().split()[1] + '\n')
locus_lengths.append(len(this_locus[0].rstrip().split()[1]))
if consens:
text_seqs = [entry.rstrip().split()[1] for entry in this_locus]
conseq = make_consensus(text_seqs, locus_num)
consensus_list.append(conseq)
else:
empty_loci = empty_loci + 1
this_locus = []
else:
if samp_list:
if line.split()[0] not in samp_names:
this_locus.append(line)
else:
this_locus.append(line)
###### Stacks
else: # the Stacks file is already fasta (note: two seqs per sample = alleles)
locus_count_list = []
seq_list = []
for line in locfile:
if line.startswith('#'):
continue
elif line.startswith('>'):
parts = line.strip().split() # format: ">CLocus_1_Sample_100_Locus_1_Allele_0 [sampleID]"
locus_num = parts[0].split('_')[1]
sample = parts[1].strip('[').strip(']')
else:
if locus_num not in locus_count_list:
locus_count_list.append(locus_num)
processed_loci = processed_loci + 1
if processed_loci > 100 * counter:
print('Processed ' + str(100 * counter) + ' loci')
counter = counter + 1
# process the stored seq_list to ensure each sample is represented by only one sequence
# generate loci output and store consensus if requested
if len(seq_list) > 0:
found_loci = found_loci + 1
seq_out_list = []
sample_list = sorted(list(set([item[0] for item in seq_list])))
for samp in sample_list:
seqs = [item[1] for item in seq_list if item[0] == samp]
if len(seqs) == 1: # only represented by one sequence
print('Sample ' + str(samp) + ' has one allele for locus ' + str(this_locus_num))
sample_seq = SeqRecord(Seq(seqs[0]), id = samp, name = samp, description = samp)
seq_out_list.append(sample_seq)
elif len(seqs) > 2: # shouldn't happen
print('Sample ' + str(samp) + ' has more than two alleles for locus ' + str(this_locus_num))
else: # normal two alleles
sample_seq = make_consensus(seqs, samp)
seq_out_list.append(sample_seq)
with open('locus_' + str(this_locus_num) + '.fasta', 'w') as outfile:
for entry in seq_out_list:
SeqIO.write(entry, outfile, 'fasta')
locus_lengths.append(len(seq_out_list[0].seq))
if consens:
text_seqs = [str(entry.seq) for entry in seq_out_list]
conseq = make_consensus(text_seqs, this_locus_num)
consensus_list.append(conseq)
# reset the locus_num and seq_list
this_locus_num = locus_num
seq_list = []
# store the line (sequence)
if loci_list:
if locus_num in locus_nums: # a target locus
if samp_list:
if sample not in samp_names: # a desirable sample
seq_list.append([sample, line.strip()])
else:
seq_list.append([sample, line.strip()])
elif samp_list:
if sample not in samp_names:
seq_list.append([sample, line.strip()])
else:
seq_list.append([sample, line.strip()])
# run once more for last locus
if len(seq_list) > 0:
found_loci = found_loci + 1
seq_out_list = []
sample_list = sorted(list(set([item[0] for item in seq_list])))
for samp in sample_list:
seqs = [item[1] for item in seq_list if item[0] == samp]
if len(seqs) == 1: # only represented by one sequence
print('Sample ' + str(samp) + ' has one allele for locus ' + str(this_locus_num))
sample_seq = SeqRecord(Seq(seqs[0]), id = samp, name = samp, description = samp)
seq_out_list.append(sample_seq)
elif len(seqs) > 2: # shouldn't happen
print('Sample ' + str(samp) + ' has more than two alleles for locus ' + str(this_locus_num))
else: # normal two alleles
sample_seq = make_consensus(seqs, samp)
seq_out_list.append(sample_seq)
with open('locus_' + str(this_locus_num) + '.fasta', 'w') as outfile:
for entry in seq_out_list:
SeqIO.write(entry, outfile, 'fasta')
locus_lengths.append(len(seq_out_list[0].seq))
if consens:
text_seqs = [str(entry.seq) for entry in seq_out_list]
conseq = make_consensus(text_seqs, this_locus_num)
consensus_list.append(conseq)
# output the consensus if requested
if consens:
with open('loci_consensus.fasta', 'w') as outfile:
for conseq in consensus_list:
SeqIO.write(conseq, outfile, 'fasta')
# summarize the results
if loci_list:
print('Processed ' + str(processed_loci) + ' loci and extracted ' + str(found_loci) + ' loci of the targeted ' + str(len(locus_nums)))
else:
print('Processed ' + str(processed_loci) + ' loci and extracted ' + str(found_loci) + ' loci')
if empty_loci > 0:
print('Did not export ' + str(empty_loci) + ' loci that only included excluded samples')
mean = sum(locus_lengths)/len(locus_lengths)
stdev = statistics.pstdev(locus_lengths)
print('For ' + str(len(locus_lengths)) + ' retained loci, the mean length was ' + str('{:.2f}'.format(mean)) +
' bp (min '+ str(min(locus_lengths)) + ', max ' + str(max(locus_lengths)) + ', stdev ' + str('{:.2f}'.format(stdev)) + ')')