Skip to content

Commit

Permalink
Merge pull request #277 from lsst/tickets/DM-43856
Browse files Browse the repository at this point in the history
DM-43856: force diaCalculation plugins to preserve type
  • Loading branch information
isullivan authored Jun 13, 2024
2 parents b4b16ee + 0723022 commit 6f43f06
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 42 deletions.
101 changes: 73 additions & 28 deletions python/lsst/meas/base/diaCalculationPlugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ def calculate(self, diaObjects, diaSources, **kwargs):
**kwargs
Any additional keyword arguments that may be passed to the plugin.
"""
diaObjects.loc[:, "nDiaSources"] = diaSources.diaObjectId.count()
dtype = diaObjects["nDiaSources"].dtype
diaObjects.loc[:, "nDiaSources"] = diaSources.diaObjectId.count().astype(dtype)


class SimpleSourceFlagDiaPluginConfig(DiaObjectCalculationPluginConfig):
Expand Down Expand Up @@ -253,7 +254,8 @@ def calculate(self, diaObjects, diaSources, **kwargs):
**kwargs
Any additional keyword arguments that may be passed to the plugin.
"""
diaObjects.loc[:, "flags"] = diaSources.flags.any().astype(np.uint64)
dtype = diaObjects["flags"].dtype
diaObjects.loc[:, "flags"] = diaSources.flags.any().astype(dtype)


class WeightedMeanDiaPsfFluxConfig(DiaObjectCalculationPluginConfig):
Expand Down Expand Up @@ -329,9 +331,9 @@ def _weightedMean(df):
errName: fluxMeanErr,
nDataName: nFluxData},
dtype="object")
df = filterDiaSources.apply(_weightedMean).astype(diaObjects.dtypes[[meanName, errName, nDataName]])

diaObjects.loc[:, [meanName, errName, nDataName]] = \
filterDiaSources.apply(_weightedMean)
diaObjects.loc[:, [meanName, errName, nDataName]] = df


