-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfilter_gtf.py
executable file
·142 lines (116 loc) · 3.99 KB
/
filter_gtf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#!/usr/bin/env python
import collections
import csv
import gzip
import os
import re
import sys
PATTERN = re.compile(r'(\S+?)\s*"(.*?)"')
gtf_row = collections.namedtuple( 'gtf_row', 'seqname source feature start end score strand frame attributes' )
def generic_open(file_name, *args, **kwargs):
if file_name.endswith('.gz'):
file_obj = gzip.open(file_name, *args, **kwargs)
else:
file_obj = open(file_name, *args, **kwargs)
return file_obj
class GtfParser:
def __init__(self, gtf_fn):
self.gtf_fn = gtf_fn
self.gene_id = []
self.gene_name = []
self.id_name = {}
self.id_strand = {}
def get_properties_dict(self, properties_str):
"""
allow no space after semicolon
"""
if isinstance(properties_str, dict):
return properties_str
properties = collections.OrderedDict()
attrs = properties_str.split(';')
for attr in attrs:
if attr:
m = re.search(PATTERN, attr)
if m:
key = m.group(1).strip()
value = m.group(2).strip()
properties[key] = value
return properties
def gtf_reader_iter(self):
"""
Yield:
row: list
gtf_row
"""
with generic_open(self.gtf_fn, mode='rt') as f:
reader = csv.reader(f, delimiter='\t')
for i, row in enumerate(reader, start=1):
if len(row) == 0:
continue
if row[0].startswith('#'):
yield row, None
continue
if len(row) != 9:
sys.exit(f"Invalid number of columns in GTF line {i}: {row}\n")
if row[6] not in ['+', '-']:
sys.exit(f"Invalid strand in GTF line {i}: {row}\n")
seqname = row[0]
source = row[1]
feature = row[2]
# gff/gtf is 1-based, end-inclusive
start = int(row[3])
end = int(row[4])
score = row[5]
strand = row[6]
frame = row[7]
attributes = self.get_properties_dict(row[8])
yield row, gtf_row( seqname, source, feature, \
start, end, score, strand, frame, \
attributes )
def filter_gtf(gtf_fn, out_fn, allow):
"""
Filter attributes
Args:
allow: {
"gene_biotype": set("protein_coding", "lncRNA")
}
"""
sys.stderr.write("Writing GTF file...\n")
gp = GtfParser(gtf_fn)
n_filter = 0
with open(out_fn, 'w') as f:
# quotechar='' is not allowed since python3.11
writer = csv.writer(f, delimiter='\t', quoting=csv.QUOTE_NONE, quotechar=None)
for row, grow in gp.gtf_reader_iter():
if not grow:
writer.writerow(row)
continue
remove = False
if allow:
for key, value in grow.attributes.items():
if key in allow and value not in allow[key]:
remove = True
break
if not remove:
writer.writerow(row)
else:
n_filter += 1
return n_filter
if __name__ == '__main__':
# args: gtf, attributes
gtf_fn = sys.argv[1]
attributes = sys.argv[2]
out_fn = os.path.basename(gtf_fn).replace('.gtf', '.filtered.gtf')
allow = {}
for attr_str in attributes.split(';'):
if attr_str:
attr, val = attr_str.split('=')
val = set(val.split(','))
allow[attr] = val
n_filter = filter_gtf(gtf_fn, out_fn, allow)
sys.stdout.write(f"Filtered {n_filter} lines\n")
log_file = "gtf_filter.log"
with open(log_file, 'w') as f:
f.write(f"Filtered lines: {n_filter}\n")
f.write(f"Attributes: {attributes}\n")
f.write(f"Output file: {out_fn}\n")