Skip to content

Commit

Permalink
Conductor password file & through_step (#355)
Browse files Browse the repository at this point in the history
  • Loading branch information
rknop authored Sep 4, 2024
1 parent a8d9fdb commit 02cb2fc
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 48 deletions.
11 changes: 6 additions & 5 deletions default_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ storage:
# should Image object save the weights/flags/etc in a single file with the image data?
single_file: false
# The convention for building filenames for images
# Use any of the following: short_name, date, time, section_id, filter, ra, dec, prov_id
# Use any of the following: inst_name, date, time, section_id, filter, ra, dec, prov_id
# Can also use section_id_int if the section_id is always an integer
# Can also use ra_int and ra_frac to get the integer number before/after the decimal point
# (the same can be done for dec). Also use ra_int_h to get the number in hours.
Expand All @@ -42,11 +42,11 @@ storage:
#
# Set to null if there is no archive; otherwise, a dict
# Subfields:
# url: the URL of the archive server, or null if archive is on the filesystem
# archive_url: the URL of the archive server, or null if archive is on the filesystem
# verify_cert: boolean, should we verify the SSL cert of the archive server
# path_base: the base of the collection on the archive server (a string unique to this dataset)
# read_dir: the directory to read from if the archive is on the local filesystem, or null
# write_dir: the directory to write to if the archive is on the local filesystem, or null
# local_read_dir: the directory to read from if the archive is on the local filesystem, or null
# local_write_dir: the directory to write to if the archive is on the local filesystem, or null

archive: null

Expand All @@ -58,7 +58,8 @@ archive: null
conductor:
conductor_url: unknown
username: unknown
password: unknown
password: null
password_file: null


# ======================================================================
Expand Down
12 changes: 9 additions & 3 deletions models/exposure.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,15 +561,20 @@ def invent_filepath( self ):

# Much code redundancy with Image.invent_filepath; move to a mixin?

inst_name = project = ''

if self.provenance_id is None:
raise ValueError("Cannot invent filepath for exposure without provenance.")
if self.instrument_object is not None:
inst_name = self.instrument_object.get_short_instrument_name()
if self.project is not None:
project = self.project
prov_hash = self.provenance_id

t = Time(self.mjd, format='mjd', scale='utc').datetime
date = t.strftime('%Y%m%d')
time = t.strftime('%H%M%S')

short_name = self.instrument_object.get_short_instrument_name()
filter = self.instrument_object.get_short_filter_name(self.filter)

ra = self.ra
Expand All @@ -584,14 +589,15 @@ def invent_filepath( self ):
dec_int_pm = f'p{dec_int:02d}' if dec_int >= 0 else f'm{dec_int:02d}'
dec_frac = int(dec_frac)

default_convention = "{short_name}_{date}_{time}_{filter}_{prov_hash:.6s}"
default_convention = "{inst_name}_{date}_{time}_{filter}_{prov_hash:.6s}"
cfg = Config.get()
name_convention = cfg.value( 'storage.exposures.name_convention', default=None )
if name_convention is None:
name_convention = default_convention

filepath = name_convention.format(
short_name=short_name,
inst_name=inst_name,
project=project,
date=date,
time=time,
filter=filter,
Expand Down
5 changes: 4 additions & 1 deletion models/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -1068,7 +1068,7 @@ def invent_filepath(self):
upstream images.
"""
prov_hash = inst_name = im_type = date = time = filter = ra = dec = dec_int_pm = ''
prov_hash = inst_name = im_type = date = time = filter = ra = dec = dec_int_pm = project = ''
section_id = section_id_int = ra_int = ra_int_h = ra_frac = dec_int = dec_frac = 0

if self.provenance_id is not None:
Expand All @@ -1077,6 +1077,8 @@ def invent_filepath(self):
inst_name = self.instrument_object.get_short_instrument_name()
if self.type is not None:
im_type = self.type
if self.project is not None:
project = self.project

if self.mjd is not None:
t = Time(self.mjd, format='mjd', scale='utc').datetime
Expand Down Expand Up @@ -1116,6 +1118,7 @@ def invent_filepath(self):
filepath = name_convention.format(
inst_name=inst_name,
im_type=im_type,
project=project,
date=date,
time=time,
filter=filter,
Expand Down
28 changes: 25 additions & 3 deletions pipeline/pipeline_exposure_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

class ExposureProcessor:
def __init__( self, instrument, identifier, params, numprocs, onlychips=None,
worker_log_level=logging.WARNING ):
through_step=None, worker_log_level=logging.WARNING ):
"""A class that processes all images in a single exposure, potentially using multiprocessing.
This is used internally by ExposureLauncher; normally, you would not use it directly.
Expand All @@ -55,6 +55,9 @@ def __init__( self, instrument, identifier, params, numprocs, onlychips=None,
sensor sections returned by the instrument's get_section_ids()
class method.
through_step : str or None
Passed on to top_level.py::Pipeline
worker_log_level : log level, default logging.WARNING
The log level for the worker processes. Here so that you can
have a different log level for the overall control process
Expand All @@ -66,6 +69,7 @@ class method.
self.params = params
self.numprocs = numprocs
self.onlychips = onlychips
self.through_step = through_step
self.worker_log_level = worker_log_level

def cleanup( self ):
Expand Down Expand Up @@ -106,6 +110,8 @@ def processchip( self, chip ):
SCLogger.info( f"Processing chip {chip} in process {me.name} PID {me.pid}..." )
SCLogger.setLevel( self.worker_log_level )
pipeline = Pipeline()
if ( self.through_step is not None ) and ( self.through_step != 'exposure' ):
pipeline.pars.through_step = self.through_step
ds = pipeline.run( self.exposure, chip, save_intermediate_products=False )
ds.save_and_commit()
SCLogger.setLevel( origloglevel )
Expand All @@ -128,6 +134,10 @@ def collate( self, res ):
def __call__( self ):
"""Run all the pipelines for the chips in the exposure."""

if self.through_step == 'exposure':
SCLogger.info( f"Only running through exposure, not launching any image processes" )
return

chips = self.instrument.get_section_ids()
if self.onlychips is not None:
chips = [ c for c in chips if c in self.onlychips ]
Expand Down Expand Up @@ -165,7 +175,7 @@ class ExposureLauncher:
"""

def __init__( self, cluster_id, node_id, numprocs=None, verify=True, onlychips=None,
worker_log_level=logging.WARNING ):
through_step=None, worker_log_level=logging.WARNING ):
"""Make an ExposureLauncher.
Parameters
Expand Down Expand Up @@ -200,6 +210,11 @@ def __init__( self, cluster_id, node_id, numprocs=None, verify=True, onlychips=N
sensor sections returned by the instrument's get_section_ids()
class method.
through_step : str or None
Parameter passed on to top_level.py::Pipeline, unless it is "exposure"
in which case all we do is download the exposure and load it into the
database.
worker_log_level : log level, default logging.WARNING
The log level for the worker processes. Here so that you can
have a different log level for the overall control process
Expand All @@ -213,6 +228,7 @@ class method.
self.cluster_id = cluster_id
self.node_id = node_id
self.onlychips = onlychips
self.through_step = through_step
self.worker_log_level = worker_log_level
self.conductor = ConductorConnector( verify=verify )

Expand Down Expand Up @@ -291,6 +307,7 @@ def __call__( self, max_n_exposures=None, die_on_exception=False ):
knownexp.params,
self.numprocs,
onlychips=self.onlychips,
through_step=self.through_step,
worker_log_level=self.worker_log_level )
SCLogger.info( f'Downloading and loading exposure {knownexp.identifier}...' )
exposure_processor.download_and_load_exposure()
Expand Down Expand Up @@ -350,6 +367,10 @@ def main():
parser.add_argument( "-w", "--worker-log-level", default="warning", help="Log level for worker processes" )
parser.add_argument( "--chips", default=None, nargs="+",
help="Only do these sensor sections (for debugging purposes)" )
parser.add_argument( "-t", "--through-step", default=None,
help=( "Only run through this step; default=run everything. Step can be "
"exposure, preprocessing, backgrounding, extraction, wcs, zp, "
"subtraction, detection, cutting, measuring" ) )
args = parser.parse_args()

loglookup = { 'error': logging.ERROR,
Expand All @@ -364,7 +385,8 @@ def main():
worker_log_level = loglookup[ args.worker_log_level.lower() ]

elaunch = ExposureLauncher( args.cluster_id, args.node_id, numprocs=args.numprocs, onlychips=args.chips,
verify=not args.noverify, worker_log_level=worker_log_level )
verify=not args.noverify, through_step=args.through_step,
worker_log_level=worker_log_level )
elaunch.register_worker()
try:
elaunch()
Expand Down
93 changes: 61 additions & 32 deletions pipeline/top_level.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ def __init__(self, **kwargs):
critical=False
)

self.through_step = self.add_par(
'through_step',
None,
( None, str ),
"Stop after this step. None = run the whole pipeline. String values can be "
"any of preprocessing, backgrounding, extraction, wcs, zp, subtraction, detection, cutting, measuring",
critical=False
)

self._enforce_no_new_attrs = True # lock against new parameters

self.override(kwargs)
Expand Down Expand Up @@ -284,12 +293,23 @@ def run(self, *args, **kwargs):
raise RuntimeError( "You have a persistent session in Pipeline.run; don't do that." )

try:
stepstodo = [ 'preprocessing', 'backgrounding', 'extraction', 'wcs', 'zp', 'subtraction',
'detection', 'cutting', 'measuring' ]
if self.pars.through_step is not None:
if self.pars.through_step not in stepstodo:
raise ValueError( f"Unknown through_step: \"{self.pars.through_step}\"" )
stepstodo = stepstodo[ :stepstodo.index(self.pars.through_step)+1 ]

if ds.image is not None:
SCLogger.info(f"Pipeline starting for image {ds.image.id} ({ds.image.filepath})")
SCLogger.info(f"Pipeline starting for image {ds.image.id} ({ds.image.filepath}), "
f"running through step {stepstodo[-1]}" )
elif ds.exposure is not None:
SCLogger.info(f"Pipeline starting for exposure {ds.exposure.id} ({ds.exposure}) section {ds.section_id}")
SCLogger.info(f"Pipeline starting for exposure {ds.exposure.id} "
f"({ds.exposure}) section {ds.section_id}, "
f"running through step {stepstodo[-1]}" )
else:
SCLogger.info(f"Pipeline starting with args {args}, kwargs {kwargs}")
SCLogger.info(f"Pipeline starting with args {args}, kwargs {kwargs}, "
f"running through step {stepstodo[-1]}" )

if env_as_bool('SEECHANGE_TRACEMALLOC'):
# ref: https://docs.python.org/3/library/tracemalloc.html#record-the-current-and-peak-size-of-all-traced-memory-blocks
Expand All @@ -300,30 +320,35 @@ def run(self, *args, **kwargs):
ds.warnings_list = w # appends warning to this list as it goes along
# run dark/flat preprocessing, cut out a specific section of the sensor

SCLogger.info(f"preprocessor")
ds = self.preprocessor.run(ds, session)
ds.update_report('preprocessing', session=None)
SCLogger.info(f"preprocessing complete: image id = {ds.image.id}, filepath={ds.image.filepath}")
if 'preprocessing' in stepstodo:
SCLogger.info(f"preprocessor")
ds = self.preprocessor.run(ds, session)
ds.update_report('preprocessing', session=None)
SCLogger.info(f"preprocessing complete: image id = {ds.image.id}, filepath={ds.image.filepath}")

# extract sources and make a SourceList and PSF from the image
SCLogger.info(f"extractor for image id {ds.image.id}")
ds = self.extractor.run(ds, session)
ds.update_report('extraction', session=None)
if 'extraction' in stepstodo:
SCLogger.info(f"extractor for image id {ds.image.id}")
ds = self.extractor.run(ds, session)
ds.update_report('extraction', session=None)

# find the background for this image
SCLogger.info(f"backgrounder for image id {ds.image.id}")
ds = self.backgrounder.run(ds, session)
ds.update_report('extraction', session=None)
if 'backgrounding' in stepstodo:
SCLogger.info(f"backgrounder for image id {ds.image.id}")
ds = self.backgrounder.run(ds, session)
ds.update_report('extraction', session=None)

# find astrometric solution, save WCS into Image object and FITS headers
SCLogger.info(f"astrometor for image id {ds.image.id}")
ds = self.astrometor.run(ds, session)
ds.update_report('extraction', session=None)
if 'wcs' in stepstodo:
SCLogger.info(f"astrometor for image id {ds.image.id}")
ds = self.astrometor.run(ds, session)
ds.update_report('extraction', session=None)

# cross-match against photometric catalogs and get zero point, save into Image object and FITS headers
SCLogger.info(f"photometor for image id {ds.image.id}")
ds = self.photometor.run(ds, session)
ds.update_report('extraction', session=None)
if 'zp' in stepstodo:
SCLogger.info(f"photometor for image id {ds.image.id}")
ds = self.photometor.run(ds, session)
ds.update_report('extraction', session=None)

if self.pars.save_before_subtraction:
t_start = time.perf_counter()
Expand All @@ -339,29 +364,33 @@ def run(self, *args, **kwargs):
ds.runtimes['save_intermediate'] = time.perf_counter() - t_start

# fetch reference images and subtract them, save subtracted Image objects to DB and disk
SCLogger.info(f"subtractor for image id {ds.image.id}")
ds = self.subtractor.run(ds, session)
ds.update_report('subtraction', session=None)
if 'subtraction' in stepstodo:
SCLogger.info(f"subtractor for image id {ds.image.id}")
ds = self.subtractor.run(ds, session)
ds.update_report('subtraction', session=None)

# find sources, generate a source list for detections
SCLogger.info(f"detector for image id {ds.image.id}")
ds = self.detector.run(ds, session)
ds.update_report('detection', session=None)
if 'detection' in stepstodo:
SCLogger.info(f"detector for image id {ds.image.id}")
ds = self.detector.run(ds, session)
ds.update_report('detection', session=None)

# make cutouts of all the sources in the "detections" source list
SCLogger.info(f"cutter for image id {ds.image.id}")
ds = self.cutter.run(ds, session)
ds.update_report('cutting', session=None)
if 'cutting' in stepstodo:
SCLogger.info(f"cutter for image id {ds.image.id}")
ds = self.cutter.run(ds, session)
ds.update_report('cutting', session=None)

# extract photometry and analytical cuts
SCLogger.info(f"measurer for image id {ds.image.id}")
ds = self.measurer.run(ds, session)
ds.update_report('measuring', session=None)
if 'measuring' in stepstodo:
SCLogger.info(f"measurer for image id {ds.image.id}")
ds = self.measurer.run(ds, session)
ds.update_report('measuring', session=None)

# measure deep learning models on the cutouts/measurements
# TODO: add this...

if self.pars.save_at_finish:
if self.pars.save_at_finish and ( 'subtraction' in stepstodo ):
t_start = time.perf_counter()
try:
SCLogger.info(f"Saving final products for image id {ds.image.id}")
Expand Down
12 changes: 12 additions & 0 deletions tests/util/test_retrydownload.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,18 @@ def test_404():
with pytest.raises( RuntimeError, match='3 exceptions trying to download.*, failing.' ):
retry_download( url_nonexistent, f'{nonexistent}.dat', retries=3, sleeptime=1, exists_ok=False )

def test_mismatch_md5():
    """Check that retry_download exhausts its retries and raises RuntimeError
    when the downloaded file's md5sum never matches the expected one, and
    that no partial file is left behind afterwards.
    """
    # Random target filename so repeated test runs never collide.
    target = pathlib.Path( "".join( random.choices( string.ascii_lowercase, k=10 ) ) )
    assert not target.exists()
    try:
        with pytest.raises( RuntimeError, match="5 exceptions trying to download.*failing." ):
            retry_download( url1, target, exists_ok=False, md5sum="wrong" )
    finally:
        # Clean up whether or not the expected exception was raised.
        target.unlink( missing_ok=True )
        assert not target.exists()


def test_overwrite_misc_file():
fname = "".join( random.choices( string.ascii_lowercase, k=10 ) )
fpath = pathlib.Path( fname )
Expand Down
10 changes: 9 additions & 1 deletion util/conductor_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,15 @@ def __init__( self, url=None, username=None, password=None, verify=True ):
cfg = Config.get()
self.url = url if url is not None else cfg.value( 'conductor.conductor_url' )
self.username = username if username is not None else cfg.value( 'conductor.username' )
self.password = password if password is not None else cfg.value( 'conductor.password' )
if password is None:
password = cfg.value( 'conductor.password' )
if password is None:
if cfg.value( 'conductor.password_file' ) is None:
raise ValueError( "ConductorConnector must have a password, either passed, or from "
"conductor.password or conductor.password_file configs" )
with open( cfg.value( "conductor.password_file" ) ) as ifp:
password = ifp.readline().strip()
self.password = password
self.verify = verify
self.req = None

Expand Down
5 changes: 2 additions & 3 deletions util/retrydownload.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,8 @@ def retry_download( url, fpath, md5sum=None, retries=5, sleeptime=5, exists_ok=T
md5.update( ifp.read() )
if md5.hexdigest() != md5sum:
success = False
logger.warning( f"Downloaded {fname} md5sum {md5.hexdigest()} doesn't match "
f"expected {md5sum}, retrying" )
time.sleep( sleeptime )
raise ValueError( f"Downloaded {fname} md5sum {md5.hexdigest()} doesn't match "
f"expected {md5sum}, retrying." )
except Exception as e:
strio = io.StringIO("")
traceback.print_exc( file=strio )
Expand Down

0 comments on commit 02cb2fc

Please sign in to comment.