Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate detection history into draws vs. logic #41

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 75 additions & 51 deletions ringvax/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ def get_person_property(self, id: str, property: str) -> Any:
raise RuntimeError(f"Property '{property}' not in schema")

if id not in self.infections:
raise RuntimeError(f"No person with {id=}")
raise RuntimeError(
f"No person with {id=}; cannot get property '{property}'"
)
elif property not in self.infections[id]:
raise RuntimeError(f"Person {id=} does not have property '{property}'")

Expand Down Expand Up @@ -156,29 +158,28 @@ def generate_infection(
detection_history = self.generate_detection_history(id)
self.update_person(id, detection_history)

t_start_infectious = disease_history["t_infectious"]
if detection_history["detected"]:
t_end_infectious = detection_history["t_detected"]
else:
t_end_infectious = disease_history["t_recovered"]
t_end_infectious = (
detection_history["t_detected"]
if detection_history["detected"]
else disease_history["t_recovered"]
)

# when do they infect people?
infection_rate = self.generate_infection_rate()

if disease_history["t_infectious"] > t_end_infectious:
infection_times = np.array([])
else:
infection_times = (
t_start_infectious
+ self.generate_infection_waiting_times(
self.rng,
rate=infection_rate,
infectious_duration=(
t_end_infectious - disease_history["t_infectious"]
),
)
infection_times = disease_history[
"t_infectious"
] + self.generate_infection_waiting_times(
self.rng,
rate=infection_rate,
infectious_duration=(
t_end_infectious - disease_history["t_infectious"]
),
)
assert (infection_times >= t_start_infectious).all()
assert (infection_times >= disease_history["t_infectious"]).all()
assert (infection_times <= t_end_infectious).all()

self.update_person(id, {"infection_times": infection_times})
Expand All @@ -201,48 +202,71 @@ def generate_disease_history(self, t_exposed: float) -> dict[str, Any]:

def generate_detection_history(self, id: str) -> dict[str, Any]:
"""Determine if a person is infected, and when"""
# determine properties of the infector
infector = self.get_person_property(id, "infector")

detected = False
detect_method = None
t_detected = None

passive_detected = self.bernoulli(self.params["p_passive_detect"])
if passive_detected:
detected = True
detect_method = "passive"
t_detected = (
self.get_person_property(id, "t_exposed")
+ self.generate_passive_detection_delay()
)
infector_detected = infector is not None and self.get_person_property(
infector, "detected"
)

active_detected = (
infector is not None
and self.get_person_property(infector, "detected")
and self.bernoulli(self.params["p_active_detect"])
t_infector_detected = (
self.get_person_property(infector, "t_detected")
if infector_detected
else None
)

if active_detected:
t_active_detected = (
self.get_person_property(infector, "t_detected")
+ self.generate_active_detection_delay()
)
if not detected or t_active_detected < t_detected:
detected = True
detect_method = "active"
t_detected = t_active_detected
# determine what kinds of detection this individual is eligible for
potentially_passive_detected = self.bernoulli(self.params["p_passive_detect"])

t_recovered = self.get_person_property(id, "t_recovered")
if detected and t_detected >= t_recovered:
detected = False
detect_method = None
t_detected = None
potentially_active_detected = infector_detected and self.bernoulli(
self.params["p_active_detect"]
)

return {
"detected": detected,
"detect_method": detect_method,
"t_detected": t_detected,
}
# actually determine what kind of detection occurred, if any
return self.resolve_detection_history(
potentially_passive_detected=potentially_passive_detected,
potentially_active_detected=potentially_active_detected,
passive_detection_delay=self.generate_passive_detection_delay(),
active_detection_delay=self.generate_active_detection_delay(),
t_exposed=self.get_person_property(id, "t_exposed"),
t_recovered=self.get_person_property(id, "t_recovered"),
t_infector_detected=t_infector_detected,
)

@staticmethod
def resolve_detection_history(
potentially_passive_detected: bool,
potentially_active_detected: bool,
passive_detection_delay: float,
active_detection_delay: float,
t_exposed: float,
t_recovered: float,
t_infector_detected: Optional[float],
) -> dict[str, Any]:
# a "detection" is a tuple (time, method)
detections = []

# keep track of passive and active possibilities
if potentially_passive_detected:
detections.append((t_exposed + passive_detection_delay, "passive"))

if potentially_active_detected:
assert t_infector_detected is not None
detections.append((t_infector_detected + active_detection_delay, "active"))

# detection only actually happens if it's before recovery
detections = [x for x in detections if x[0] < t_recovered]

if len(detections) == 0:
return {"detected": False, "t_detected": None, "detect_method": None}
else:
# choose the earliest detection
detection = min(detections, key=lambda x: x[0])
return {
"detected": True,
"t_detected": detection[0],
"detect_method": detection[1],
}

def generate_latent_duration(self) -> float:
return self.params["latent_duration"]
Expand Down
122 changes: 110 additions & 12 deletions tests/test_simulate.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,20 @@ def rng():
def test_infection_delays_zero_rate(rng):
"""If zero rate, zero infections"""
assert (
list(
ringvax.Simulation.generate_infection_waiting_times(
rng, rate=0.0, infectious_duration=100.0
)
)
== []
ringvax.Simulation.generate_infection_waiting_times(
rng, rate=0.0, infectious_duration=100.0
).size
== 0
)


def test_infection_delays_zero_duration(rng):
"""If zero duration, zero infections"""
assert (
list(
ringvax.Simulation.generate_infection_waiting_times(
rng, rate=100.0, infectious_duration=0.0
)
)
== []
ringvax.Simulation.generate_infection_waiting_times(
rng, rate=100.0, infectious_duration=0.0
).size
== 0
)


Expand Down Expand Up @@ -161,3 +157,105 @@ def test_snapshot(rng):
snapshot = json.load(f)

assert s.infections == snapshot


class TestResolveDetectionHistory:
@pytest.fixture
@staticmethod
def kwargs():
"""Baseline keyword arguments for resolve_detection_history tests"""
return {
"potentially_passive_detected": False,
"potentially_active_detected": False,
"passive_detection_delay": 5.0,
"active_detection_delay": 2.0,
"t_exposed": 0.0,
"t_recovered": 10.0,
"t_infector_detected": None,
}

@staticmethod
def f(kwargs: dict):
return ringvax.Simulation.resolve_detection_history(**kwargs)

def test_baseline(self, kwargs):
"""No potential detections"""
assert self.f(kwargs) == {
"detected": False,
"t_detected": None,
"detect_method": None,
}

def test_passive_only(self, kwargs):
"""Passive detection only"""
kwargs["potentially_passive_detected"] = True
assert self.f(kwargs) == {
"detected": True,
"t_detected": 0.0 + 5.0,
"detect_method": "passive",
}

def test_passive_bad_time(self, kwargs):
"""Passive detection after recovery"""
kwargs["potentially_passive_detected"] = True
kwargs["passive_detection_delay"] = 11.0
assert self.f(kwargs) == {
"detected": False,
"t_detected": None,
"detect_method": None,
}

def test_active_only(self, kwargs):
"""Active detection only"""
kwargs["potentially_active_detected"] = True
kwargs["t_infector_detected"] = 0.0
assert self.f(kwargs) == {
"detected": True,
"t_detected": 0.0 + 2.0,
"detect_method": "active",
}

def test_active_bad_time(self, kwargs):
"""Active detection after recovery"""
kwargs["potentially_active_detected"] = True
kwargs["t_infector_detected"] = 5.0
kwargs["active_detection_delay"] = 6.0
assert self.f(kwargs) == {
"detected": False,
"t_detected": None,
"detect_method": None,
}

def test_both_passive_wins(self, kwargs):
"""Both passive and active detection, passive wins"""
kwargs["potentially_passive_detected"] = True
kwargs["potentially_active_detected"] = True
kwargs["t_infector_detected"] = 5.0
assert self.f(kwargs) == {
"detected": True,
"t_detected": 0.0 + 5.0,
"detect_method": "passive",
}

def test_both_active_wins(self, kwargs):
"""Both passive and active detection, active wins"""
kwargs["potentially_passive_detected"] = True
kwargs["potentially_active_detected"] = True
kwargs["t_infector_detected"] = 1.0
assert self.f(kwargs) == {
"detected": True,
"t_detected": 1.0 + 2.0,
"detect_method": "active",
}

def test_both_neither(self, kwargs):
"""Both passive and active detection, neither wins"""
kwargs["potentially_passive_detected"] = True
kwargs["potentially_active_detected"] = True
kwargs["t_infector_detected"] = 9.0
kwargs["passive_detection_delay"] = 11.0
assert self.f(kwargs) == {
"detected": False,
"t_detected": None,
"detect_method": None,
}
Loading