forked from xenobiolab/xenomorph
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathxenosim.py
387 lines (261 loc) · 11 KB
/
xenosim.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
########################################################################
########################################################################
"""
xenosim.py
Description: Generates a toy set of levels + noise (std) given a
sequence of kmers as input and specified model. Various user supplied
settings can be specified including what models to use for simulation,
specification of signal noise (kmer-specific or global), sampled data or
simulate data, etc. Script should be modified below and ran using:
python xenosim.py.
Title: Synthesis and Sequencing of a 12-Letter Supernumerary DNA
By: H. Kawabe, C. Thomas, S. Hoshika, Myong-Jung Kim, Myong-Sang Kim, L. Miessner, J. M. Craig,
J. Gundlach, A. Laszlo, S. A. Benner, J. A. Marchand
Updated: 2/14/23
"""
########################################################################
########################################################################
import pandas as pd
import numpy as np
import itertools
import seaborn as sns
import os
import sys
from string import ascii_lowercase
import scipy.stats as stats
import matplotlib.pyplot as plt
import statistics
import warnings
warnings.filterwarnings("ignore")
import multiprocessing
import random
from alive_progress import alive_bar
def kmer2level(kmers, kmer_model,n_iter):
level=[]
for i in range(0,len(kmers)):
kmer_select = kmers[i]
#Fetch mean from kmer model
kmer_s = kmer_model[kmer_model['KXmer']==kmer_select]
mu = float(kmer_s['mu'].iloc[0]) #Fetch mean
#Fetch standard deivation from kmer model
noise = float(kmer_s['sigma'].iloc[0])
#Generate error
st = np.random.normal(0,noise, n_iter)
#Simulate an observation
mus = mu + st
#Store observed level in array
level.append(mus)
#Return simulated levels
return(level)
#Converts a sequence seq into kmers of length k
def seq2kmer(seq, k):
kmers = []
for i in range(0,len(seq)-k+1):
kmers.append(seq[i:i+k].upper())
return kmers
def kmer2seq(kmers):
seq=kmers[0]
for i in range(1, len(kmers)):
seq=seq+kmers[i][-1]
return(seq)
def getKernelDensityEstimation(values, kde_step_size, kernel):
x = np.arange(-5,5,kde_step_size)
level=values
iqr = scipy.stats.iqr(level)
a_factor = 0.9*np.min([iqr/1.34,np.std(level)])
bandwidth = a_factor*len(level)**(-1/5)
if bandwidth == 0:
bandwidth = 0.2
model = KernelDensity(kernel = kernel, bandwidth=bandwidth)
model.fit(np.array(values).reshape((-1,1)))
logprob= model.score_samples(np.array(x).reshape((-1,1)))
clustering = MeanShift(bandwidth=bandwidth).fit(np.array(values).reshape((-1,1)))
#Get maxima
ma = argrelextrema(logprob, np.greater)[0]
#Get min of max
logma = list(logprob[ma])
logma.sort()
try:
minin = list(logprob[ma]).index(logma[-rank_density])
except:
minin = list(logprob[ma]).index(logma[-1])
kernel_mean = x[ma][minin]
return kernel_mean
def gen_model(model_bases, model_fn):
print('Xenomorph Status - [Morph] Compiling model from input model base abbreviations.')
#Parse model, get paths of each model file component
mf = parse_model_files(model_bases,True)
#Compile model from file paths into a master kmer model
km = compile_model(mf, model_bases)
#Save model
km.to_csv(model_fn, index=False)
#Samples levels from a raw_kmer_file. Can be used to simulate data instead of kmer2level
#Return row_wize set of level simulations
#Sampling is optional
def sample2levels(kmers, raw_kmers, n_iter):
level_sets =[]
for i in range(0,len(kmers)):
kmer_select = kmers[i]
#Fetch mean from kmer model
kmer_s = raw_kmers[raw_kmers['kmer_xy']==kmer_select]
lev= kmer_s.iloc[0]['mean_level'].replace('\n',' ').replace("'",'').replace(']','').replace('[','').split(' ')
level=[]
for i in range(0,len(lev)):
try:
level.append(float(lev[i]))
except:
ex = 0
level_sets.append(random.choices(level, k = n_iter))
return level_sets
#Input a Kmer string, normalized level, and a kmer_distribution CSV file. Outputs probability of observing event (P e|x)
def kmerpdf(kmer_select,event,kmer_model):
#Generate pdf of kmer levels with mean mu and standard deviation std
kmer_s = kmer_model[kmer_model['KXmer']==kmer_select]
mu =kmer_s['mu'].iloc[0]
st = kmer_s['sigma'].iloc[0]
return stats.norm(float(mu),float(st)).pdf(event)
#Input a path of kmers to take, and output the log probability of that path
def chart_path(kmer_path, levels, kmer_model):
p = 0
for i in range(0,len(kmer_path)):
kmer_select = kmer_path[i]
event = levels[i]
pj = np.log10(kmerpdf(kmer_select,event,kmer_model))
p = p + pj
return (p)
#Strickly uses 7th base position. Given a sequence and list of levels, will calculate all alternative likelyhoods
def gen_alt_all(sequence, kmer_levels, kmer_model, all_bases,n_iter, xbase_pos,kmer_size):
#Store np array of log likelihoods
lpp=[]
for j in range(0,len(all_bases)):
#Generate alternative sequence - Uses fixed 7mer index
seq_alt = sequence[0:xbase_pos]+all_bases[j]+sequence[xbase_pos+1:8]
#Convert sequence to kmer list, then calculate liklihoods
lp = chart_path(seq2kmer(seq_alt,kmer_size), kmer_levels, kmer_model)
if all_bases[j] in xna_bases:
lp = lp+rocd
#store likelihood output in lpp array
lpp.append(lp)
#Generate bool table of best for each simulation
ppl_max = np.multiply(lpp == np.amax(lpp, axis=0),1)
#Calculate %basecall for each alternative base - column follows all_bases order
if analysis_mode != 'global':
ppl_max = np.sum(ppl_max,axis=1)/n_iter
#Return %accuracy for each base
return ppl_max
############################
#Load kmer measurements
############################
xna_bases = ['A','T','G','C','B','S','P','Z','X','K','J','V']
all_bases = ['A','T','G','C','B','S','P','Z','X','K','J','V']
model_bases = 'ATGCBSPZXKJV'
#Analysis mode
analysis_mode = 'per-read' #per-read or global
#Raw kmer file input (for sampling-based generation)
raw_kmer_file = ''
#Output file name to save
out_fn = ''
#Nmer model to simulate (nnnxnnn, nxnn, nnxnn etc)
nmer_model = 'nnnxnnn'
#Kmer models to use (e.g. [4, 5, 6] will use 4kmer, 5kmer, and 6kmer models in ML calculations
kmer_sizes = [4]
#Set number of iterations to run in simulation
n_iter=1000
#Kmer levels
kmer_levels = 'Simulate' #Sample (from real data) or Simulate kmer levels from PDF
#Set mu when simulating reads as: KDE Mean level, Mean level, or Median level
mu = 'KDE Mean level'
#Set sigma as 'Global' for global median (recommended), 'Kmer' for kmer-specific, or set as float for fixed
sigma = 0.4
#Set mu for basecalling model as: KDE Mean level, Mean level, or Median level
mu_sim = 'KDE Mean level'
#Configure simulated levels model (if kmer_levels = 'Simulate'). Kmer sigma more realistic for simulated reads.
sigma_sim = 0.4
#Load kmer model based on all_bases
model_fn='active_kmer_model_sim.csv'
gen_model(''.join(all_bases), model_fn)
kmer_model_input_file = (model_fn )
kmer_model_input = pd.read_csv(kmer_model_input_file, sep=',')
kmer_model_input_sim = pd.read_csv(kmer_model_input_file, sep=',')
#bases to allow as trimers before and after NNNXNNN
nnn_bases = ['A', 'T', 'G', 'C']
#KDE step size
kde_step_size = 0.01
#One solution is you have an XNA sequence then convert to regular, then sample reg kmers from that
##############################################
#Configure kmer model and kmer simulation model
##############################################
#Model configuration
sig = kmer_model_input['Std level']
if isinstance(sigma, str):
if sigma == 'Global':
sig = np.median(sig)
else:
sig = sigma
kmer_model_input['Std level'] = sig
kmer_model=kmer_model_input[['KXmer',mu,'Std level']]
kmer_model.rename(columns = {mu:'mu', 'Std level':'sigma'}, inplace = True)
kmer_list = kmer_model['KXmer']
if kmer_levels =='Simulate':
#Simulation model configuation
sig_sim = kmer_model_input_sim['Std level']
if isinstance(sigma_sim, str):
if sigma_sim == 'Global':
sig_sim = np.mean(kmer_model_input['Std level'])
else:
sig_sim = sigma_sim
kmer_model_input_sim['Std level'] = sig_sim
kmer_model_sim=kmer_model_input_sim[['KXmer',mu,'Std level']]
kmer_model_sim.rename(columns = {mu:'mu', 'Std level':'sigma'}, inplace = True)
#Load raw kmer file - if sampling levels from raw kmers
if kmer_levels == 'Sample':
print('[Sampling] - Loading Xenosim input model and kmer files.')
#raw_kmer_file = 'xenomorph_pub/PZ_libv2_FLG001GC_kmers.csv'
#raw_kmer_file = 'xenomorph_pub/ATGC_blunt_libv2_FLG001GC_kmers.csv'
raw_kmer_samples = pd.read_csv(raw_kmer_file, sep=',', dtype={'kmer_xy': str, 'mean_level': object}).dropna(subset = ['mean_level'])
##############################################
#Generate kmer data sampling from real dataset
##############################################
#Create empty dataframe to store results
simu_out = pd.DataFrame(columns = ['Sequence', 'NNN1', 'XNA', 'NNN2', 'Iterations'])
simu_out[all_bases]=[]
#Generate every possible nnn-mer
xbase_pos = nmer_model.find('x')
n1 = nnn = list(set([''.join(i) for i in itertools.product(nnn_bases, repeat = xbase_pos)]))
n1.sort()
n2 = nnn = list(set([''.join(i) for i in itertools.product(nnn_bases, repeat = len(nmer_model)-xbase_pos-1)]))
n2.sort()
nnnxnnn = list(itertools.product(n1,xna_bases,n2))
tp =[]
fp =[]
print('Running basecall simulations on '+nmer_model)
with alive_bar(len(nnnxnnn), force_tty=True) as bar:
for i in range(0,len(nnnxnnn)):
sequence = nnnxnnn[i][0]+nnnxnnn[i][1]+nnnxnnn[i][2]
#break up sequence into kmer
kmer=[]
for k in range(0,len(kmer_sizes)):
kmer = kmer+seq2kmer(sequence,kmer_sizes[k])
# print(kmer)
#Generate kmer levels
if kmer_levels =='Sample':
levels = sample2levels(kmer, raw_kmer_samples, n_iter)
if kmer_levels =='Simulate':
levels = kmer2level(kmer, kmer_model_sim, n_iter)
if analysis_mode == 'global':
k_levels = []
for ii in range(0,len(levels)):
kde_mean = getKernelDensityEstimation(levels[ii], kde_step_size,'gaussian')
k_levels.append(kde_mean)
#Calculate log liklihoods
levels = k_levels
lpp = gen_alt_all(sequence,levels, kmer_model, all_bases, n_iter,xbase_pos,kmer_sizes[0])
#For saving into output
nnn1=nnnxnnn[i][0]
x=nnnxnnn[i][1]
nnn2=nnnxnnn[i][2]
simu = np.concatenate([[sequence, nnn1, x, nnn2, n_iter],lpp])
simu_out.loc[len(simu_out.index)]=simu
bar()
#Save output
simu_out.to_csv(out_fn)