-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtracing_rank.py
128 lines (110 loc) · 4.38 KB
/
tracing_rank.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import numpy as np
import pandas as pd
from scipy.sparse import csr_matrix
from .template_rank import AbstractRanker
from .mean_field_rank import contacts_rec_to_csr, get_rank, check_inputs
def csr_to_list(x):
x_coo = x.tocoo()
return zip(x_coo.row, x_coo.col, x_coo.data)
def ranking_tracing(t, transmissions, observations, tau, rng):
"""Naive contact tracing.
Search for all individuals that have been in contact during [t-tau, t]
with the individuals last tested positive (observations s=I at
t-tau <= t_test < t) and count the number of encounters.
Returns: scores = encounters. If t < delta returns random scores.
"""
N = transmissions[0].shape[0]
if (t < tau):
scores = rng.rand(N)
return scores
# last_tested : observations s=I for t-tau <= t_test < t
last_tested = set(
obs["i"] for obs in observations
if obs["s"] == 1 and (t - tau <= obs["t_test"]) and (obs["t_test"] <= t)
)
# contacts with last_tested people during [t - tau, t]
#print("CT, last_tested ", last_tested)
contacts = pd.DataFrame(
dict(i=i, j=j, t=t_contact)
for t_contact in range(t - tau, t)
for i, j, lamb in csr_to_list(transmissions[t_contact])
if j in last_tested and lamb # lamb = 0 does not count
)
encounters = pd.DataFrame({"i": range(N)})
# no encounters -> count = 0
if (contacts.shape[0] == 0):
encounters["count"] = 0
else:
counts = contacts.groupby("i").size() # number of encounters for all i
encounters["count"] = encounters["i"].map(counts).fillna(0)
scores = encounters["count"].values
return scores
def ranking_tracing_faster(t, transmissions, observations, tau, rng):
"""Naive contact tracing.
Search for all individuals that have been in contact during [t-tau, t]
with the individuals last tested positive (observations s=I at
t-tau <= t_test < t) and count the number of encounters.
Faster versions, using the sparse matrices
@author Fabio Mazza
Returns: scores = encounters. If t < delta returns random scores.
"""
N = transmissions[0].shape[0]
if (t < tau):
scores = rng.rand(N)
return scores
# last_tested : observations s=I for t-tau <= t_test < t
last_tested = set(
obs["i"] for obs in observations
if obs["s"] == 1 and (t - tau <= obs["t_test"]) and (obs["t_test"] <= t)
)
nt = len(last_tested)
## Indicator matrix, 1 -> tested positive
test_pos = csr_matrix((np.ones(nt),
(np.zeros(nt,dtype=int), list(last_tested))), shape=(1,N) )
c = 0
for t_c in range(t-tau, t):
## number of contacts with the positive individual at time t_c
x = (test_pos).dot(transmissions[t_c]>0)
c+= x #(x>0).astype(float)
"""
For a more accurate (?) version of the CT, we could also do:
x = (test_pos).dot(transmissions[t_c])
c+= (x>0).astype(float)
"""
scores = c.toarray()[0]
return scores
class TracingRanker(AbstractRanker):
def __init__(self, tau, lamb):
self.description = "class for naive contact tracing of openABM loop."
self.author = "https://github.com/sphinxteam"
self.tau = tau
self.lamb = lamb
self.rng = np.random.RandomState(1)
def init(self, N, T):
self.transmissions = []
self.observations = []
self.T = T
self.N = N
return True
def rank(self, t_day, daily_contacts, daily_obs, data):
'''
computing rank of infected individuals
return: list -- [(index, value), ...]
'''
# check that t=t_day in daily_contacts and t=t_day-1 in daily_obs
#check_inputs(t_day, daily_contacts, daily_obs)
# append daily_contacts and daily_obs
daily_transmissions = contacts_rec_to_csr(self.N, daily_contacts, self.lamb)
self.transmissions.append(daily_transmissions)
self.observations += [
dict(i=i, s=s, t_test=t_test) for i, s, t_test in daily_obs
]
## Trick to run CT always
tau_today = min(self.tau, t_day)
# scores given by mean field run from t-delta to t
scores = ranking_tracing_faster(
t_day, self.transmissions, self.observations, tau_today, self.rng
)
# convert to list [(index, value), ...]
rank = get_rank(scores)
return rank