Preprocessing steps (#305)
Separate preprocessing steps into those already done on the exposure (in the instrument class) and those that are required in the config.
guynir42 authored Jun 14, 2024
1 parent 8ac1ce1 commit 9d7d977
Showing 12 changed files with 102 additions and 127 deletions.
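The changes below split preprocessing bookkeeping across three places. As a quick orientation (annotated summary only, assuming an Instrument instance named instrument and a ParsPreprocessor instance named pars; the attribute and parameter names are taken from this commit, but the snippet itself is not repository code):

instrument.preprocessing_steps_available   # steps the pipeline knows how to apply for this instrument (instrument class)
instrument.preprocessing_steps_done        # steps already applied to the exposure as delivered (instrument class)
pars.steps_required                        # steps the pipeline must guarantee (preprocessing.steps_required in the config)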
1 change: 0 additions & 1 deletion .github/workflows/run-pipeline-tests-1.yml
@@ -59,6 +59,5 @@ jobs:
- name: run test
run: |
df -h
shopt -s nullglob
TEST_SUBFOLDER=$(ls tests/pipeline/test_{a..o}*.py) docker compose run runtests
2 changes: 1 addition & 1 deletion default_config.yaml
@@ -79,7 +79,7 @@ catalog_gaiadr3:
pipeline: {}

preprocessing:
use_sky_subtraction: False
steps_required: [ 'overscan', 'linearity', 'flat', 'fringe' ]

extraction:
sources:
3 changes: 2 additions & 1 deletion models/decam.py
@@ -124,7 +124,8 @@ def __init__(self, **kwargs):
# will apply kwargs to attributes, and register instrument in the INSTRUMENT_INSTANCE_CACHE
Instrument.__init__(self, **kwargs)

self.preprocessing_steps = [ 'overscan', 'linearity', 'flat', 'fringe' ]
self.preprocessing_steps_available = [ 'overscan', 'linearity', 'flat', 'fringe' ]
self.preprocessing_steps_done = []

@classmethod
def get_section_ids(cls):
25 changes: 13 additions & 12 deletions models/enums_and_bitflags.py
@@ -325,6 +325,19 @@ def string_to_bitflag(value, dictionary):
return output


# bitflag for image preprocessing steps that have been done
image_preprocessing_dict = {
0: 'overscan',
1: 'zero',
2: 'dark',
3: 'linearity',
4: 'flat',
5: 'fringe',
6: 'illumination'
}
image_preprocessing_inverse = {EnumConverter.c(v):k for k, v in image_preprocessing_dict.items()}


# these are the ways an Image or Exposure are allowed to be bad
image_badness_dict = {
1: 'banding',
@@ -389,18 +402,6 @@ class BadnessConverter( EnumConverter ):
_dict_filtered = None
_dict_inverse = None

# bitflag for image preprocessing steps that have been done
image_preprocessing_dict = {
0: 'overscan',
1: 'zero',
2: 'dark',
3: 'linearity',
4: 'flat',
5: 'fringe',
6: 'illumination'
}
image_preprocessing_inverse = {EnumConverter.c(v):k for k, v in image_preprocessing_dict.items()}

# bitflag used in flag images
flag_image_bits = {
0: 'bad pixel', # Bad pixel flagged by the instrument
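This dictionary, now defined before it is first needed, drives the preproc_bitflag encoding used by models/image.py below. The module's own string_to_bitflag / bitflag_to_string helpers do the conversion; the following is a simplified, self-contained sketch of the same idea (illustrative re-implementation, not the repository's code, and it skips the EnumConverter.c normalization used in the real inverse dict):

image_preprocessing_dict = {0: 'overscan', 1: 'zero', 2: 'dark', 3: 'linearity',
                            4: 'flat', 5: 'fringe', 6: 'illumination'}
image_preprocessing_inverse = {v: k for k, v in image_preprocessing_dict.items()}

def encode(steps):
    """Pack an iterable of step names into a single integer bitflag."""
    flag = 0
    for step in steps:
        flag |= 2 ** image_preprocessing_inverse[step]
    return flag

def decode(flag):
    """Unpack a bitflag into a comma-separated string of step names."""
    return ', '.join(name for bit, name in image_preprocessing_dict.items() if flag & 2 ** bit)

assert encode(['overscan', 'flat']) == 17        # bits 0 and 4 -> 1 + 16
assert decode(17) == 'overscan, flat'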
13 changes: 13 additions & 0 deletions models/image.py
@@ -34,6 +34,10 @@
from models.enums_and_bitflags import (
ImageFormatConverter,
ImageTypeConverter,
string_to_bitflag,
bitflag_to_string,
image_preprocessing_dict,
image_preprocessing_inverse,
image_badness_inverse,
)

@@ -361,6 +365,15 @@ def mid_mjd(self):
doc='Bitflag specifying which preprocessing steps have been completed for the image.'
)

@property
def preprocessing_done(self):
"""Return a list of the names of preprocessing steps that have been completed for this image."""
return bitflag_to_string(self.preproc_bitflag, image_preprocessing_dict)

@preprocessing_done.setter
def preprocessing_done(self, value):
self.preproc_bitflag = string_to_bitflag(value, image_preprocessing_inverse)

astro_cal_done = sa.Column(
sa.BOOLEAN,
nullable=False,
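With the property pair in place, callers can read the bitflag by step name instead of doing bit arithmetic. Roughly (hypothetical usage on an Image instance named image; the exact output format depends on bitflag_to_string, which this diff does not show, but downstream code splits the result on ', '):

image.preproc_bitflag = 0
image.preproc_bitflag |= 2 ** 0     # overscan done (bit 0 of image_preprocessing_dict)
image.preproc_bitflag |= 2 ** 4     # flat done (bit 4)
print(image.preprocessing_done)     # expected: 'overscan, flat'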
32 changes: 20 additions & 12 deletions models/instrument.py
@@ -382,15 +382,22 @@ def __init__(self, **kwargs):
self._dateobs_for_sections = getattr(self, '_dateobs_for_sections', None) # dateobs when sections were loaded
self._dateobs_range_days = getattr(self, '_dateobs_range_days', 1.0) # how many days from dateobs to reload

# List of the preprocessing steps to apply to images from this
# instrument, in order overscan must always be first. The
# values here (in the Instrument class) are all possible values.
# Subclasses should redefine this with the subset that they
# actually need. If a subclass has to add a new preprocessing
# step, then it should add that step to this list, and (if it's
# a step that includes a calibration image or datafile) to the
# CalibratorTypeConverter dict in enums_and_bitflags.py
self.preprocessing_steps = [ 'overscan', 'zero','dark', 'linearity', 'flat', 'fringe', 'illumination' ]
# List of the preprocessing steps that can be applied to exposures from this
# instrument, in order. 'overscan' must always be first.
# All preprocessing steps that are available for an instrument are listed under preprocessing_steps_available.
# Use image_preprocessing_dict, defined in the enums_and_bitflags file to see all possible values.
# Subclasses of Instrument should redefine this with the subset that they
# actually need to apply. So, if an instrument has exposures
# that already have overscan removed, that instrument should remove 'overscan' from this list.
# If a subclass has to add a new preprocessing step,
# then it should add that step to enums_and_bitflags.image_preprocessing_dict,
# and (if it's a step that includes a calibration image or datafile)
# to the CalibratorTypeConverter dict in enums_and_bitflags.
self.preprocessing_steps_available = ['overscan', 'zero', 'dark', 'linearity', 'flat', 'fringe', 'illumination']
# a list of preprocessing steps that are pre-applied to the exposure data
self.preprocessing_steps_done = []
self.preprocessing_step_skip_by_filter = {} # e.g., {'g': ['fringe', 'illumination']} will skip those for g

# nofile_steps are ones that don't have an associated file
self.preprocessing_nofile_steps = [ 'overscan' ]

@@ -1457,7 +1464,7 @@ def preprocessing_calibrator_files( self, calibset, flattype, section, filter, m
construction).
"""

section = str(section)
SCLogger.debug( f'Looking for calibrators for {calibset} {section}' )

if ( calibset == 'externally_supplied' ) != ( flattype == 'externally_supplied' ):
@@ -1475,7 +1482,7 @@ def preprocessing_calibrator_files( self, calibset, flattype, section, filter, m
expdatetime = pytz.utc.localize( astropy.time.Time( mjd, format='mjd' ).datetime )

with SmartSession(session) as session:
for calibtype in self.preprocessing_steps:
for calibtype in self.preprocessing_steps_available:
if calibtype in self.preprocessing_nofile_steps:
continue

@@ -1895,7 +1902,8 @@ def __init__(self, **kwargs):
Instrument.__init__(self, **kwargs)

# DemoInstrument doesn't know how to preprocess
self.preprocessing_steps = []
self.preprocessing_steps_available = []
self.preprocessing_steps_done = ['overscan', 'linearity', 'flat', 'fringe']

@classmethod
def get_section_ids(cls):
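The renamed attributes make the per-instrument contract explicit. A hypothetical subclass that receives exposures with overscan, bias and dark already removed, and that only needs fringe correction in its reddest band, might look roughly like this (illustrative only; MyCamera and its step choices are not part of this commit):

from models.instrument import Instrument

class MyCamera(Instrument):
    def __init__(self, **kwargs):
        Instrument.__init__(self, **kwargs)
        # steps this pipeline can still apply to frames from this camera
        self.preprocessing_steps_available = ['linearity', 'flat', 'fringe']
        # steps the observatory already applied before delivering the exposure
        self.preprocessing_steps_done = ['overscan', 'zero', 'dark']
        # fringe correction is pointless in the bluer bands, so skip it there
        self.preprocessing_step_skip_by_filter = {'g': ['fringe'], 'r': ['fringe']}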
4 changes: 3 additions & 1 deletion models/ptf.py
@@ -26,7 +26,9 @@ def __init__(self, **kwargs):
# will apply kwargs to attributes, and register instrument in the INSTRUMENT_INSTANCE_CACHE
Instrument.__init__(self, **kwargs)

self.preprocessing_steps = []
# we are using preprocessed data as the exposures, so everything is already done
self.preprocessing_steps_available = []
self.preprocessing_steps_done = ['overscan', 'linearity', 'flat', 'fringe']

@classmethod
def get_section_ids(cls):
2 changes: 1 addition & 1 deletion pipeline/parameters.py
@@ -333,7 +333,7 @@ def __setattr__(self, key, value):
and not isinstance(value, self.__typecheck__[real_key])
):
raise TypeError(
f'Parameter "{key}" must be of type {self.__typecheck__[real_key]}'
f'Parameter "{key}" must be of type {self.__typecheck__[real_key]}, got {type(value)} instead. '
)
super().__setattr__(real_key, value)

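The extended message now reports the offending type, so a config mistake such as passing a bare string where a list is declared reads back clearly. For example (hypothetical, assuming a ParsPreprocessor instance pars whose steps_required parameter is declared as a list):

pars.steps_required = 'overscan'
# raises TypeError: Parameter "steps_required" must be of type <class 'list'>, got <class 'str'> instead.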
119 changes: 43 additions & 76 deletions pipeline/preprocessing.py
@@ -23,17 +23,16 @@ class ParsPreprocessor(Parameters):
def __init__(self, **kwargs):
super().__init__()

self.use_sky_subtraction = self.add_par('use_sky_subtraction', False, bool, 'Apply sky subtraction. ',
critical=True)
self.add_par( 'steps', None, ( list, None ), "Steps to do; don't specify, or pass None, to do all." )
self.add_par( 'calibset', None, ( str, None ),
( "One of the CalibratorSetConverter enum; "
"the calibrator set to use. Defaults to the instrument default" ),
critical = True )
self.add_par( 'steps_required', [], list, "Steps that need to be done to each exposure" )

self.add_par( 'calibset', 'externally_supplied', str,
"The calibrator set to use. Choose one of the CalibratorSetConverter enum. ",
critical=True )
self.add_alias( 'calibrator_set', 'calibset' )
self.add_par( 'flattype', None, ( str, None ),
( "One of the FlatTypeConverter enum; defaults to the instrument default" ),
critical = True )

self.add_par( 'flattype', 'externally_supplied', str,
"One of the FlatTypeConverter enum. ",
critical=True )

self._enforce_no_new_attrs = True

@@ -75,10 +74,6 @@ def __init__(self, **kwargs):
# the object did any work or just loaded from DB or datastore
self.has_recalculated = False

# TODO : remove this if/when we actually put sky subtraction in run()
if self.pars.use_sky_subtraction:
raise NotImplementedError( "Sky subtraction in preprocessing isn't implemented." )

def run( self, *args, **kwargs ):
"""Run preprocessing for a given exposure and section_identifier.
@@ -90,9 +85,6 @@ def run( self, *args, **kwargs ):
- Exposure, section_identifier
Passing just an image won't work.
kwargs can also include things that override the preprocessing
behavior. (TODO: document this)
Returns
-------
DataStore
@@ -105,9 +97,6 @@
except Exception as e:
return DataStore.catch_failure_to_parse(e, *args)

# This is here just for testing purposes
self._ds = ds # TODO: is there a reason not to just use the output datastore?

try: # catch any exceptions and save them in the datastore
t_start = time.perf_counter()
if parse_bool(os.getenv('SEECHANGE_TRACEMALLOC')):
@@ -124,41 +113,21 @@
if ( self.instrument is None ) or ( self.instrument.name != ds.exposure.instrument ):
self.instrument = ds.exposure.instrument_object

# The only reason these are saved in self, rather than being
# local variables, is so that tests can probe them
self._calibset = None
self._flattype = None
self._stepstodo = None

if 'calibset' in kwargs:
self._calibset = kwargs['calibset']
elif 'calibratorset' in kwargs:
self._calibset = kwargs['calibrator_set']
elif self.pars.calibset is not None:
self._calibset = self.pars.calibset
else:
self._calibset = cfg.value( f'{self.instrument.name}.calibratorset',
default=cfg.value( 'instrument_default.calibratorset' ) )

if 'flattype' in kwargs:
self._flattype = kwargs['flattype']
elif self.pars.flattype is not None:
self._flattype = self.pars.flattype
else:
self._flattype = cfg.value( f'{self.instrument.name}.flattype',
default=cfg.value( 'instrument_default.flattype' ) )

if 'steps' in kwargs:
self._stepstodo = [ s for s in self.instrument.preprocessing_steps if s in kwargs['steps'] ]
elif self.pars.steps is not None:
self._stepstodo = [ s for s in self.instrument.preprocessing_steps if s in self.pars.steps ]
else:
self._stepstodo = self.instrument.preprocessing_steps

# check that all required steps can be done (or have been done) by the instrument:
known_steps = self.instrument.preprocessing_steps_available
known_steps += self.instrument.preprocessing_steps_done
known_steps = set(known_steps)
needed_steps = set(self.pars.steps_required)
if not needed_steps.issubset(known_steps):
raise ValueError(
f'Missing some preprocessing steps {needed_steps - known_steps} '
f'for instrument {self.instrument.name}'
)

# Get the calibrator files
SCLogger.debug("preprocessing: getting calibrator files")
preprocparam = self.instrument.preprocessing_calibrator_files( self._calibset,
self._flattype,
preprocparam = self.instrument.preprocessing_calibrator_files( self.pars.calibset,
self.pars.flattype,
ds.section_id,
ds.exposure.filter_short,
ds.exposure.mjd,
@@ -167,18 +136,6 @@
SCLogger.debug("preprocessing: got calibrator files")

# get the provenance for this step, using the current parameters:
# Provenance includes not just self.pars.get_critical_pars(),
# but also the steps that were performed. Reason: we may well
# load non-flatfielded images in the database for purposes of
# collecting images used for later building flats. We will then
# flatfield those images. The two images in the database must have
# different provenances.
# We also include any overrides to calibrator files, as that indicates
# that something individual happened here that's different from
# normal processing of the image.
# Fix this as part of issue #147
# provdict = dict( self.pars.get_critical_pars() )
# provdict['preprocessing_steps' ] = self._stepstodo
prov = ds.get_provenance('preprocessing', self.pars.get_critical_pars(), session=session)

# check if the image already exists in memory or in the database:
@@ -194,25 +151,35 @@
if image.preproc_bitflag is None:
image.preproc_bitflag = 0

required_bitflag = 0
for step in self._stepstodo:
required_bitflag |= string_to_bitflag( step, image_preprocessing_inverse )

if image._data is None: # in case we are skipping all preprocessing steps
image.data = image.raw_data

if image.preproc_bitflag != required_bitflag:
needed_steps -= set(self.instrument.preprocessing_steps_done)
filter_skips = self.instrument.preprocessing_step_skip_by_filter.get(ds.exposure.filter_short, [])
if not isinstance(filter_skips, list):
raise ValueError(f'Filter skips parameter for {ds.exposure.filter_short} must be a list')
filter_skips = set(filter_skips)
needed_steps -= filter_skips

if image._data is None: # in case we skip all preprocessing steps
image.data = image.raw_data

# the image keeps track of the steps already done to it in image.preproc_bitflag,
# which is translated into a list of keywords when calling image.preprocessing_done.
# This includes the things that already were applied in the exposure
# (i.e., the instrument's preprocessing_steps_done) but does not
# include the things that were skipped for this filter
# (i.e., the instrument's preprocessing_step_skip_by_filter)
already_done = set(image.preprocessing_done.split(', ') if image.preprocessing_done else [])
if not needed_steps.issubset(already_done): # still things to do here
self.has_recalculated = True
# Overscan is always first (as it reshapes the image)
if 'overscan' in self._stepstodo:
if 'overscan' in needed_steps:
SCLogger.debug('preprocessing: overscan and trim')
image.data = self.instrument.overscan_and_trim( image )
# Update the header ra/dec calculations now that we know the real width/height
image.set_corners_from_header_wcs(setradec=True)
image.preproc_bitflag |= string_to_bitflag( 'overscan', image_preprocessing_inverse )

# Apply steps in the order expected by the instrument
for step in self._stepstodo:
for step in needed_steps:
if step == 'overscan':
continue
SCLogger.debug(f"preprocessing: {step}")
@@ -274,7 +241,7 @@

image.preproc_bitflag |= string_to_bitflag( step, image_preprocessing_inverse )

# Get the Instrument standard bad pixel mask for this image
# Get the Instrument standard bad pixel mask for this image
if image._flags is None or image._weight is None:
image._flags = self.instrument.get_standard_flags_image( ds.section_id )

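The heart of the rewritten run() is the step bookkeeping: the required steps come from the parameters, steps already present in the exposure and steps skipped for the current filter are subtracted, and the remainder is checked against what the image's bitflag says has been applied. Condensed, the flow paraphrases to roughly this (names simplified, session/provenance handling omitted; a sketch, not a verbatim excerpt):

needed = set(pars.steps_required)

# fail early if the instrument can neither apply nor pre-supply a required step
known = set(instrument.preprocessing_steps_available) | set(instrument.preprocessing_steps_done)
if not needed.issubset(known):
    raise ValueError(f'Missing some preprocessing steps {needed - known} for instrument {instrument.name}')

# steps already applied to the exposure, or irrelevant for this filter, need no further work
needed -= set(instrument.preprocessing_steps_done)
needed -= set(instrument.preprocessing_step_skip_by_filter.get(exposure.filter_short, []))

# the image records what has actually been applied to it via preproc_bitflag / preprocessing_done
already_done = set(image.preprocessing_done.split(', ')) if image.preprocessing_done else set()
if not needed.issubset(already_done):
    pass  # overscan (if needed) runs first because it reshapes the image, then the remaining steps in instrument order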
1 change: 1 addition & 0 deletions tests/improc/test_tools.py
@@ -1,6 +1,7 @@
from astropy.io import fits
from improc.tools import strip_wcs_keywords


def test_strip_wcs_keywords():
hdr = fits.Header()

2 changes: 0 additions & 2 deletions tests/pipeline/test_pipeline.py
@@ -151,7 +151,6 @@ def test_parameters( test_config ):

# Verify that we can override from the yaml config file
pipeline = Pipeline()
assert not pipeline.preprocessor.pars['use_sky_subtraction']
assert pipeline.astrometor.pars['cross_match_catalog'] == 'gaia_dr3'
assert pipeline.astrometor.pars['catalog'] == 'gaia_dr3'
assert pipeline.subtractor.pars['method'] == 'zogy'
@@ -537,7 +536,6 @@ def test_provenance_tree(pipeline_for_tests, decam_exposure, decam_datastore, de
def test_inject_warnings_errors(decam_datastore, decam_reference, pipeline_for_tests):
from pipeline.top_level import PROCESS_OBJECTS
p = pipeline_for_tests

obj_to_process_name = {
'preprocessor': 'preprocessing',
'extractor': 'detection',