-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy patho-smart-trim
executable file
·144 lines (112 loc) · 5.63 KB
/
o-smart-trim
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2010 - 2014, A. Murat Eren
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation; either version 2 of the License, or (at your option)
# any later version.
#
# Please read the COPYING file.
import sys
import argparse
import Oligotyping.lib.fastalib as u
from Oligotyping.utils.utils import Run, Progress
progress = Progress()
run = Run()
def smart_trim(fasta_file_path, min_percent = 95.0, output_file_path = None, from_start = True):
fasta = u.SequenceSource(fasta_file_path)
run.info('Input File', fasta_file_path)
progress.new('Sanity check')
next(fasta)
alignment_length = len(fasta.seq)
while next(fasta):
if fasta.pos % 100 == 0:
progress.update(fasta.pos)
if len(fasta.seq) != alignment_length:
progress.end()
run.info_single('Error: Not all reads are equal in length! Are you sure this is an alignment?')
sys.exit()
num_reads = fasta.pos
fasta.reset()
progress.end()
run.info('Number of reads', num_reads)
run.info('Number of characters in the alignment', alignment_length)
positions = dict([(i, 0) for i in range(0, alignment_length)])
progress.new('First pass')
while next(fasta):
if not from_start:
fasta.seq = fasta.seq[::-1]
if fasta.pos % 100 == 0:
progress.update(fasta.pos)
for i in range(0, alignment_length):
if fasta.seq[i] != '-':
for j in range(i, alignment_length):
positions[j] += 1
break
progress.end()
trim_location_has_been_set = False
I = lambda: i if from_start else (alignment_length - i)
progress.new('Setting trimming location')
for i in range(0, alignment_length):
progress.update('.' * (i + 1))
pct_reads_will_survive = positions[i] * 100.0 / num_reads
if pct_reads_will_survive >= min_percent and not trim_location_has_been_set:
trim_location = I()
trim_location_has_been_set = True
trim_location_pct_reads_survive = pct_reads_will_survive
if pct_reads_will_survive == 100:
progress.end()
message = "All reads are going to be trimmed from the %dth position. " % trim_location
if 100 - trim_location_pct_reads_survive:
message += '%d reads that do not reach to this position will be eliminated. ' % ((100 - trim_location_pct_reads_survive) / 100.0 * num_reads)
if min_percent < 100:
gain = I() - trim_location
if gain < 0:
gain *= -1
message += 'If all reads were to be retained, alignments should have been trimmed from\
the %dth location, however, this would have required all reads to lose %d\
more characters.' % (I(), gain)
run.warning(message)
break
progress.end()
output_file_path = output_file_path if output_file_path else sys.argv[1] + '-TRIMMED'
output = u.FastaOutput(output_file_path)
fasta.reset()
progress.new('Storing trimmed reads')
while next(fasta):
if fasta.pos % 100 == 0:
progress.update('%d' % fasta.pos)
trimmed_sequence = fasta.seq[trim_location:] if from_start else fasta.seq[:trim_location]
if from_start and trimmed_sequence.startswith('-'):
continue
if not from_start and trimmed_sequence.endswith('-'):
continue
output.write_id(fasta.id)
output.write_seq(trimmed_sequence, split = False)
progress.end()
run.info('Num of characters in the trimmed alignment', len(trimmed_sequence))
run.info('Output file', output_file_path)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Smart trim ragged ends of an alignment')
parser.add_argument('fasta_file', metavar = 'FASTA FILE',
help = 'Alignment to be trimmed')
parser.add_argument('--min-percent', metavar = 'PERCENT', default=95.0, type=float,
help = 'Even if there is only one read that is too short and therefore full of gap characters,\
the first location in the alignment file that *every* read has a base would have to match\
the length of that short read. With this percentage you can specify what is the percentage\
of reads you expect to pass while this trimming script tries to maximize the remaining\
read length after trimming. Default is %(default).2f')
parser.add_argument('-E', '--from-end', action="store_true", default = False,
help = 'Trim from the end of the file')
parser.add_argument('-S', '--from-start', action="store_true", default = False,
help = 'Trim from the beginning of the file')
parser.add_argument('-o', '--output', help = 'Output file name', default = None)
args = parser.parse_args()
if args.from_start and args.from_end:
print("Error: You have to use either --from-start flag, or --from-end flag for each run. Sorry!")
sys.exit()
if (not args.from_start) and (not args.from_end):
print("Error: You must choose the appropriate flag to declare from where the trimming should start (--from-start or --from-end).")
sys.exit()
smart_trim(args.fasta_file, args.min_percent, args.output, from_start = args.from_start)