Skip to content

Commit

Permalink
Fix diaCalculation unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
isullivan committed Jun 12, 2024
1 parent f475ef3 commit f5a0985
Showing 1 changed file with 46 additions and 14 deletions.
60 changes: 46 additions & 14 deletions tests/test_diaCalculationPlugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,34 @@ def run_multi_plugin(diaObjectCat, diaSourceCat, band, plugin):
band=band)


def make_diaObject_table(objId, plugin, default_value=None, band=None):
"""Create a minimal diaObject table with columns required for the plugin
Parameters
----------
objId : `int`
The diaObjectId
plugin : `lsst.ap.association.DiaCalculationPlugin`
The plugin that will be run.
default_value : None, optional
Value to set new columns to.
band : `str`, optional
Band designation to append to the plugin columns.
Returns
-------
diaObjects : `pandas.DataFrame`
Output catalog with the required columns for the plugin.
"""
diaObjects = {"diaObjectId": [objId]}
for col in plugin.outputCols:
if band is not None:
diaObjects[f"{band}_{col}"] = default_value
else:
diaObjects[col] = default_value
return pd.DataFrame(diaObjects)


class TestMeanPosition(unittest.TestCase):

def testCalculate(self):
Expand Down Expand Up @@ -245,14 +273,14 @@ def testCalculate(self):
for n_sources in [1, 8, 10]:
# Test expected number of sources per object.
objId = 0
diaObjects = pd.DataFrame({"diaObjectId": [objId]})
diaSources = pd.DataFrame(
data={"diaObjectId": n_sources * [objId],
"band": n_sources * ["g"],
"diaSourceId": np.arange(n_sources, dtype=int)})
plug = NumDiaSourcesDiaPlugin(NumDiaSourcesDiaPluginConfig(),
"ap_nDiaSources",
None)
diaObjects = make_diaObject_table(objId, plug, default_value=int(-1))
run_multi_plugin(diaObjects, diaSources, "g", plug)

self.assertEqual(n_sources, diaObjects.at[objId, "nDiaSources"])
Expand All @@ -267,7 +295,6 @@ def testCalculate(self):
n_sources = 10

