Function for replacing objectivePrior* by observables/measurements #309

Merged · 9 commits · Oct 15, 2024
2 changes: 1 addition & 1 deletion petab/v1/parameters.py
@@ -458,7 +458,7 @@ def get_priors_from_df(
     # get types and parameters of priors from dataframe
     par_to_estimate = parameter_df.loc[parameter_df[ESTIMATE] == 1]

-    if parameter_ids:
+    if parameter_ids is not None:
         try:
             par_to_estimate = par_to_estimate.loc[parameter_ids, :]
         except KeyError as e:
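A quick standalone sketch (not part of the diff) of why the explicit `is not None` check matters: callers may pass a NumPy array of parameter IDs, as the new test below does, whose truth value is ambiguous, and an empty selection should still restrict the frame rather than silently fall through:

import numpy as np

parameter_ids = np.array(["k1", "k2"])  # hypothetical IDs

try:
    if parameter_ids:  # old truthiness check
        pass
except ValueError as err:
    # "The truth value of an array with more than one element is ambiguous ..."
    print(err)

if parameter_ids is not None:  # new check: works for lists, arrays, and empty sequences
    print("subsetting to", list(parameter_ids))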
203 changes: 203 additions & 0 deletions petab/v1/priors.py
@@ -0,0 +1,203 @@
"""Functions related to prior handling."""
import copy

import numpy as np
import pandas as pd

from ..v2.C import PREEQUILIBRATION_CONDITION_ID
from . import (
    ESTIMATE,
    LAPLACE,
    LIN,
    LOG,
    LOG10,
    LOG_LAPLACE,
    LOG_NORMAL,
    MEASUREMENT,
    NOISE_DISTRIBUTION,
    NOISE_FORMULA,
    NOISE_PARAMETERS,
    NORMAL,
    OBJECTIVE_PRIOR_PARAMETERS,
    OBJECTIVE_PRIOR_TYPE,
    OBSERVABLE_FORMULA,
    OBSERVABLE_ID,
    OBSERVABLE_TRANSFORMATION,
    PARAMETER_SCALE,
    PARAMETER_SCALE_LAPLACE,
    PARAMETER_SCALE_NORMAL,
    PARAMETER_SEPARATOR,
    SIMULATION_CONDITION_ID,
    TIME,
    Problem,
)


def priors_to_measurements(problem: Problem):
"""Convert priors to measurements.

Reformulate the given problem such that the objective priors are converted
to measurements. This is done by adding a new observable
``prior_{parameter_id}`` for each estimated parameter that has an objective
prior, and adding a corresponding measurement to the measurement table.
The new measurement is the prior distribution itself. The resulting
optimization problem will be equivalent to the original problem.
This is meant to be used for tools that do not support priors.

    The conversion involves the probability density function (PDF) of the
    prior, the parameters (e.g., location and scale) of that prior PDF, and the
    scale and value of the estimated parameter. Currently, `uniform` priors are
    not supported by this method. This method creates observables with:

    - `observableFormula`: the parameter value on the `parameterScale`
    - `observableTransformation`: `log` for `logNormal`/`logLaplace`
      distributions, `lin` otherwise

    and measurements with:

    - `measurement`: the PDF location
    - `noiseFormula`: the PDF scale

    Arguments
    ---------
    problem:
        The problem to be converted.

    Returns
    -------
    The new problem with the priors converted to measurements.
    """
    new_problem = copy.deepcopy(problem)

    # we only need to consider parameters that are estimated
    par_df_tmp = problem.parameter_df.loc[problem.parameter_df[ESTIMATE] == 1]

    if (
        OBJECTIVE_PRIOR_TYPE not in par_df_tmp
        or par_df_tmp.get(OBJECTIVE_PRIOR_TYPE).isna().all()
        or OBJECTIVE_PRIOR_PARAMETERS not in par_df_tmp
        or par_df_tmp.get(OBJECTIVE_PRIOR_PARAMETERS).isna().all()
    ):
        # nothing to do
        return new_problem

    def scaled_observable_formula(parameter_id, parameter_scale):
        if parameter_scale == LIN:
            return parameter_id
        if parameter_scale == LOG:
            return f"ln({parameter_id})"
        if parameter_scale == LOG10:
            return f"log10({parameter_id})"
        raise ValueError(f"Unknown parameter scale {parameter_scale}.")

    new_measurement_dicts = []
    new_observable_dicts = []
    for _, row in par_df_tmp.iterrows():
        prior_type = row[OBJECTIVE_PRIOR_TYPE]
        parameter_scale = row.get(PARAMETER_SCALE, LIN)
        if pd.isna(prior_type):
            if not pd.isna(row[OBJECTIVE_PRIOR_PARAMETERS]):
                raise AssertionError(
                    "Objective prior parameters are set, but prior type is "
                    "not specified."
                )
            continue

        if "uniform" in prior_type.lower():
            # for measurements, "uniform" is not supported yet
            # if necessary, this could still be implemented by adding another
            # observable/measurement that will produce a constant objective
            # offset
            raise NotImplementedError("Uniform priors are not supported.")

        parameter_id = row.name
        prior_parameters = tuple(
            map(
                float,
                row[OBJECTIVE_PRIOR_PARAMETERS].split(PARAMETER_SEPARATOR),
            )
        )
        if len(prior_parameters) != 2:
            raise AssertionError(
                "Expected two objective prior parameters for parameter "
                f"{parameter_id}, but got {prior_parameters}."
            )

        # create new observable
        new_obs_id = f"prior_{parameter_id}"
        if new_obs_id in new_problem.observable_df.index:
            raise ValueError(
                f"Observable ID {new_obs_id}, which is to be "
                "created, already exists."
            )
        new_observable = {
            OBSERVABLE_ID: new_obs_id,
            OBSERVABLE_FORMULA: scaled_observable_formula(
                parameter_id,
                parameter_scale if "parameterScale" in prior_type else LIN,
            ),
            NOISE_FORMULA: f"noiseParameter1_{new_obs_id}",
        }
        if prior_type in (LOG_NORMAL, LOG_LAPLACE):
            new_observable[OBSERVABLE_TRANSFORMATION] = LOG
        elif OBSERVABLE_TRANSFORMATION in new_problem.observable_df:
            # only set default if the column is already present
            new_observable[OBSERVABLE_TRANSFORMATION] = LIN

        if prior_type in (NORMAL, PARAMETER_SCALE_NORMAL, LOG_NORMAL):
            new_observable[NOISE_DISTRIBUTION] = NORMAL
        elif prior_type in (LAPLACE, PARAMETER_SCALE_LAPLACE, LOG_LAPLACE):
            new_observable[NOISE_DISTRIBUTION] = LAPLACE
        else:
            raise NotImplementedError(
                f"Objective prior type {prior_type} is not implemented."
            )

        new_observable_dicts.append(new_observable)

        # add measurement
        # we could just use any condition and time point since the parameter
        # value is constant. however, using an existing timepoint and
        # (preequilibrationConditionId+)simulationConditionId will avoid
        # requiring extra simulations and solver stops in tools that do not
        # check for time dependency of the observable. we use the first
        # condition/timepoint from the measurement table
        new_measurement = {
            OBSERVABLE_ID: new_obs_id,
            TIME: problem.measurement_df[TIME].iloc[0],
            MEASUREMENT: prior_parameters[0],
            NOISE_PARAMETERS: prior_parameters[1],
            SIMULATION_CONDITION_ID: new_problem.measurement_df[
                SIMULATION_CONDITION_ID
            ].iloc[0],
        }
        if PREEQUILIBRATION_CONDITION_ID in new_problem.measurement_df:
            new_measurement[
                PREEQUILIBRATION_CONDITION_ID
            ] = new_problem.measurement_df[PREEQUILIBRATION_CONDITION_ID].iloc[
                0
            ]
        new_measurement_dicts.append(new_measurement)

        # remove prior from parameter table
        new_problem.parameter_df.loc[
            parameter_id, OBJECTIVE_PRIOR_TYPE
        ] = np.nan
        new_problem.parameter_df.loc[
            parameter_id, OBJECTIVE_PRIOR_PARAMETERS
        ] = np.nan

    new_problem.observable_df = pd.concat(
        [
            pd.DataFrame(new_observable_dicts).set_index(OBSERVABLE_ID),
            new_problem.observable_df,
        ]
    )
    new_problem.measurement_df = pd.concat(
        [
            pd.DataFrame(new_measurement_dicts),
            new_problem.measurement_df,
        ],
        ignore_index=True,
    )
    return new_problem
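For context, a minimal usage sketch of the new function; the YAML path and the prior values mentioned in the comments are hypothetical, and the expected behavior follows the docstring above:

import petab.v1
from petab.v1.priors import priors_to_measurements

# hypothetical path; any PEtab v1 problem whose parameter table sets
# objectivePriorType/objectivePriorParameters for estimated parameters will do
problem = petab.v1.Problem.from_yaml("path/to/problem.yaml")

converted = priors_to_measurements(problem)

# e.g., a parameter k1 with objectivePriorType=normal and
# objectivePriorParameters="0.5;0.1" yields an observable `prior_k1`
# (observableFormula: k1, noiseDistribution: normal) plus one measurement row
# with measurement=0.5 and noiseParameters=0.1; the objectivePrior* columns
# are cleared for the estimated parameters
print(converted.observable_df.filter(like="prior_", axis=0))
estimated = converted.parameter_df[petab.v1.ESTIMATE] == 1
print(
    converted.parameter_df.loc[
        estimated, petab.v1.OBJECTIVE_PRIOR_TYPE
    ].isna().all()
)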
137 changes: 137 additions & 0 deletions tests/v1/test_priors.py
@@ -0,0 +1,137 @@
from copy import deepcopy
from pathlib import Path

import benchmark_models_petab
import numpy as np
import pandas as pd
import pytest
from scipy.stats import norm

import petab.v1
from petab.v1 import get_simulation_conditions
from petab.v1.priors import priors_to_measurements


@pytest.mark.parametrize(
"problem_id", ["Schwen_PONE2014", "Isensee_JCB2018", "Raimundez_PCB2020"]
)
def test_priors_to_measurements(problem_id):
"""Test the conversion of priors to measurements."""
petab_problem_priors: petab.v1.Problem = (
benchmark_models_petab.get_problem(problem_id)
)
petab_problem_priors.visualization_df = None
assert petab.v1.lint_problem(petab_problem_priors) is False

if problem_id == "Isensee_JCB2018":
# required to match the stored simulation results below
petab.v1.flatten_timepoint_specific_output_overrides(
petab_problem_priors
)
assert petab.v1.lint_problem(petab_problem_priors) is False
original_problem = deepcopy(petab_problem_priors)

petab_problem_measurements = priors_to_measurements(petab_problem_priors)

# check that the original problem is not modified
for attr in [
"condition_df",
"parameter_df",
"observable_df",
"measurement_df",
]:
assert (
diff := getattr(petab_problem_priors, attr).compare(
getattr(original_problem, attr)
)
).empty, diff
# check that measurements and observables were added
assert petab.v1.lint_problem(petab_problem_measurements) is False
assert (
petab_problem_measurements.parameter_df.shape[0]
== petab_problem_priors.parameter_df.shape[0]
)
assert (
petab_problem_measurements.observable_df.shape[0]
> petab_problem_priors.observable_df.shape[0]
)
assert (
petab_problem_measurements.measurement_df.shape[0]
> petab_problem_priors.measurement_df.shape[0]
)
# ensure we didn't introduce any new conditions
assert len(
get_simulation_conditions(petab_problem_measurements.measurement_df)
) == len(get_simulation_conditions(petab_problem_priors.measurement_df))

# verify that the objective function value is the same

# load/construct the simulation results
simulation_df_priors = petab.v1.get_simulation_df(
Path(
benchmark_models_petab.MODELS_DIR,
problem_id,
f"simulatedData_{problem_id}.tsv",
)
)
simulation_df_measurements = pd.concat(
[
petab_problem_measurements.measurement_df.rename(
columns={petab.v1.MEASUREMENT: petab.v1.SIMULATION}
)[
petab_problem_measurements.measurement_df[
petab.v1.C.OBSERVABLE_ID
].str.startswith("prior_")
],
simulation_df_priors,
]
)

llh_priors = petab.v1.calculate_llh_for_table(
petab_problem_priors.measurement_df,
simulation_df_priors,
petab_problem_priors.observable_df,
petab_problem_priors.parameter_df,
)
llh_measurements = petab.v1.calculate_llh_for_table(
petab_problem_measurements.measurement_df,
simulation_df_measurements,
petab_problem_measurements.observable_df,
petab_problem_measurements.parameter_df,
)

# get prior objective function contribution
parameter_ids = petab_problem_priors.parameter_df.index.values[
(petab_problem_priors.parameter_df[petab.v1.ESTIMATE] == 1)
& petab_problem_priors.parameter_df[
petab.v1.OBJECTIVE_PRIOR_TYPE
].notna()
]
priors = petab.v1.get_priors_from_df(
petab_problem_priors.parameter_df,
mode="objective",
parameter_ids=parameter_ids,
)
prior_contrib = 0
for parameter_id, prior in zip(parameter_ids, priors, strict=True):
prior_type, prior_pars, par_scale, par_bounds = prior
if prior_type == petab.v1.PARAMETER_SCALE_NORMAL:
prior_contrib += norm.logpdf(
petab_problem_priors.x_nominal_free_scaled[
petab_problem_priors.x_free_ids.index(parameter_id)
],
loc=prior_pars[0],
scale=prior_pars[1],
)
Review comment on lines +119 to +125 (Member):

    Looks useful, could be moved to some calculate_posterior_for_table method
    like calculate_llh_for_table.

Reply by @dweindl (Member Author), Oct 15, 2024:

    I fully agree. However, I would like to have some proper Prior class
    instead of those current tuples before extending the priors API. I would
    rather keep that for another PR. I will open an issue.

    -> #311, #312

        else:
            # enable other models, once libpetab has proper support for
            # evaluating the prior contribution. until then, two test
            # problems should suffice
            assert problem_id == "Raimundez_PCB2020"
            pytest.skip(f"Prior type {prior_type} not implemented")

    assert np.isclose(
        llh_priors + prior_contrib, llh_measurements, rtol=1e-3, atol=1e-16
    ), (llh_priors + prior_contrib, llh_measurements)
    # check that the tolerance is not too high
    assert np.abs(prior_contrib) > 1e-3 * np.abs(llh_priors)