import numpy as np
import pandas as pd


def calculate_imputation_link(
    df: pd.DataFrame,
    period: str,
    strata: str,
    match_col: str,
    target_variable: str,
    predictive_variable: str,
) -> pd.DataFrame:
    """
    Calculate the link between target_variable and predictive_variable
    for every (strata, period) group.

    Only rows flagged by match_col contribute to the link. The link is the
    sum of the matched target values divided by the sum of the matched
    predictive values, and it is broadcast to every row of the group,
    matched or not.

    Parameters
    ----------
    df : pd.DataFrame
        Original dataframe. The link column is added to this object in place;
        all existing columns are left untouched.
    period : str
        Column name containing time period.
    strata : str
        Column name containing strata information.
    match_col : str
        Column name of the matched pair links, this column should be bool.
        Must be "f_matched_pair" or "b_matched_pair".
    target_variable : str
        Column name of the targeted variable.
    predictive_variable : str
        Column name of the predictive variable. Must be
        "f_predictive_question" (with "f_matched_pair") or
        "b_predictive_question" (with "b_matched_pair").

    Returns
    -------
    df : pd.DataFrame
        The input dataframe with a new column, "f_link" or "b_link" depending
        on the input parameters. Groups whose matched predictive values sum
        to zero get NaN.

    Raises
    ------
    ValueError
        If match_col and predictive_variable are not one of the two supported
        forward/backward pairs.
    """

    link_column_by_pair = {
        ("f_matched_pair", "f_predictive_question"): "f_link",
        ("b_matched_pair", "b_predictive_question"): "b_link",
    }

    # Validate first so a bad call fails before any data is touched.
    link_col_name = link_column_by_pair.get((match_col, predictive_variable))
    if link_col_name is None:
        raise ValueError(
            f"{match_col} and {predictive_variable} do not have same wildcard."
        )

    is_matched = df[match_col]
    group_keys = [df[strata], df[period]]

    # Multiplying by the boolean flag zeroes out unmatched rows; the products
    # are new Series, so the caller's columns are never overwritten and no
    # copy of the whole frame is needed.
    matched_target = df[target_variable] * is_matched
    matched_predictive = df[predictive_variable] * is_matched

    numerator = matched_target.groupby(group_keys).transform("sum")
    denominator = matched_predictive.groupby(group_keys).transform("sum")

    # A zero denominator would give inf (or 0/0); use NaN for "no link".
    denominator = denominator.replace(0, np.nan)

    df[link_col_name] = numerator / denominator

    return df
import pytest
from helper_functions import load_and_format
from pandas.testing import assert_frame_equal

from src.forward_link import calculate_imputation_link

scenarios = ["calculate_links_test_data"]


@pytest.mark.parametrize("scenario", scenarios)
class TestLinks:
    """Tests for calculate_imputation_link, run once per scenario CSV."""

    def test_forward_links(self, scenario):
        """Test if function returns the f_link column"""
        # The fixture already holds the expected f_link values; drop them,
        # recompute, and compare against the untouched fixture.
        df_output = load_and_format("tests/" + scenario + ".csv")

        df_input = df_output.drop(columns=["f_link"])

        df_input = calculate_imputation_link(
            df_input,
            "period",
            "group",
            "f_matched_pair",
            "question",
            "f_predictive_question",
        )

        # check_like=True ignores column order (the link is appended last).
        assert_frame_equal(df_input, df_output, check_like=True)

    def test_back_links(self, scenario):
        """Test if function returns the b_link column"""
        df_output = load_and_format("tests/" + scenario + ".csv")

        df_input = df_output.drop(columns=["b_link"])

        df_input = calculate_imputation_link(
            df_input,
            "period",
            "group",
            "b_matched_pair",
            "question",
            "b_predictive_question",
        )

        assert_frame_equal(df_input, df_output, check_like=True)

    def test_exception(self, scenario):
        """
        Test if function is called with wrong arguments, in particular
        with f_matched_pair and b_predictive_question or with
        b_matched_pair and f_predictive_question.
        """
        df = load_and_format("tests/" + scenario + ".csv")

        # Forward match flag paired with the backward predictive column.
        with pytest.raises(ValueError):
            calculate_imputation_link(
                df,
                "period",
                "group",
                "f_matched_pair",
                "question",
                "b_predictive_question",
            )

        # Backward match flag paired with the forward predictive column.
        with pytest.raises(ValueError):
            calculate_imputation_link(
                df,
                "period",
                "group",
                "b_matched_pair",
                "question",
                "f_predictive_question",
            )