# Test expected flags, no flags set.
diaObjects = pd.DataFrame({"diaObjectId": [objId]})
diaSources = pd.DataFrame(
data={"diaObjectId": n_sources * [objId],
"band": n_sources * ["g"],
Expand All @@ -276,38 +303,41 @@ def testCalculate(self):
plug = SimpleSourceFlagDiaPlugin(SimpleSourceFlagDiaPluginConfig(),
"ap_diaObjectFlag",
None)

diaObjects = make_diaObject_table(objId, plug, default_value=np.uint64)
run_multi_plugin(diaObjects, diaSources, "g", plug)
self.assertEqual(diaObjects.at[objId, "flags"], 0)

# Test expected flags, all flags set.
diaObjects = pd.DataFrame({"diaObjectId": [objId]})
diaSources = pd.DataFrame(
data={"diaObjectId": n_sources * [objId],
"band": n_sources * ["g"],
"diaSourceId": np.arange(n_sources, dtype=int),
"flags": np.ones(n_sources, dtype=np.uint64)})
diaObjects = make_diaObject_table(objId, plug, default_value=np.uint64)
run_multi_plugin(diaObjects, diaSources, "g", plug)
self.assertEqual(diaObjects.at[objId, "flags"], 1)

# Test expected flags, random flags.
diaObjects = pd.DataFrame({"diaObjectId": [objId]})
diaSources = pd.DataFrame(
data={"diaObjectId": n_sources * [objId],
"band": n_sources * ["g"],
"diaSourceId": np.arange(n_sources, dtype=int),
"flags": np.random.randint(0, 2 ** 16, size=n_sources)})

diaObjects = make_diaObject_table(objId, plug, default_value=np.uint64)
run_multi_plugin(diaObjects, diaSources, "g", plug)
self.assertEqual(diaObjects.at[objId, "flags"], 1)

# Test expected flags, one flag set.
diaObjects = pd.DataFrame({"diaObjectId": [objId]})
flag_array = np.zeros(n_sources, dtype=np.uint64)
flag_array[4] = 256
diaSources = pd.DataFrame(
data={"diaObjectId": n_sources * [objId],
"band": n_sources * ["g"],
"diaSourceId": np.arange(n_sources, dtype=int),
"flags": flag_array})
diaObjects = make_diaObject_table(objId, plug, default_value=np.uint64)
run_multi_plugin(diaObjects, diaSources, "g", plug)
self.assertEqual(diaObjects.at[objId, "flags"], 1)

Expand Down Expand Up @@ -417,7 +447,6 @@ def testCalculate(self):

# Test expected sigma scatter of fluxes.
fluxes = np.linspace(-1, 1, n_sources)
diaObjects = pd.DataFrame({"diaObjectId": [objId]})
diaSources = pd.DataFrame(
data={"diaObjectId": n_sources * [objId],
"band": n_sources * ["u"],
Expand All @@ -428,30 +457,33 @@ def testCalculate(self):
plug = SigmaDiaPsfFlux(SigmaDiaPsfFluxConfig(),
"ap_sigmaFlux",
None)
diaObjects = make_diaObject_table(objId, plug, band='u')
run_multi_plugin(diaObjects, diaSources, "u", plug)
self.assertAlmostEqual(diaObjects.at[objId, "u_psfFluxSigma"],
np.nanstd(fluxes, ddof=1))

# test one input, returns nan.
diaObjects = pd.DataFrame({"diaObjectId": [objId]})
diaSources = pd.DataFrame(
data={"diaObjectId": 1 * [objId],
"band": 1 * ["g"],
"diaSourceId": [0],
"psfFlux": [fluxes[0]],
"psfFluxErr": [1.]})

diaObjects = make_diaObject_table(objId, plug, band='g')
run_multi_plugin(diaObjects, diaSources, "g", plug)
self.assertTrue(np.isnan(diaObjects.at[objId, "g_psfFluxSigma"]))

# Test expected sigma scatter of fluxes with a nan value.
fluxes[4] = np.nan
diaObjects = pd.DataFrame({"diaObjectId": [objId]})
diaSources = pd.DataFrame(
data={"diaObjectId": n_sources * [objId],
"band": n_sources * ["r"],
"diaSourceId": np.arange(n_sources, dtype=int),
"psfFlux": fluxes,
"psfFluxErr": np.ones(n_sources)})

diaObjects = make_diaObject_table(objId, plug, band='r')
run_multi_plugin(diaObjects, diaSources, "r", plug)
self.assertAlmostEqual(diaObjects.at[objId, "r_psfFluxSigma"],
np.nanstd(fluxes, ddof=1))
Expand Down Expand Up @@ -637,7 +669,6 @@ def testCalculate(self):
# Test max slope value.
fluxes = np.linspace(-1, 1, n_sources)
times = np.concatenate([np.linspace(0, 1, n_sources)[:-1], [1 - 1/90]])
diaObjects = pd.DataFrame({"diaObjectId": [objId]})
diaSources = pd.DataFrame(
data={"diaObjectId": n_sources * [objId],
"band": n_sources * ["u"],
Expand All @@ -649,32 +680,33 @@ def testCalculate(self):
plug = MaxSlopeDiaPsfFlux(MaxSlopeDiaPsfFluxConfig(),
"ap_maxSlopeFlux",
None)
diaObjects = make_diaObject_table(objId, plug, band='u')
run_multi_plugin(diaObjects, diaSources, "u", plug)
self.assertAlmostEqual(diaObjects.at[objId, "u_psfFluxMaxSlope"], 2 + 2/9)

# Test max slope value returns nan on 1 input.
diaObjects = pd.DataFrame({"diaObjectId": [objId]})
diaSources = pd.DataFrame(
data={"diaObjectId": 1 * [objId],
"band": 1 * ["g"],
"diaSourceId": np.arange(1, dtype=int),
"psfFlux": fluxes[0],
"psfFluxErr": np.ones(1),
"midpointMjdTai": times[0]})
diaObjects = make_diaObject_table(objId, plug, band='g')
run_multi_plugin(diaObjects, diaSources, "g", plug)
self.assertTrue(np.isnan(diaObjects.at[objId, "g_psfFluxMaxSlope"]))

# Test max slope value inputing nan values.
fluxes[4] = np.nan
times[7] = np.nan
diaObjects = pd.DataFrame({"diaObjectId": [objId]})
diaSources = pd.DataFrame(
data={"diaObjectId": n_sources * [objId],
"band": n_sources * ["r"],
"diaSourceId": np.arange(n_sources, dtype=int),
"psfFlux": fluxes,
"psfFluxErr": np.ones(n_sources),
"midpointMjdTai": times})
diaObjects = make_diaObject_table(objId, plug, band='r')
run_multi_plugin(diaObjects, diaSources, "r", plug)
self.assertAlmostEqual(diaObjects.at[objId, "r_psfFluxMaxSlope"], 2 + 2 / 9)

Expand Down Expand Up @@ -882,7 +914,6 @@ def testCalculate(self):

# Test test scatter on scienceFlux.
fluxes = np.linspace(-1, 1, n_sources)
diaObjects = pd.DataFrame({"diaObjectId": [objId]})
diaSources = pd.DataFrame(
data={"diaObjectId": n_sources * [objId],
"band": n_sources * ["u"],
Expand All @@ -893,30 +924,31 @@ def testCalculate(self):
plug = SigmaDiaTotFlux(SigmaDiaTotFluxConfig(),
"ap_sigmaTotFlux",
None)
diaObjects = make_diaObject_table(objId, plug, band='u')
run_multi_plugin(diaObjects, diaSources, "u", plug)
self.assertAlmostEqual(diaObjects.at[objId, "u_scienceFluxSigma"],
np.nanstd(fluxes, ddof=1))

# Test test scatter on scienceFlux returns nan on 1 input.
diaObjects = pd.DataFrame({"diaObjectId": [objId]})
diaSources = pd.DataFrame(
data={"diaObjectId": 1 * [objId],
"band": 1 * ["g"],
"diaSourceId": np.arange(1, dtype=int),
"scienceFlux": fluxes[0],
"scienceFluxErr": np.ones(1)})
diaObjects = make_diaObject_table(objId, plug, band='g')
run_multi_plugin(diaObjects, diaSources, "g", plug)
self.assertTrue(np.isnan(diaObjects.at[objId, "g_scienceFluxSigma"]))

# Test test scatter on scienceFlux takes input nans.
fluxes[4] = np.nan
diaObjects = pd.DataFrame({"diaObjectId": [objId]})
diaSources = pd.DataFrame(
data={"diaObjectId": n_sources * [objId],
"band": n_sources * ["r"],
"diaSourceId": np.arange(n_sources, dtype=int),
"scienceFlux": fluxes,
"scienceFluxErr": np.ones(n_sources)})
diaObjects = make_diaObject_table(objId, plug, band='r')
run_multi_plugin(diaObjects, diaSources, "r", plug)
self.assertAlmostEqual(diaObjects.at[objId, "r_scienceFluxSigma"],
np.nanstd(fluxes, ddof=1))
Expand Down

0 comments on commit f5a0985

Please sign in to comment.