Skip to content

Commit

Permalink
Function for replacing objectivePrior* by observables/measurements (#…
Browse files Browse the repository at this point in the history
…309)

Reformulate the given PEtab problem such that the objective priors are converted to measurements. This is done by adding a new observable for each prior and adding a corresponding measurement to the measurement table. The resulting optimization problem will be equivalent to the original problem. This is meant to be used for tools that do not support priors.

Closes #307
---------

Co-authored-by: Dilan Pathirana <[email protected]>
  • Loading branch information
dweindl and dilpath authored Oct 15, 2024
1 parent fdd9d95 commit cf82b88
Show file tree
Hide file tree
Showing 3 changed files with 341 additions and 1 deletion.
2 changes: 1 addition & 1 deletion petab/v1/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ def get_priors_from_df(
# get types and parameters of priors from dataframe
par_to_estimate = parameter_df.loc[parameter_df[ESTIMATE] == 1]

if parameter_ids:
if parameter_ids is not None:
try:
par_to_estimate = par_to_estimate.loc[parameter_ids, :]
except KeyError as e:
Expand Down
203 changes: 203 additions & 0 deletions petab/v1/priors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
"""Functions related to prior handling."""
import copy

import numpy as np
import pandas as pd

from ..v2.C import PREEQUILIBRATION_CONDITION_ID
from . import (
ESTIMATE,
LAPLACE,
LIN,
LOG,
LOG10,
LOG_LAPLACE,
LOG_NORMAL,
MEASUREMENT,
NOISE_DISTRIBUTION,
NOISE_FORMULA,
NOISE_PARAMETERS,
NORMAL,
OBJECTIVE_PRIOR_PARAMETERS,
OBJECTIVE_PRIOR_TYPE,
OBSERVABLE_FORMULA,
OBSERVABLE_ID,
OBSERVABLE_TRANSFORMATION,
PARAMETER_SCALE,
PARAMETER_SCALE_LAPLACE,
PARAMETER_SCALE_NORMAL,
PARAMETER_SEPARATOR,
SIMULATION_CONDITION_ID,
TIME,
Problem,
)


def priors_to_measurements(problem: Problem):
"""Convert priors to measurements.
Reformulate the given problem such that the objective priors are converted
to measurements. This is done by adding a new observable
``prior_{parameter_id}`` for each estimated parameter that has an objective
prior, and adding a corresponding measurement to the measurement table.
The new measurement is the prior distribution itself. The resulting
optimization problem will be equivalent to the original problem.
This is meant to be used for tools that do not support priors.
The conversion involves the probability density function (PDF) of the
prior, the parameters (e.g., location and scale) of that prior PDF, and the
scale and value of the estimated parameter. Currently, `uniform` priors are
not supported by this method. This method creates observables with:
- `observableFormula`: the parameter value on the `parameterScale`
- `observableTransformation`: `log` for `logNormal`/`logLaplace`
distributions, `lin` otherwise
and measurements with:
- `measurement`: the PDF location
- `noiseFormula`: the PDF scale
Arguments
---------
problem:
The problem to be converted.
Returns
-------
The new problem with the priors converted to measurements.
"""
new_problem = copy.deepcopy(problem)

# we only need to consider parameters that are estimated
par_df_tmp = problem.parameter_df.loc[problem.parameter_df[ESTIMATE] == 1]

if (
OBJECTIVE_PRIOR_TYPE not in par_df_tmp
or par_df_tmp.get(OBJECTIVE_PRIOR_TYPE).isna().all()
or OBJECTIVE_PRIOR_PARAMETERS not in par_df_tmp
or par_df_tmp.get(OBJECTIVE_PRIOR_PARAMETERS).isna().all()
):
# nothing to do
return new_problem

def scaled_observable_formula(parameter_id, parameter_scale):
if parameter_scale == LIN:
return parameter_id
if parameter_scale == LOG:
return f"ln({parameter_id})"
if parameter_scale == LOG10:
return f"log10({parameter_id})"
raise ValueError(f"Unknown parameter scale {parameter_scale}.")

new_measurement_dicts = []
new_observable_dicts = []
for _, row in par_df_tmp.iterrows():
prior_type = row[OBJECTIVE_PRIOR_TYPE]
parameter_scale = row.get(PARAMETER_SCALE, LIN)
if pd.isna(prior_type):
if not pd.isna(row[OBJECTIVE_PRIOR_PARAMETERS]):
raise AssertionError(
"Objective prior parameters are set, but prior type is "
"not specified."
)
continue

if "uniform" in prior_type.lower():
# for measurements, "uniform" is not supported yet
# if necessary, this could still be implemented by adding another
# observable/measurement that will produce a constant objective
# offset
raise NotImplementedError("Uniform priors are not supported.")

parameter_id = row.name
prior_parameters = tuple(
map(
float,
row[OBJECTIVE_PRIOR_PARAMETERS].split(PARAMETER_SEPARATOR),
)
)
if len(prior_parameters) != 2:
raise AssertionError(
"Expected two objective prior parameters for parameter "
f"{parameter_id}, but got {prior_parameters}."
)

# create new observable
new_obs_id = f"prior_{parameter_id}"
if new_obs_id in new_problem.observable_df.index:
raise ValueError(
f"Observable ID {new_obs_id}, which is to be "
"created, already exists."
)
new_observable = {
OBSERVABLE_ID: new_obs_id,
OBSERVABLE_FORMULA: scaled_observable_formula(
parameter_id,
parameter_scale if "parameterScale" in prior_type else LIN,
),
NOISE_FORMULA: f"noiseParameter1_{new_obs_id}",
}
if prior_type in (LOG_NORMAL, LOG_LAPLACE):
new_observable[OBSERVABLE_TRANSFORMATION] = LOG
elif OBSERVABLE_TRANSFORMATION in new_problem.observable_df:
# only set default if the column is already present
new_observable[OBSERVABLE_TRANSFORMATION] = LIN

if prior_type in (NORMAL, PARAMETER_SCALE_NORMAL, LOG_NORMAL):
new_observable[NOISE_DISTRIBUTION] = NORMAL
elif prior_type in (LAPLACE, PARAMETER_SCALE_LAPLACE, LOG_LAPLACE):
new_observable[NOISE_DISTRIBUTION] = LAPLACE
else:
raise NotImplementedError(
f"Objective prior type {prior_type} is not implemented."
)

new_observable_dicts.append(new_observable)

# add measurement
# we could just use any condition and time point since the parameter
# value is constant. however, using an existing timepoint and
# (preequilibrationConditionId+)simulationConditionId will avoid
# requiring extra simulations and solver stops in tools that do not
# check for time dependency of the observable. we use the first
# condition/timepoint from the measurement table
new_measurement = {
OBSERVABLE_ID: new_obs_id,
TIME: problem.measurement_df[TIME].iloc[0],
MEASUREMENT: prior_parameters[0],
NOISE_PARAMETERS: prior_parameters[1],
SIMULATION_CONDITION_ID: new_problem.measurement_df[
SIMULATION_CONDITION_ID
].iloc[0],
}
if PREEQUILIBRATION_CONDITION_ID in new_problem.measurement_df:
new_measurement[
PREEQUILIBRATION_CONDITION_ID
] = new_problem.measurement_df[PREEQUILIBRATION_CONDITION_ID].iloc[
0
]
new_measurement_dicts.append(new_measurement)

# remove prior from parameter table
new_problem.parameter_df.loc[
parameter_id, OBJECTIVE_PRIOR_TYPE
] = np.nan
new_problem.parameter_df.loc[
parameter_id, OBJECTIVE_PRIOR_PARAMETERS
] = np.nan

new_problem.observable_df = pd.concat(
[
pd.DataFrame(new_observable_dicts).set_index(OBSERVABLE_ID),
new_problem.observable_df,
]
)
new_problem.measurement_df = pd.concat(
[
pd.DataFrame(new_measurement_dicts),
new_problem.measurement_df,
],
ignore_index=True,
)
return new_problem
137 changes: 137 additions & 0 deletions tests/v1/test_priors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
from copy import deepcopy
from pathlib import Path

import benchmark_models_petab
import numpy as np
import pandas as pd
import pytest
from scipy.stats import norm

import petab.v1
from petab.v1 import get_simulation_conditions
from petab.v1.priors import priors_to_measurements


@pytest.mark.parametrize(
"problem_id", ["Schwen_PONE2014", "Isensee_JCB2018", "Raimundez_PCB2020"]
)
def test_priors_to_measurements(problem_id):
"""Test the conversion of priors to measurements."""
petab_problem_priors: petab.v1.Problem = (
benchmark_models_petab.get_problem(problem_id)
)
petab_problem_priors.visualization_df = None
assert petab.v1.lint_problem(petab_problem_priors) is False

if problem_id == "Isensee_JCB2018":
# required to match the stored simulation results below
petab.v1.flatten_timepoint_specific_output_overrides(
petab_problem_priors
)
assert petab.v1.lint_problem(petab_problem_priors) is False
original_problem = deepcopy(petab_problem_priors)

petab_problem_measurements = priors_to_measurements(petab_problem_priors)

# check that the original problem is not modified
for attr in [
"condition_df",
"parameter_df",
"observable_df",
"measurement_df",
]:
assert (
diff := getattr(petab_problem_priors, attr).compare(
getattr(original_problem, attr)
)
).empty, diff
# check that measurements and observables were added
assert petab.v1.lint_problem(petab_problem_measurements) is False
assert (
petab_problem_measurements.parameter_df.shape[0]
== petab_problem_priors.parameter_df.shape[0]
)
assert (
petab_problem_measurements.observable_df.shape[0]
> petab_problem_priors.observable_df.shape[0]
)
assert (
petab_problem_measurements.measurement_df.shape[0]
> petab_problem_priors.measurement_df.shape[0]
)
# ensure we didn't introduce any new conditions
assert len(
get_simulation_conditions(petab_problem_measurements.measurement_df)
) == len(get_simulation_conditions(petab_problem_priors.measurement_df))

# verify that the objective function value is the same

# load/construct the simulation results
simulation_df_priors = petab.v1.get_simulation_df(
Path(
benchmark_models_petab.MODELS_DIR,
problem_id,
f"simulatedData_{problem_id}.tsv",
)
)
simulation_df_measurements = pd.concat(
[
petab_problem_measurements.measurement_df.rename(
columns={petab.v1.MEASUREMENT: petab.v1.SIMULATION}
)[
petab_problem_measurements.measurement_df[
petab.v1.C.OBSERVABLE_ID
].str.startswith("prior_")
],
simulation_df_priors,
]
)

llh_priors = petab.v1.calculate_llh_for_table(
petab_problem_priors.measurement_df,
simulation_df_priors,
petab_problem_priors.observable_df,
petab_problem_priors.parameter_df,
)
llh_measurements = petab.v1.calculate_llh_for_table(
petab_problem_measurements.measurement_df,
simulation_df_measurements,
petab_problem_measurements.observable_df,
petab_problem_measurements.parameter_df,
)

# get prior objective function contribution
parameter_ids = petab_problem_priors.parameter_df.index.values[
(petab_problem_priors.parameter_df[petab.v1.ESTIMATE] == 1)
& petab_problem_priors.parameter_df[
petab.v1.OBJECTIVE_PRIOR_TYPE
].notna()
]
priors = petab.v1.get_priors_from_df(
petab_problem_priors.parameter_df,
mode="objective",
parameter_ids=parameter_ids,
)
prior_contrib = 0
for parameter_id, prior in zip(parameter_ids, priors, strict=True):
prior_type, prior_pars, par_scale, par_bounds = prior
if prior_type == petab.v1.PARAMETER_SCALE_NORMAL:
prior_contrib += norm.logpdf(
petab_problem_priors.x_nominal_free_scaled[
petab_problem_priors.x_free_ids.index(parameter_id)
],
loc=prior_pars[0],
scale=prior_pars[1],
)
else:
# enable other models, once libpetab has proper support for
# evaluating the prior contribution. until then, two test
# problems should suffice
assert problem_id == "Raimundez_PCB2020"
pytest.skip(f"Prior type {prior_type} not implemented")

assert np.isclose(
llh_priors + prior_contrib, llh_measurements, rtol=1e-3, atol=1e-16
), (llh_priors + prior_contrib, llh_measurements)
# check that the tolerance is not too high
assert np.abs(prior_contrib) > 1e-3 * np.abs(llh_priors)

0 comments on commit cf82b88

Please sign in to comment.