From 072302292d21979086e1486844bb4c525951969b Mon Sep 17 00:00:00 2001 From: Ian Sullivan Date: Fri, 17 May 2024 16:45:29 -0700 Subject: [PATCH] Force diaCalculationPlugins to preserve dtype --- .../lsst/meas/base/diaCalculationPlugins.py | 101 +++++++++++++----- tests/test_diaCalculationPlugins.py | 60 ++++++++--- 2 files changed, 119 insertions(+), 42 deletions(-) diff --git a/python/lsst/meas/base/diaCalculationPlugins.py b/python/lsst/meas/base/diaCalculationPlugins.py index 921d631a..a7288ddd 100644 --- a/python/lsst/meas/base/diaCalculationPlugins.py +++ b/python/lsst/meas/base/diaCalculationPlugins.py @@ -218,7 +218,8 @@ def calculate(self, diaObjects, diaSources, **kwargs): **kwargs Any additional keyword arguments that may be passed to the plugin. """ - diaObjects.loc[:, "nDiaSources"] = diaSources.diaObjectId.count() + dtype = diaObjects["nDiaSources"].dtype + diaObjects.loc[:, "nDiaSources"] = diaSources.diaObjectId.count().astype(dtype) class SimpleSourceFlagDiaPluginConfig(DiaObjectCalculationPluginConfig): @@ -253,7 +254,8 @@ def calculate(self, diaObjects, diaSources, **kwargs): **kwargs Any additional keyword arguments that may be passed to the plugin. """ - diaObjects.loc[:, "flags"] = diaSources.flags.any().astype(np.uint64) + dtype = diaObjects["flags"].dtype + diaObjects.loc[:, "flags"] = diaSources.flags.any().astype(dtype) class WeightedMeanDiaPsfFluxConfig(DiaObjectCalculationPluginConfig): @@ -329,9 +331,9 @@ def _weightedMean(df): errName: fluxMeanErr, nDataName: nFluxData}, dtype="object") + df = filterDiaSources.apply(_weightedMean).astype(diaObjects.dtypes[[meanName, errName, nDataName]]) - diaObjects.loc[:, [meanName, errName, nDataName]] = \ - filterDiaSources.apply(_weightedMean) + diaObjects.loc[:, [meanName, errName, nDataName]] = df class PercentileDiaPsfFluxConfig(DiaObjectCalculationPluginConfig): @@ -392,20 +394,25 @@ def calculate(self, Any additional keyword arguments that may be passed to the plugin. """ pTileNames = [] + dtype = None for tilePercent in self.config.percentiles: pTileName = "{}_psfFluxPercentile{:02d}".format(band, tilePercent) pTileNames.append(pTileName) if pTileName not in diaObjects.columns: diaObjects[pTileName] = np.nan + elif dtype is None: + dtype = diaObjects[pTileName].dtype def _fluxPercentiles(df): pTiles = np.nanpercentile(df["psfFlux"], self.config.percentiles) return pd.Series( dict((tileName, pTile) for tileName, pTile in zip(pTileNames, pTiles))) - - diaObjects.loc[:, pTileNames] = filterDiaSources.apply(_fluxPercentiles) + if dtype is None: + diaObjects.loc[:, pTileNames] = filterDiaSources.apply(_fluxPercentiles) + else: + diaObjects.loc[:, pTileNames] = filterDiaSources.apply(_fluxPercentiles).astype(dtype) class SigmaDiaPsfFluxConfig(DiaObjectCalculationPluginConfig): @@ -452,8 +459,12 @@ def calculate(self, """ # Set "delta degrees of freedom (ddf)" to 1 to calculate the unbiased # estimator of scatter (i.e. 'N - 1' instead of 'N'). - diaObjects.loc[:, "{}_psfFluxSigma".format(band)] = \ - filterDiaSources.psfFlux.std() + column = "{}_psfFluxSigma".format(band) + if column in diaObjects: + dtype = diaObjects[column].dtype + diaObjects.loc[:, column] = filterDiaSources.psfFlux.std().astype(dtype) + else: + diaObjects.loc[:, column] = filterDiaSources.psfFlux.std() class Chi2DiaPsfFluxConfig(DiaObjectCalculationPluginConfig): @@ -503,14 +514,18 @@ def calculate(self, Any additional keyword arguments that may be passed to the plugin. """ meanName = "{}_psfFluxMean".format(band) + column = "{}_psfFluxChi2".format(band) def _chi2(df): delta = (df["psfFlux"] - diaObjects.at[df.diaObjectId.iat[0], meanName]) return np.nansum((delta / df["psfFluxErr"]) ** 2) - diaObjects.loc[:, "{}_psfFluxChi2".format(band)] = \ - filterDiaSources.apply(_chi2) + if column in diaObjects: + dtype = diaObjects[column].dtype + diaObjects.loc[:, column] = filterDiaSources.apply(_chi2).astype(dtype) + else: + diaObjects.loc[:, column] = filterDiaSources.apply(_chi2) class MadDiaPsfFluxConfig(DiaObjectCalculationPluginConfig): @@ -558,9 +573,16 @@ def calculate(self, **kwargs Any additional keyword arguments that may be passed to the plugin. """ - diaObjects.loc[:, "{}_psfFluxMAD".format(band)] = \ - filterDiaSources.psfFlux.apply(median_absolute_deviation, - ignore_nan=True) + column = "{}_psfFluxMAD".format(band) + if column in diaObjects: + dtype = diaObjects[column].dtype + diaObjects.loc[:, column] = \ + filterDiaSources.psfFlux.apply(median_absolute_deviation, + ignore_nan=True).astype(dtype) + else: + diaObjects.loc[:, column] = \ + filterDiaSources.psfFlux.apply(median_absolute_deviation, + ignore_nan=True) class SkewDiaPsfFluxConfig(DiaObjectCalculationPluginConfig): @@ -607,8 +629,12 @@ def calculate(self, **kwargs Any additional keyword arguments that may be passed to the plugin. """ - diaObjects.loc[:, "{}_psfFluxSkew".format(band)] = \ - filterDiaSources.psfFlux.skew() + column = "{}_psfFluxSkew".format(band) + if column in diaObjects: + dtype = diaObjects[column].dtype + diaObjects.loc[:, column] = filterDiaSources.psfFlux.skew().astype(dtype) + else: + diaObjects.loc[:, column] = filterDiaSources.psfFlux.skew() class MinMaxDiaPsfFluxConfig(DiaObjectCalculationPluginConfig): @@ -662,8 +688,9 @@ def calculate(self, if maxName not in diaObjects.columns: diaObjects[maxName] = np.nan - diaObjects.loc[:, minName] = filterDiaSources.psfFlux.min() - diaObjects.loc[:, maxName] = filterDiaSources.psfFlux.max() + dtype = diaObjects[minName].dtype + diaObjects.loc[:, minName] = filterDiaSources.psfFlux.min().astype(dtype) + diaObjects.loc[:, maxName] = filterDiaSources.psfFlux.max().astype(dtype) class MaxSlopeDiaPsfFluxConfig(DiaObjectCalculationPluginConfig): @@ -722,8 +749,12 @@ def _maxSlope(df): fluxes = tmpDf["psfFlux"].to_numpy()[timeArgs] return (np.diff(fluxes) / np.diff(times)).max() - diaObjects.loc[:, "{}_psfFluxMaxSlope".format(band)] = \ - filterDiaSources.apply(_maxSlope) + column = "{}_psfFluxMaxSlope".format(band) + if column in diaObjects: + dtype = diaObjects[column].dtype + diaObjects.loc[:, column] = filterDiaSources.apply(_maxSlope).astype(dtype) + else: + diaObjects.loc[:, column] = filterDiaSources.apply(_maxSlope) class ErrMeanDiaPsfFluxConfig(DiaObjectCalculationPluginConfig): @@ -770,8 +801,12 @@ def calculate(self, **kwargs Any additional keyword arguments that may be passed to the plugin. """ - diaObjects.loc[:, "{}_psfFluxErrMean".format(band)] = \ - filterDiaSources.psfFluxErr.mean() + column = "{}_psfFluxErrMean".format(band) + if column in diaObjects: + dtype = diaObjects[column].dtype + diaObjects.loc[:, column] = filterDiaSources.psfFluxErr.mean().astype(dtype) + else: + diaObjects.loc[:, column] = filterDiaSources.psfFluxErr.mean() class LinearFitDiaPsfFluxConfig(DiaObjectCalculationPluginConfig): @@ -825,6 +860,7 @@ def calculate(self, bName = "{}_psfFluxLinearIntercept".format(band) if bName not in diaObjects.columns: diaObjects[bName] = np.nan + dtype = diaObjects[mName].dtype def _linearFit(df): tmpDf = df[~np.logical_or( @@ -838,7 +874,7 @@ def _linearFit(df): times = tmpDf["midpointMjdTai"].to_numpy() A = np.array([times / errors, 1 / errors]).transpose() m, b = lsq_linear(A, fluxes / errors).x - return pd.Series({mName: m, bName: b}) + return pd.Series({mName: m, bName: b}, dtype=dtype) diaObjects.loc[:, [mName, bName]] = filterDiaSources.apply(_linearFit) @@ -903,8 +939,12 @@ def _stetsonJ(df): errors, diaObjects.at[tmpDf.diaObjectId.iat[0], meanName]) - diaObjects.loc[:, "{}_psfFluxStetsonJ".format(band)] = \ - filterDiaSources.apply(_stetsonJ) + column = "{}_psfFluxStetsonJ".format(band) + if column in diaObjects: + dtype = diaObjects[column].dtype + diaObjects.loc[:, column] = filterDiaSources.apply(_stetsonJ).astype(dtype) + else: + diaObjects.loc[:, column] = filterDiaSources.apply(_stetsonJ) def _stetson_J(self, fluxes, errors, mean=None): """Compute the single band stetsonJ statistic. @@ -1064,8 +1104,8 @@ def _meanFlux(df): return pd.Series({totMeanName: fluxMean, totErrName: fluxMeanErr}) - diaObjects.loc[:, [totMeanName, totErrName]] = \ - filterDiaSources.apply(_meanFlux) + df = filterDiaSources.apply(_meanFlux).astype(diaObjects.dtypes[[totMeanName, totErrName]]) + diaObjects.loc[:, [totMeanName, totErrName]] = df class SigmaDiaTotFluxConfig(DiaObjectCalculationPluginConfig): @@ -1113,5 +1153,10 @@ def calculate(self, """ # Set "delta degrees of freedom (ddf)" to 1 to calculate the unbiased # estimator of scatter (i.e. 'N - 1' instead of 'N'). - diaObjects.loc[:, "{}_scienceFluxSigma".format(band)] = \ - filterDiaSources.scienceFlux.std() + column = "{}_scienceFluxSigma".format(band) + if column in diaObjects: + dtype = diaObjects[column].dtype + diaObjects.loc[:, column] = filterDiaSources.scienceFlux.std().astype(dtype) + else: + + diaObjects.loc[:, column] = filterDiaSources.scienceFlux.std() diff --git a/tests/test_diaCalculationPlugins.py b/tests/test_diaCalculationPlugins.py index 68740d07..e435df9d 100644 --- a/tests/test_diaCalculationPlugins.py +++ b/tests/test_diaCalculationPlugins.py @@ -119,6 +119,34 @@ def run_multi_plugin(diaObjectCat, diaSourceCat, band, plugin): band=band) +def make_diaObject_table(objId, plugin, default_value=None, band=None): + """Create a minimal diaObject table with columns required for the plugin + + Parameters + ---------- + objId : `int` + The diaObjectId + plugin : `lsst.ap.association.DiaCalculationPlugin` + The plugin that will be run. + default_value : None, optional + Value to set new columns to. + band : `str`, optional + Band designation to append to the plugin columns. + + Returns + ------- + diaObjects : `pandas.DataFrame` + Output catalog with the required columns for the plugin. + """ + diaObjects = {"diaObjectId": [objId]} + for col in plugin.outputCols: + if band is not None: + diaObjects[f"{band}_{col}"] = default_value + else: + diaObjects[col] = default_value + return pd.DataFrame(diaObjects) + + class TestMeanPosition(unittest.TestCase): def testCalculate(self): @@ -245,7 +273,6 @@ def testCalculate(self): for n_sources in [1, 8, 10]: # Test expected number of sources per object. objId = 0 - diaObjects = pd.DataFrame({"diaObjectId": [objId]}) diaSources = pd.DataFrame( data={"diaObjectId": n_sources * [objId], "band": n_sources * ["g"], @@ -253,6 +280,7 @@ def testCalculate(self): plug = NumDiaSourcesDiaPlugin(NumDiaSourcesDiaPluginConfig(), "ap_nDiaSources", None) + diaObjects = make_diaObject_table(objId, plug, default_value=int(-1)) run_multi_plugin(diaObjects, diaSources, "g", plug) self.assertEqual(n_sources, diaObjects.at[objId, "nDiaSources"]) @@ -267,7 +295,6 @@ def testCalculate(self): n_sources = 10 # Test expected flags, no flags set. - diaObjects = pd.DataFrame({"diaObjectId": [objId]}) diaSources = pd.DataFrame( data={"diaObjectId": n_sources * [objId], "band": n_sources * ["g"], @@ -276,31 +303,33 @@ def testCalculate(self): plug = SimpleSourceFlagDiaPlugin(SimpleSourceFlagDiaPluginConfig(), "ap_diaObjectFlag", None) + + diaObjects = make_diaObject_table(objId, plug, default_value=np.uint64) run_multi_plugin(diaObjects, diaSources, "g", plug) self.assertEqual(diaObjects.at[objId, "flags"], 0) # Test expected flags, all flags set. - diaObjects = pd.DataFrame({"diaObjectId": [objId]}) diaSources = pd.DataFrame( data={"diaObjectId": n_sources * [objId], "band": n_sources * ["g"], "diaSourceId": np.arange(n_sources, dtype=int), "flags": np.ones(n_sources, dtype=np.uint64)}) + diaObjects = make_diaObject_table(objId, plug, default_value=np.uint64) run_multi_plugin(diaObjects, diaSources, "g", plug) self.assertEqual(diaObjects.at[objId, "flags"], 1) # Test expected flags, random flags. - diaObjects = pd.DataFrame({"diaObjectId": [objId]}) diaSources = pd.DataFrame( data={"diaObjectId": n_sources * [objId], "band": n_sources * ["g"], "diaSourceId": np.arange(n_sources, dtype=int), "flags": np.random.randint(0, 2 ** 16, size=n_sources)}) + + diaObjects = make_diaObject_table(objId, plug, default_value=np.uint64) run_multi_plugin(diaObjects, diaSources, "g", plug) self.assertEqual(diaObjects.at[objId, "flags"], 1) # Test expected flags, one flag set. - diaObjects = pd.DataFrame({"diaObjectId": [objId]}) flag_array = np.zeros(n_sources, dtype=np.uint64) flag_array[4] = 256 diaSources = pd.DataFrame( @@ -308,6 +337,7 @@ def testCalculate(self): "band": n_sources * ["g"], "diaSourceId": np.arange(n_sources, dtype=int), "flags": flag_array}) + diaObjects = make_diaObject_table(objId, plug, default_value=np.uint64) run_multi_plugin(diaObjects, diaSources, "g", plug) self.assertEqual(diaObjects.at[objId, "flags"], 1) @@ -417,7 +447,6 @@ def testCalculate(self): # Test expected sigma scatter of fluxes. fluxes = np.linspace(-1, 1, n_sources) - diaObjects = pd.DataFrame({"diaObjectId": [objId]}) diaSources = pd.DataFrame( data={"diaObjectId": n_sources * [objId], "band": n_sources * ["u"], @@ -428,30 +457,33 @@ def testCalculate(self): plug = SigmaDiaPsfFlux(SigmaDiaPsfFluxConfig(), "ap_sigmaFlux", None) + diaObjects = make_diaObject_table(objId, plug, band='u') run_multi_plugin(diaObjects, diaSources, "u", plug) self.assertAlmostEqual(diaObjects.at[objId, "u_psfFluxSigma"], np.nanstd(fluxes, ddof=1)) # test one input, returns nan. - diaObjects = pd.DataFrame({"diaObjectId": [objId]}) diaSources = pd.DataFrame( data={"diaObjectId": 1 * [objId], "band": 1 * ["g"], "diaSourceId": [0], "psfFlux": [fluxes[0]], "psfFluxErr": [1.]}) + + diaObjects = make_diaObject_table(objId, plug, band='g') run_multi_plugin(diaObjects, diaSources, "g", plug) self.assertTrue(np.isnan(diaObjects.at[objId, "g_psfFluxSigma"])) # Test expected sigma scatter of fluxes with a nan value. fluxes[4] = np.nan - diaObjects = pd.DataFrame({"diaObjectId": [objId]}) diaSources = pd.DataFrame( data={"diaObjectId": n_sources * [objId], "band": n_sources * ["r"], "diaSourceId": np.arange(n_sources, dtype=int), "psfFlux": fluxes, "psfFluxErr": np.ones(n_sources)}) + + diaObjects = make_diaObject_table(objId, plug, band='r') run_multi_plugin(diaObjects, diaSources, "r", plug) self.assertAlmostEqual(diaObjects.at[objId, "r_psfFluxSigma"], np.nanstd(fluxes, ddof=1)) @@ -637,7 +669,6 @@ def testCalculate(self): # Test max slope value. fluxes = np.linspace(-1, 1, n_sources) times = np.concatenate([np.linspace(0, 1, n_sources)[:-1], [1 - 1/90]]) - diaObjects = pd.DataFrame({"diaObjectId": [objId]}) diaSources = pd.DataFrame( data={"diaObjectId": n_sources * [objId], "band": n_sources * ["u"], @@ -649,11 +680,11 @@ def testCalculate(self): plug = MaxSlopeDiaPsfFlux(MaxSlopeDiaPsfFluxConfig(), "ap_maxSlopeFlux", None) + diaObjects = make_diaObject_table(objId, plug, band='u') run_multi_plugin(diaObjects, diaSources, "u", plug) self.assertAlmostEqual(diaObjects.at[objId, "u_psfFluxMaxSlope"], 2 + 2/9) # Test max slope value returns nan on 1 input. - diaObjects = pd.DataFrame({"diaObjectId": [objId]}) diaSources = pd.DataFrame( data={"diaObjectId": 1 * [objId], "band": 1 * ["g"], @@ -661,13 +692,13 @@ def testCalculate(self): "psfFlux": fluxes[0], "psfFluxErr": np.ones(1), "midpointMjdTai": times[0]}) + diaObjects = make_diaObject_table(objId, plug, band='g') run_multi_plugin(diaObjects, diaSources, "g", plug) self.assertTrue(np.isnan(diaObjects.at[objId, "g_psfFluxMaxSlope"])) # Test max slope value inputing nan values. fluxes[4] = np.nan times[7] = np.nan - diaObjects = pd.DataFrame({"diaObjectId": [objId]}) diaSources = pd.DataFrame( data={"diaObjectId": n_sources * [objId], "band": n_sources * ["r"], @@ -675,6 +706,7 @@ def testCalculate(self): "psfFlux": fluxes, "psfFluxErr": np.ones(n_sources), "midpointMjdTai": times}) + diaObjects = make_diaObject_table(objId, plug, band='r') run_multi_plugin(diaObjects, diaSources, "r", plug) self.assertAlmostEqual(diaObjects.at[objId, "r_psfFluxMaxSlope"], 2 + 2 / 9) @@ -882,7 +914,6 @@ def testCalculate(self): # Test test scatter on scienceFlux. fluxes = np.linspace(-1, 1, n_sources) - diaObjects = pd.DataFrame({"diaObjectId": [objId]}) diaSources = pd.DataFrame( data={"diaObjectId": n_sources * [objId], "band": n_sources * ["u"], @@ -893,30 +924,31 @@ def testCalculate(self): plug = SigmaDiaTotFlux(SigmaDiaTotFluxConfig(), "ap_sigmaTotFlux", None) + diaObjects = make_diaObject_table(objId, plug, band='u') run_multi_plugin(diaObjects, diaSources, "u", plug) self.assertAlmostEqual(diaObjects.at[objId, "u_scienceFluxSigma"], np.nanstd(fluxes, ddof=1)) # Test test scatter on scienceFlux returns nan on 1 input. - diaObjects = pd.DataFrame({"diaObjectId": [objId]}) diaSources = pd.DataFrame( data={"diaObjectId": 1 * [objId], "band": 1 * ["g"], "diaSourceId": np.arange(1, dtype=int), "scienceFlux": fluxes[0], "scienceFluxErr": np.ones(1)}) + diaObjects = make_diaObject_table(objId, plug, band='g') run_multi_plugin(diaObjects, diaSources, "g", plug) self.assertTrue(np.isnan(diaObjects.at[objId, "g_scienceFluxSigma"])) # Test test scatter on scienceFlux takes input nans. fluxes[4] = np.nan - diaObjects = pd.DataFrame({"diaObjectId": [objId]}) diaSources = pd.DataFrame( data={"diaObjectId": n_sources * [objId], "band": n_sources * ["r"], "diaSourceId": np.arange(n_sources, dtype=int), "scienceFlux": fluxes, "scienceFluxErr": np.ones(n_sources)}) + diaObjects = make_diaObject_table(objId, plug, band='r') run_multi_plugin(diaObjects, diaSources, "r", plug) self.assertAlmostEqual(diaObjects.at[objId, "r_scienceFluxSigma"], np.nanstd(fluxes, ddof=1))