-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathab1.py
127 lines (109 loc) · 3.91 KB
/
ab1.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
from utils import int_to_16_bit
from directory import Directory
from typing import List, Optional
VERSION_NUMBER: int = 101
OFFSET_START: int = 128
CHANNEL_TAG_NUMBER_ORDER: List[int] = [9, 10, 11, 12]
def base_calls_from_channels(
a_channel: List[int],
t_channel: List[int],
c_channel: List[int],
g_channel: List[int],
peak_locations: List[int],
) -> str:
"""
Estimates base-calls from nucleotide channels.
:param a_channel: Adenine (A) channel values
:param t_channel: Thymine (T) channel values
:param c_channel: Cytosine (C) channel values
:param g_channel: Guanine (G) channel values
:param peak_locations: Peak locations
:return: Estimated base-calls
"""
base_calls = []
for pl in peak_locations:
values_and_bases = (
(a_channel[pl], "A"),
(t_channel[pl], "T"),
(c_channel[pl], "C"),
(g_channel[pl], "G"),
)
base_calls.append(max(values_and_bases)[1])
return "".join(base_calls)
def write_ab1(
filename: str,
a_channel: List[int],
t_channel: List[int],
c_channel: List[int],
g_channel: List[int],
peak_locations: List[int],
base_calls: Optional[str] = None,
sample_name: Optional[str] = None,
comment: Optional[str] = None,
base_order: Optional[List[str]] = None,
) -> None:
"""
Writes an .ab1 file, as per the Applied Biosystems 2006 ABIF file spec:
https://projects.nfstc.org/workshops/resources/articles/ABIF_File_Format.pdf
:param filename: Name of .ab1 file
:param a_channel: Adenine (A) channel values
:param t_channel: Thymine (T) channel values
:param c_channel: Cytosine (C) channel values
:param g_channel: Guanine (G) channel values
:param peak_locations: List of peak locations
:param base_calls: String of base-calls
:param sample_name: (OPTIONAL) Name of sample
:param comment: (OPTIONAL) Comment about sample
:param base_order: (OPTIONAL) Order of nucleotide channels
"""
if sample_name is None:
sample_name = "Auto-generated sample."
if comment is None:
comment = "Generated by BioPython ab1 creator."
if base_order is None:
base_order = ["G", "A", "T", "C"]
if base_calls is None:
base_calls = base_calls_from_channels(
a_channel,
t_channel,
c_channel,
g_channel,
peak_locations,
)
dirs = [
Directory.peak_locations(peak_locations),
Directory.base_order(base_order),
Directory.lane(42),
Directory.dye_signal_strength(500, 500, 500, 500),
Directory.sample_name(sample_name),
Directory.base_calls(1, base_calls),
Directory.base_calls(2, base_calls),
Directory.comment(comment),
Directory.mobility_filename(1, "{}.mob".format(filename)),
Directory.mobility_filename(2, "{}.mob".format(filename)),
]
channels = {"A": a_channel, "T": t_channel, "C": c_channel, "G": g_channel}
for tag_num, base in zip(CHANNEL_TAG_NUMBER_ORDER, base_order):
dirs.append(Directory.channel_data(tag_num, channels[base]))
with open("{}.ab1".format(filename), "wb") as ab1:
total_data_size = sum(x.data_size for x in dirs if x.data_size > 4)
# -- Header --
ab1.write("ABIF".encode())
ab1.write(int_to_16_bit(VERSION_NUMBER))
# -- Root directory --
root_dir = Directory.root(num_dirs=len(dirs))
root_dir.write_to(ab1, OFFSET_START + total_data_size)
# -- 47 2-byte integer buffer --
for _ in range(47):
ab1.write(int_to_16_bit(0))
# -- Data --
for d in dirs:
if d.data_size <= 4:
continue
ab1.write(d.data)
# -- Directories --
offset = OFFSET_START
for d in dirs:
d.write_to(ab1, offset)
if d.data_size > 4:
offset += d.data_size