From cf82b88abb5fda184d8654265218da3604adb938 Mon Sep 17 00:00:00 2001 From: Daniel Weindl Date: Tue, 15 Oct 2024 16:22:22 +0200 Subject: [PATCH] Function for replacing `objectivePrior*` by observables/measurements (#309) Reformulate the given PEtab problem such that the objective priors are converted to measurements. This is done by adding a new observable for each prior and adding a corresponding measurement to the measurement table. The resulting optimization problem will be equivalent to the original problem. This is meant to be used for tools that do not support priors. Closes #307 --------- Co-authored-by: Dilan Pathirana <59329744+dilpath@users.noreply.github.com> --- petab/v1/parameters.py | 2 +- petab/v1/priors.py | 203 ++++++++++++++++++++++++++++++++++++++++ tests/v1/test_priors.py | 137 +++++++++++++++++++++++++++ 3 files changed, 341 insertions(+), 1 deletion(-) create mode 100644 petab/v1/priors.py create mode 100644 tests/v1/test_priors.py diff --git a/petab/v1/parameters.py b/petab/v1/parameters.py index 382e6b57..8f252988 100644 --- a/petab/v1/parameters.py +++ b/petab/v1/parameters.py @@ -458,7 +458,7 @@ def get_priors_from_df( # get types and parameters of priors from dataframe par_to_estimate = parameter_df.loc[parameter_df[ESTIMATE] == 1] - if parameter_ids: + if parameter_ids is not None: try: par_to_estimate = par_to_estimate.loc[parameter_ids, :] except KeyError as e: diff --git a/petab/v1/priors.py b/petab/v1/priors.py new file mode 100644 index 00000000..f02b1769 --- /dev/null +++ b/petab/v1/priors.py @@ -0,0 +1,203 @@ +"""Functions related to prior handling.""" +import copy + +import numpy as np +import pandas as pd + +from ..v2.C import PREEQUILIBRATION_CONDITION_ID +from . import ( + ESTIMATE, + LAPLACE, + LIN, + LOG, + LOG10, + LOG_LAPLACE, + LOG_NORMAL, + MEASUREMENT, + NOISE_DISTRIBUTION, + NOISE_FORMULA, + NOISE_PARAMETERS, + NORMAL, + OBJECTIVE_PRIOR_PARAMETERS, + OBJECTIVE_PRIOR_TYPE, + OBSERVABLE_FORMULA, + OBSERVABLE_ID, + OBSERVABLE_TRANSFORMATION, + PARAMETER_SCALE, + PARAMETER_SCALE_LAPLACE, + PARAMETER_SCALE_NORMAL, + PARAMETER_SEPARATOR, + SIMULATION_CONDITION_ID, + TIME, + Problem, +) + + +def priors_to_measurements(problem: Problem): + """Convert priors to measurements. + + Reformulate the given problem such that the objective priors are converted + to measurements. This is done by adding a new observable + ``prior_{parameter_id}`` for each estimated parameter that has an objective + prior, and adding a corresponding measurement to the measurement table. + The new measurement is the prior distribution itself. The resulting + optimization problem will be equivalent to the original problem. + This is meant to be used for tools that do not support priors. + + The conversion involves the probability density function (PDF) of the + prior, the parameters (e.g., location and scale) of that prior PDF, and the + scale and value of the estimated parameter. Currently, `uniform` priors are + not supported by this method. This method creates observables with: + + - `observableFormula`: the parameter value on the `parameterScale` + - `observableTransformation`: `log` for `logNormal`/`logLaplace` + distributions, `lin` otherwise + + and measurements with: + + - `measurement`: the PDF location + - `noiseFormula`: the PDF scale + + Arguments + --------- + problem: + The problem to be converted. + + Returns + ------- + The new problem with the priors converted to measurements. + """ + new_problem = copy.deepcopy(problem) + + # we only need to consider parameters that are estimated + par_df_tmp = problem.parameter_df.loc[problem.parameter_df[ESTIMATE] == 1] + + if ( + OBJECTIVE_PRIOR_TYPE not in par_df_tmp + or par_df_tmp.get(OBJECTIVE_PRIOR_TYPE).isna().all() + or OBJECTIVE_PRIOR_PARAMETERS not in par_df_tmp + or par_df_tmp.get(OBJECTIVE_PRIOR_PARAMETERS).isna().all() + ): + # nothing to do + return new_problem + + def scaled_observable_formula(parameter_id, parameter_scale): + if parameter_scale == LIN: + return parameter_id + if parameter_scale == LOG: + return f"ln({parameter_id})" + if parameter_scale == LOG10: + return f"log10({parameter_id})" + raise ValueError(f"Unknown parameter scale {parameter_scale}.") + + new_measurement_dicts = [] + new_observable_dicts = [] + for _, row in par_df_tmp.iterrows(): + prior_type = row[OBJECTIVE_PRIOR_TYPE] + parameter_scale = row.get(PARAMETER_SCALE, LIN) + if pd.isna(prior_type): + if not pd.isna(row[OBJECTIVE_PRIOR_PARAMETERS]): + raise AssertionError( + "Objective prior parameters are set, but prior type is " + "not specified." + ) + continue + + if "uniform" in prior_type.lower(): + # for measurements, "uniform" is not supported yet + # if necessary, this could still be implemented by adding another + # observable/measurement that will produce a constant objective + # offset + raise NotImplementedError("Uniform priors are not supported.") + + parameter_id = row.name + prior_parameters = tuple( + map( + float, + row[OBJECTIVE_PRIOR_PARAMETERS].split(PARAMETER_SEPARATOR), + ) + ) + if len(prior_parameters) != 2: + raise AssertionError( + "Expected two objective prior parameters for parameter " + f"{parameter_id}, but got {prior_parameters}." + ) + + # create new observable + new_obs_id = f"prior_{parameter_id}" + if new_obs_id in new_problem.observable_df.index: + raise ValueError( + f"Observable ID {new_obs_id}, which is to be " + "created, already exists." + ) + new_observable = { + OBSERVABLE_ID: new_obs_id, + OBSERVABLE_FORMULA: scaled_observable_formula( + parameter_id, + parameter_scale if "parameterScale" in prior_type else LIN, + ), + NOISE_FORMULA: f"noiseParameter1_{new_obs_id}", + } + if prior_type in (LOG_NORMAL, LOG_LAPLACE): + new_observable[OBSERVABLE_TRANSFORMATION] = LOG + elif OBSERVABLE_TRANSFORMATION in new_problem.observable_df: + # only set default if the column is already present + new_observable[OBSERVABLE_TRANSFORMATION] = LIN + + if prior_type in (NORMAL, PARAMETER_SCALE_NORMAL, LOG_NORMAL): + new_observable[NOISE_DISTRIBUTION] = NORMAL + elif prior_type in (LAPLACE, PARAMETER_SCALE_LAPLACE, LOG_LAPLACE): + new_observable[NOISE_DISTRIBUTION] = LAPLACE + else: + raise NotImplementedError( + f"Objective prior type {prior_type} is not implemented." + ) + + new_observable_dicts.append(new_observable) + + # add measurement + # we could just use any condition and time point since the parameter + # value is constant. however, using an existing timepoint and + # (preequilibrationConditionId+)simulationConditionId will avoid + # requiring extra simulations and solver stops in tools that do not + # check for time dependency of the observable. we use the first + # condition/timepoint from the measurement table + new_measurement = { + OBSERVABLE_ID: new_obs_id, + TIME: problem.measurement_df[TIME].iloc[0], + MEASUREMENT: prior_parameters[0], + NOISE_PARAMETERS: prior_parameters[1], + SIMULATION_CONDITION_ID: new_problem.measurement_df[ + SIMULATION_CONDITION_ID + ].iloc[0], + } + if PREEQUILIBRATION_CONDITION_ID in new_problem.measurement_df: + new_measurement[ + PREEQUILIBRATION_CONDITION_ID + ] = new_problem.measurement_df[PREEQUILIBRATION_CONDITION_ID].iloc[ + 0 + ] + new_measurement_dicts.append(new_measurement) + + # remove prior from parameter table + new_problem.parameter_df.loc[ + parameter_id, OBJECTIVE_PRIOR_TYPE + ] = np.nan + new_problem.parameter_df.loc[ + parameter_id, OBJECTIVE_PRIOR_PARAMETERS + ] = np.nan + + new_problem.observable_df = pd.concat( + [ + pd.DataFrame(new_observable_dicts).set_index(OBSERVABLE_ID), + new_problem.observable_df, + ] + ) + new_problem.measurement_df = pd.concat( + [ + pd.DataFrame(new_measurement_dicts), + new_problem.measurement_df, + ], + ignore_index=True, + ) + return new_problem diff --git a/tests/v1/test_priors.py b/tests/v1/test_priors.py new file mode 100644 index 00000000..ea47e54f --- /dev/null +++ b/tests/v1/test_priors.py @@ -0,0 +1,137 @@ +from copy import deepcopy +from pathlib import Path + +import benchmark_models_petab +import numpy as np +import pandas as pd +import pytest +from scipy.stats import norm + +import petab.v1 +from petab.v1 import get_simulation_conditions +from petab.v1.priors import priors_to_measurements + + +@pytest.mark.parametrize( + "problem_id", ["Schwen_PONE2014", "Isensee_JCB2018", "Raimundez_PCB2020"] +) +def test_priors_to_measurements(problem_id): + """Test the conversion of priors to measurements.""" + petab_problem_priors: petab.v1.Problem = ( + benchmark_models_petab.get_problem(problem_id) + ) + petab_problem_priors.visualization_df = None + assert petab.v1.lint_problem(petab_problem_priors) is False + + if problem_id == "Isensee_JCB2018": + # required to match the stored simulation results below + petab.v1.flatten_timepoint_specific_output_overrides( + petab_problem_priors + ) + assert petab.v1.lint_problem(petab_problem_priors) is False + original_problem = deepcopy(petab_problem_priors) + + petab_problem_measurements = priors_to_measurements(petab_problem_priors) + + # check that the original problem is not modified + for attr in [ + "condition_df", + "parameter_df", + "observable_df", + "measurement_df", + ]: + assert ( + diff := getattr(petab_problem_priors, attr).compare( + getattr(original_problem, attr) + ) + ).empty, diff + # check that measurements and observables were added + assert petab.v1.lint_problem(petab_problem_measurements) is False + assert ( + petab_problem_measurements.parameter_df.shape[0] + == petab_problem_priors.parameter_df.shape[0] + ) + assert ( + petab_problem_measurements.observable_df.shape[0] + > petab_problem_priors.observable_df.shape[0] + ) + assert ( + petab_problem_measurements.measurement_df.shape[0] + > petab_problem_priors.measurement_df.shape[0] + ) + # ensure we didn't introduce any new conditions + assert len( + get_simulation_conditions(petab_problem_measurements.measurement_df) + ) == len(get_simulation_conditions(petab_problem_priors.measurement_df)) + + # verify that the objective function value is the same + + # load/construct the simulation results + simulation_df_priors = petab.v1.get_simulation_df( + Path( + benchmark_models_petab.MODELS_DIR, + problem_id, + f"simulatedData_{problem_id}.tsv", + ) + ) + simulation_df_measurements = pd.concat( + [ + petab_problem_measurements.measurement_df.rename( + columns={petab.v1.MEASUREMENT: petab.v1.SIMULATION} + )[ + petab_problem_measurements.measurement_df[ + petab.v1.C.OBSERVABLE_ID + ].str.startswith("prior_") + ], + simulation_df_priors, + ] + ) + + llh_priors = petab.v1.calculate_llh_for_table( + petab_problem_priors.measurement_df, + simulation_df_priors, + petab_problem_priors.observable_df, + petab_problem_priors.parameter_df, + ) + llh_measurements = petab.v1.calculate_llh_for_table( + petab_problem_measurements.measurement_df, + simulation_df_measurements, + petab_problem_measurements.observable_df, + petab_problem_measurements.parameter_df, + ) + + # get prior objective function contribution + parameter_ids = petab_problem_priors.parameter_df.index.values[ + (petab_problem_priors.parameter_df[petab.v1.ESTIMATE] == 1) + & petab_problem_priors.parameter_df[ + petab.v1.OBJECTIVE_PRIOR_TYPE + ].notna() + ] + priors = petab.v1.get_priors_from_df( + petab_problem_priors.parameter_df, + mode="objective", + parameter_ids=parameter_ids, + ) + prior_contrib = 0 + for parameter_id, prior in zip(parameter_ids, priors, strict=True): + prior_type, prior_pars, par_scale, par_bounds = prior + if prior_type == petab.v1.PARAMETER_SCALE_NORMAL: + prior_contrib += norm.logpdf( + petab_problem_priors.x_nominal_free_scaled[ + petab_problem_priors.x_free_ids.index(parameter_id) + ], + loc=prior_pars[0], + scale=prior_pars[1], + ) + else: + # enable other models, once libpetab has proper support for + # evaluating the prior contribution. until then, two test + # problems should suffice + assert problem_id == "Raimundez_PCB2020" + pytest.skip(f"Prior type {prior_type} not implemented") + + assert np.isclose( + llh_priors + prior_contrib, llh_measurements, rtol=1e-3, atol=1e-16 + ), (llh_priors + prior_contrib, llh_measurements) + # check that the tolerance is not too high + assert np.abs(prior_contrib) > 1e-3 * np.abs(llh_priors)