class PercentileDiaPsfFluxConfig(DiaObjectCalculationPluginConfig):
Expand Down Expand Up @@ -392,20 +394,25 @@ def calculate(self,
Any additional keyword arguments that may be passed to the plugin.
"""
pTileNames = []
dtype = None
for tilePercent in self.config.percentiles:
pTileName = "{}_psfFluxPercentile{:02d}".format(band,
tilePercent)
pTileNames.append(pTileName)
if pTileName not in diaObjects.columns:
diaObjects[pTileName] = np.nan
elif dtype is None:
dtype = diaObjects[pTileName].dtype

def _fluxPercentiles(df):
pTiles = np.nanpercentile(df["psfFlux"], self.config.percentiles)
return pd.Series(
dict((tileName, pTile)
for tileName, pTile in zip(pTileNames, pTiles)))

diaObjects.loc[:, pTileNames] = filterDiaSources.apply(_fluxPercentiles)
if dtype is None:
diaObjects.loc[:, pTileNames] = filterDiaSources.apply(_fluxPercentiles)
else:
diaObjects.loc[:, pTileNames] = filterDiaSources.apply(_fluxPercentiles).astype(dtype)


class SigmaDiaPsfFluxConfig(DiaObjectCalculationPluginConfig):
Expand Down Expand Up @@ -452,8 +459,12 @@ def calculate(self,
"""
# Set "delta degrees of freedom (ddf)" to 1 to calculate the unbiased
# estimator of scatter (i.e. 'N - 1' instead of 'N').
diaObjects.loc[:, "{}_psfFluxSigma".format(band)] = \
filterDiaSources.psfFlux.std()
column = "{}_psfFluxSigma".format(band)
if column in diaObjects:
dtype = diaObjects[column].dtype
diaObjects.loc[:, column] = filterDiaSources.psfFlux.std().astype(dtype)
else:
diaObjects.loc[:, column] = filterDiaSources.psfFlux.std()


class Chi2DiaPsfFluxConfig(DiaObjectCalculationPluginConfig):
Expand Down Expand Up @@ -503,14 +514,18 @@ def calculate(self,
Any additional keyword arguments that may be passed to the plugin.
"""
meanName = "{}_psfFluxMean".format(band)
column = "{}_psfFluxChi2".format(band)

def _chi2(df):
delta = (df["psfFlux"]
- diaObjects.at[df.diaObjectId.iat[0], meanName])
return np.nansum((delta / df["psfFluxErr"]) ** 2)

diaObjects.loc[:, "{}_psfFluxChi2".format(band)] = \
filterDiaSources.apply(_chi2)
if column in diaObjects:
dtype = diaObjects[column].dtype
diaObjects.loc[:, column] = filterDiaSources.apply(_chi2).astype(dtype)
else:
diaObjects.loc[:, column] = filterDiaSources.apply(_chi2)


class MadDiaPsfFluxConfig(DiaObjectCalculationPluginConfig):
Expand Down Expand Up @@ -558,9 +573,16 @@ def calculate(self,
**kwargs
Any additional keyword arguments that may be passed to the plugin.
"""
diaObjects.loc[:, "{}_psfFluxMAD".format(band)] = \
filterDiaSources.psfFlux.apply(median_absolute_deviation,
ignore_nan=True)
column = "{}_psfFluxMAD".format(band)
if column in diaObjects:
dtype = diaObjects[column].dtype
diaObjects.loc[:, column] = \
filterDiaSources.psfFlux.apply(median_absolute_deviation,
ignore_nan=True).astype(dtype)
else:
diaObjects.loc[:, column] = \
filterDiaSources.psfFlux.apply(median_absolute_deviation,
ignore_nan=True)


class SkewDiaPsfFluxConfig(DiaObjectCalculationPluginConfig):
Expand Down Expand Up @@ -607,8 +629,12 @@ def calculate(self,
**kwargs
Any additional keyword arguments that may be passed to the plugin.
"""
diaObjects.loc[:, "{}_psfFluxSkew".format(band)] = \
filterDiaSources.psfFlux.skew()
column = "{}_psfFluxSkew".format(band)
if column in diaObjects:
dtype = diaObjects[column].dtype
diaObjects.loc[:, column] = filterDiaSources.psfFlux.skew().astype(dtype)
else:
diaObjects.loc[:, column] = filterDiaSources.psfFlux.skew()


class MinMaxDiaPsfFluxConfig(DiaObjectCalculationPluginConfig):
Expand Down Expand Up @@ -662,8 +688,9 @@ def calculate(self,
if maxName not in diaObjects.columns:
diaObjects[maxName] = np.nan

diaObjects.loc[:, minName] = filterDiaSources.psfFlux.min()
diaObjects.loc[:, maxName] = filterDiaSources.psfFlux.max()
dtype = diaObjects[minName].dtype
diaObjects.loc[:, minName] = filterDiaSources.psfFlux.min().astype(dtype)
diaObjects.loc[:, maxName] = filterDiaSources.psfFlux.max().astype(dtype)


class MaxSlopeDiaPsfFluxConfig(DiaObjectCalculationPluginConfig):
Expand Down Expand Up @@ -722,8 +749,12 @@ def _maxSlope(df):
fluxes = tmpDf["psfFlux"].to_numpy()[timeArgs]
return (np.diff(fluxes) / np.diff(times)).max()

diaObjects.loc[:, "{}_psfFluxMaxSlope".format(band)] = \
filterDiaSources.apply(_maxSlope)
column = "{}_psfFluxMaxSlope".format(band)
if column in diaObjects:
dtype = diaObjects[column].dtype
diaObjects.loc[:, column] = filterDiaSources.apply(_maxSlope).astype(dtype)
else:
diaObjects.loc[:, column] = filterDiaSources.apply(_maxSlope)


class ErrMeanDiaPsfFluxConfig(DiaObjectCalculationPluginConfig):
Expand Down Expand Up @@ -770,8 +801,12 @@ def calculate(self,
**kwargs
Any additional keyword arguments that may be passed to the plugin.
"""
diaObjects.loc[:, "{}_psfFluxErrMean".format(band)] = \
filterDiaSources.psfFluxErr.mean()
column = "{}_psfFluxErrMean".format(band)
if column in diaObjects:
dtype = diaObjects[column].dtype
diaObjects.loc[:, column] = filterDiaSources.psfFluxErr.mean().astype(dtype)
else:
diaObjects.loc[:, column] = filterDiaSources.psfFluxErr.mean()


class LinearFitDiaPsfFluxConfig(DiaObjectCalculationPluginConfig):
Expand Down Expand Up @@ -825,6 +860,7 @@ def calculate(self,
bName = "{}_psfFluxLinearIntercept".format(band)
if bName not in diaObjects.columns:
diaObjects[bName] = np.nan
dtype = diaObjects[mName].dtype

def _linearFit(df):
tmpDf = df[~np.logical_or(
Expand All @@ -838,7 +874,7 @@ def _linearFit(df):
times = tmpDf["midpointMjdTai"].to_numpy()
A = np.array([times / errors, 1 / errors]).transpose()
m, b = lsq_linear(A, fluxes / errors).x
return pd.Series({mName: m, bName: b})
return pd.Series({mName: m, bName: b}, dtype=dtype)

diaObjects.loc[:, [mName, bName]] = filterDiaSources.apply(_linearFit)

Expand Down Expand Up @@ -903,8 +939,12 @@ def _stetsonJ(df):
errors,
diaObjects.at[tmpDf.diaObjectId.iat[0], meanName])

diaObjects.loc[:, "{}_psfFluxStetsonJ".format(band)] = \
filterDiaSources.apply(_stetsonJ)
column = "{}_psfFluxStetsonJ".format(band)
if column in diaObjects:
dtype = diaObjects[column].dtype
diaObjects.loc[:, column] = filterDiaSources.apply(_stetsonJ).astype(dtype)
else:
diaObjects.loc[:, column] = filterDiaSources.apply(_stetsonJ)

def _stetson_J(self, fluxes, errors, mean=None):
"""Compute the single band stetsonJ statistic.
Expand Down Expand Up @@ -1064,8 +1104,8 @@ def _meanFlux(df):
return pd.Series({totMeanName: fluxMean,
totErrName: fluxMeanErr})

diaObjects.loc[:, [totMeanName, totErrName]] = \
filterDiaSources.apply(_meanFlux)
df = filterDiaSources.apply(_meanFlux).astype(diaObjects.dtypes[[totMeanName, totErrName]])
diaObjects.loc[:, [totMeanName, totErrName]] = df


class SigmaDiaTotFluxConfig(DiaObjectCalculationPluginConfig):
Expand Down Expand Up @@ -1113,5 +1153,10 @@ def calculate(self,
"""
# Set "delta degrees of freedom (ddf)" to 1 to calculate the unbiased
# estimator of scatter (i.e. 'N - 1' instead of 'N').
diaObjects.loc[:, "{}_scienceFluxSigma".format(band)] = \
filterDiaSources.scienceFlux.std()
column = "{}_scienceFluxSigma".format(band)
if column in diaObjects:
dtype = diaObjects[column].dtype
diaObjects.loc[:, column] = filterDiaSources.scienceFlux.std().astype(dtype)
else:

diaObjects.loc[:, column] = filterDiaSources.scienceFlux.std()
Loading

0 comments on commit 6f43f06

Please sign in to comment.