diff --git a/petab/v1/parameters.py b/petab/v1/parameters.py
index 382e6b57..8f252988 100644
--- a/petab/v1/parameters.py
+++ b/petab/v1/parameters.py
@@ -458,7 +458,7 @@ def get_priors_from_df(
     # get types and parameters of priors from dataframe
     par_to_estimate = parameter_df.loc[parameter_df[ESTIMATE] == 1]
 
-    if parameter_ids:
+    if parameter_ids is not None:
         try:
             par_to_estimate = par_to_estimate.loc[parameter_ids, :]
         except KeyError as e:
diff --git a/petab/v1/priors.py b/petab/v1/priors.py
new file mode 100644
index 00000000..f02b1769
--- /dev/null
+++ b/petab/v1/priors.py
@@ -0,0 +1,203 @@
+"""Functions related to prior handling."""
+import copy
+
+import numpy as np
+import pandas as pd
+
+from ..v2.C import PREEQUILIBRATION_CONDITION_ID
+from . import (
+    ESTIMATE,
+    LAPLACE,
+    LIN,
+    LOG,
+    LOG10,
+    LOG_LAPLACE,
+    LOG_NORMAL,
+    MEASUREMENT,
+    NOISE_DISTRIBUTION,
+    NOISE_FORMULA,
+    NOISE_PARAMETERS,
+    NORMAL,
+    OBJECTIVE_PRIOR_PARAMETERS,
+    OBJECTIVE_PRIOR_TYPE,
+    OBSERVABLE_FORMULA,
+    OBSERVABLE_ID,
+    OBSERVABLE_TRANSFORMATION,
+    PARAMETER_SCALE,
+    PARAMETER_SCALE_LAPLACE,
+    PARAMETER_SCALE_NORMAL,
+    PARAMETER_SEPARATOR,
+    SIMULATION_CONDITION_ID,
+    TIME,
+    Problem,
+)
+
+
+def priors_to_measurements(problem: Problem):
+    """Convert priors to measurements.
+
+    Reformulate the given problem such that the objective priors are
+    converted to measurements. This is done by adding a new observable
+    ``prior_{parameter_id}`` for each estimated parameter that has an
+    objective prior, and adding a corresponding measurement to the
+    measurement table. The new measurement encodes the prior distribution
+    itself. The resulting optimization problem will be equivalent to the
+    original problem. This is meant to be used by tools that do not support
+    priors.
+
+    The conversion involves the probability density function (PDF) of the
+    prior, the parameters (e.g., location and scale) of that prior PDF, and
+    the scale and value of the estimated parameter. Currently, `uniform`
+    priors are not supported by this method. This method creates observables
+    with:
+
+    - `observableFormula`: the parameter value on the `parameterScale`
+    - `observableTransformation`: `log` for `logNormal`/`logLaplace`
+      distributions, `lin` otherwise
+
+    and measurements with:
+
+    - `measurement`: the PDF location
+    - `noiseFormula`: the PDF scale
+
+    Arguments
+    ---------
+    problem:
+        The problem to be converted.
+
+    Returns
+    -------
+    The new problem with the priors converted to measurements.
+ """ + new_problem = copy.deepcopy(problem) + + # we only need to consider parameters that are estimated + par_df_tmp = problem.parameter_df.loc[problem.parameter_df[ESTIMATE] == 1] + + if ( + OBJECTIVE_PRIOR_TYPE not in par_df_tmp + or par_df_tmp.get(OBJECTIVE_PRIOR_TYPE).isna().all() + or OBJECTIVE_PRIOR_PARAMETERS not in par_df_tmp + or par_df_tmp.get(OBJECTIVE_PRIOR_PARAMETERS).isna().all() + ): + # nothing to do + return new_problem + + def scaled_observable_formula(parameter_id, parameter_scale): + if parameter_scale == LIN: + return parameter_id + if parameter_scale == LOG: + return f"ln({parameter_id})" + if parameter_scale == LOG10: + return f"log10({parameter_id})" + raise ValueError(f"Unknown parameter scale {parameter_scale}.") + + new_measurement_dicts = [] + new_observable_dicts = [] + for _, row in par_df_tmp.iterrows(): + prior_type = row[OBJECTIVE_PRIOR_TYPE] + parameter_scale = row.get(PARAMETER_SCALE, LIN) + if pd.isna(prior_type): + if not pd.isna(row[OBJECTIVE_PRIOR_PARAMETERS]): + raise AssertionError( + "Objective prior parameters are set, but prior type is " + "not specified." + ) + continue + + if "uniform" in prior_type.lower(): + # for measurements, "uniform" is not supported yet + # if necessary, this could still be implemented by adding another + # observable/measurement that will produce a constant objective + # offset + raise NotImplementedError("Uniform priors are not supported.") + + parameter_id = row.name + prior_parameters = tuple( + map( + float, + row[OBJECTIVE_PRIOR_PARAMETERS].split(PARAMETER_SEPARATOR), + ) + ) + if len(prior_parameters) != 2: + raise AssertionError( + "Expected two objective prior parameters for parameter " + f"{parameter_id}, but got {prior_parameters}." + ) + + # create new observable + new_obs_id = f"prior_{parameter_id}" + if new_obs_id in new_problem.observable_df.index: + raise ValueError( + f"Observable ID {new_obs_id}, which is to be " + "created, already exists." + ) + new_observable = { + OBSERVABLE_ID: new_obs_id, + OBSERVABLE_FORMULA: scaled_observable_formula( + parameter_id, + parameter_scale if "parameterScale" in prior_type else LIN, + ), + NOISE_FORMULA: f"noiseParameter1_{new_obs_id}", + } + if prior_type in (LOG_NORMAL, LOG_LAPLACE): + new_observable[OBSERVABLE_TRANSFORMATION] = LOG + elif OBSERVABLE_TRANSFORMATION in new_problem.observable_df: + # only set default if the column is already present + new_observable[OBSERVABLE_TRANSFORMATION] = LIN + + if prior_type in (NORMAL, PARAMETER_SCALE_NORMAL, LOG_NORMAL): + new_observable[NOISE_DISTRIBUTION] = NORMAL + elif prior_type in (LAPLACE, PARAMETER_SCALE_LAPLACE, LOG_LAPLACE): + new_observable[NOISE_DISTRIBUTION] = LAPLACE + else: + raise NotImplementedError( + f"Objective prior type {prior_type} is not implemented." + ) + + new_observable_dicts.append(new_observable) + + # add measurement + # we could just use any condition and time point since the parameter + # value is constant. however, using an existing timepoint and + # (preequilibrationConditionId+)simulationConditionId will avoid + # requiring extra simulations and solver stops in tools that do not + # check for time dependency of the observable. 
diff --git a/tests/v1/test_priors.py b/tests/v1/test_priors.py
new file mode 100644
index 00000000..ea47e54f
--- /dev/null
+++ b/tests/v1/test_priors.py
@@ -0,0 +1,137 @@
+from copy import deepcopy
+from pathlib import Path
+
+import benchmark_models_petab
+import numpy as np
+import pandas as pd
+import pytest
+from scipy.stats import norm
+
+import petab.v1
+from petab.v1 import get_simulation_conditions
+from petab.v1.priors import priors_to_measurements
+
+
+@pytest.mark.parametrize(
+    "problem_id", ["Schwen_PONE2014", "Isensee_JCB2018", "Raimundez_PCB2020"]
+)
+def test_priors_to_measurements(problem_id):
+    """Test the conversion of priors to measurements."""
+    petab_problem_priors: petab.v1.Problem = (
+        benchmark_models_petab.get_problem(problem_id)
+    )
+    petab_problem_priors.visualization_df = None
+    assert petab.v1.lint_problem(petab_problem_priors) is False
+
+    if problem_id == "Isensee_JCB2018":
+        # required to match the stored simulation results below
+        petab.v1.flatten_timepoint_specific_output_overrides(
+            petab_problem_priors
+        )
+        assert petab.v1.lint_problem(petab_problem_priors) is False
+    original_problem = deepcopy(petab_problem_priors)
+
+    petab_problem_measurements = priors_to_measurements(petab_problem_priors)
+
+    # check that the original problem is not modified
+    for attr in [
+        "condition_df",
+        "parameter_df",
+        "observable_df",
+        "measurement_df",
+    ]:
+        assert (
+            diff := getattr(petab_problem_priors, attr).compare(
+                getattr(original_problem, attr)
+            )
+        ).empty, diff
+    # check that measurements and observables were added
+    assert petab.v1.lint_problem(petab_problem_measurements) is False
+    assert (
+        petab_problem_measurements.parameter_df.shape[0]
+        == petab_problem_priors.parameter_df.shape[0]
+    )
+    assert (
+        petab_problem_measurements.observable_df.shape[0]
+        > petab_problem_priors.observable_df.shape[0]
+    )
+    assert (
+        petab_problem_measurements.measurement_df.shape[0]
+        > petab_problem_priors.measurement_df.shape[0]
+    )
+    # ensure we didn't introduce any new conditions
+    assert len(
+        get_simulation_conditions(petab_problem_measurements.measurement_df)
+    ) == len(get_simulation_conditions(petab_problem_priors.measurement_df))
+
+    # verify that the objective function value is the same
+
+    # load/construct the simulation results
+    simulation_df_priors = petab.v1.get_simulation_df(
+        Path(
+            benchmark_models_petab.MODELS_DIR,
+            problem_id,
f"simulatedData_{problem_id}.tsv", + ) + ) + simulation_df_measurements = pd.concat( + [ + petab_problem_measurements.measurement_df.rename( + columns={petab.v1.MEASUREMENT: petab.v1.SIMULATION} + )[ + petab_problem_measurements.measurement_df[ + petab.v1.C.OBSERVABLE_ID + ].str.startswith("prior_") + ], + simulation_df_priors, + ] + ) + + llh_priors = petab.v1.calculate_llh_for_table( + petab_problem_priors.measurement_df, + simulation_df_priors, + petab_problem_priors.observable_df, + petab_problem_priors.parameter_df, + ) + llh_measurements = petab.v1.calculate_llh_for_table( + petab_problem_measurements.measurement_df, + simulation_df_measurements, + petab_problem_measurements.observable_df, + petab_problem_measurements.parameter_df, + ) + + # get prior objective function contribution + parameter_ids = petab_problem_priors.parameter_df.index.values[ + (petab_problem_priors.parameter_df[petab.v1.ESTIMATE] == 1) + & petab_problem_priors.parameter_df[ + petab.v1.OBJECTIVE_PRIOR_TYPE + ].notna() + ] + priors = petab.v1.get_priors_from_df( + petab_problem_priors.parameter_df, + mode="objective", + parameter_ids=parameter_ids, + ) + prior_contrib = 0 + for parameter_id, prior in zip(parameter_ids, priors, strict=True): + prior_type, prior_pars, par_scale, par_bounds = prior + if prior_type == petab.v1.PARAMETER_SCALE_NORMAL: + prior_contrib += norm.logpdf( + petab_problem_priors.x_nominal_free_scaled[ + petab_problem_priors.x_free_ids.index(parameter_id) + ], + loc=prior_pars[0], + scale=prior_pars[1], + ) + else: + # enable other models, once libpetab has proper support for + # evaluating the prior contribution. until then, two test + # problems should suffice + assert problem_id == "Raimundez_PCB2020" + pytest.skip(f"Prior type {prior_type} not implemented") + + assert np.isclose( + llh_priors + prior_contrib, llh_measurements, rtol=1e-3, atol=1e-16 + ), (llh_priors + prior_contrib, llh_measurements) + # check that the tolerance is not too high + assert np.abs(prior_contrib) > 1e-3 * np.abs(llh_priors)