From 500220c4988d65fcb6993423af702091d0b7a16a Mon Sep 17 00:00:00 2001
From: Theodlz
Date: Thu, 13 Jul 2023 17:39:01 -0700
Subject: [PATCH] try to add a retry decorator/wrapper for mongo calls

---
 kowalski/alert_brokers/alert_broker.py     | 11 +++----
 kowalski/alert_brokers/alert_broker_ztf.py | 34 +++++++++++++---------
 kowalski/utils.py                          | 19 ++++++++++++
 3 files changed, 45 insertions(+), 19 deletions(-)

diff --git a/kowalski/alert_brokers/alert_broker.py b/kowalski/alert_brokers/alert_broker.py
index ca2d1e26..3beb34ed 100644
--- a/kowalski/alert_brokers/alert_broker.py
+++ b/kowalski/alert_brokers/alert_broker.py
@@ -42,6 +42,7 @@
     radec2lb,
     time_stamp,
     timer,
+    retry,
 )
 
 # Tensorflow is problematic for Mac's currently, so we can add an option to disable it
@@ -233,7 +234,7 @@ def poll(self):
 
             for record in msg_decoded:
                 if (
-                    self.mongo.db[self.collection_alerts].count_documents(
+                    retry(self.mongo.db[self.collection_alerts].count_documents)(
                         {"candid": record["candid"]}, limit=1
                     )
                     == 0
@@ -920,7 +921,7 @@ def alert_filter__xmatch_no_distance(
                     ]
                 }
             }
-            s = self.mongo.db[catalog].find(
+            s = retry(self.mongo.db[catalog].find)(
                 {**object_position_query, **catalog_filter}, {**catalog_projection}
             )
             matches = list(s)
@@ -980,7 +981,7 @@ def alert_filter__xmatch_distance(
                 }
             }
             galaxies = list(
-                self.mongo.db[catalog].find(
+                retry(self.mongo.db[catalog].find)(
                     {**object_position_query, **catalog_filter}, {**catalog_projection}
                 )
             )
@@ -1122,7 +1123,7 @@ def alert_filter__user_defined(
                 _filter["pipeline"][0]["$match"]["candid"] = alert["candid"]
 
                 filtered_data = list(
-                    self.mongo.db[self.collection_alerts].aggregate(
+                    retry(self.mongo.db[self.collection_alerts].aggregate)(
                         _filter["pipeline"], allowDiskUse=False, maxTimeMS=max_time_ms
                     )
                 )
@@ -1434,7 +1435,7 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters):
         # post full light curve
         try:
             alert["prv_candidates"] = list(
-                self.mongo.db[self.collection_alerts_aux].find(
+                retry(self.mongo.db[self.collection_alerts_aux].find)(
                     {"_id": alert["objectId"]}, {"prv_candidates": 1}, limit=1
                 )
             )[0]["prv_candidates"]
diff --git a/kowalski/alert_brokers/alert_broker_ztf.py b/kowalski/alert_brokers/alert_broker_ztf.py
index d3a7baee..a2802992 100644
--- a/kowalski/alert_brokers/alert_broker_ztf.py
+++ b/kowalski/alert_brokers/alert_broker_ztf.py
@@ -14,7 +14,7 @@
 import dask.distributed
 
 from kowalski.alert_brokers.alert_broker import AlertConsumer, AlertWorker, EopError
 from bson.json_util import loads as bson_loads
-from kowalski.utils import init_db_sync, timer
+from kowalski.utils import init_db_sync, timer, retry
 from kowalski.config import load_config
 from kowalski.log import log
@@ -49,9 +49,9 @@ def process_alert(alert: Mapping, topic: str):
 
     # return if this alert packet has already been processed and ingested into collection_alerts:
     if (
-        alert_worker.mongo.db[alert_worker.collection_alerts].count_documents(
-            {"candid": candid}, limit=1
-        )
+        retry(
+            alert_worker.mongo.db[alert_worker.collection_alerts].count_documents
+        )({"candid": candid}, limit=1)
         == 1
     ):
         return
@@ -67,9 +67,9 @@ def process_alert(alert: Mapping, topic: str):
         alert_worker.verbose > 1,
     ):
         # get all prv_candidates for this objectId:
-        existing_aux = alert_worker.mongo.db[
-            alert_worker.collection_alerts_aux
-        ].find_one({"_id": object_id}, {"prv_candidates": 1})
+        existing_aux = retry(
+            alert_worker.mongo.db[alert_worker.collection_alerts_aux].find_one
+        )({"_id": object_id}, {"prv_candidates": 1})
         if (
             existing_aux is not None
             and len(existing_aux.get("prv_candidates", [])) > 0
@@ -82,7 +82,7 @@ def process_alert(alert: Mapping, topic: str):
             alert["classifications"] = scores
 
     with timer(f"Ingesting {object_id} {candid}", alert_worker.verbose > 1):
-        alert_worker.mongo.insert_one(
+        retry(alert_worker.mongo.insert_one)(
             collection=alert_worker.collection_alerts, document=alert
         )
 
@@ -94,9 +94,11 @@ def process_alert(alert: Mapping, topic: str):
 
         # cross-match with external catalogs if objectId not in collection_alerts_aux:
         if (
-            alert_worker.mongo.db[alert_worker.collection_alerts_aux].count_documents(
-                {"_id": object_id}, limit=1
-            )
+            retry(
+                alert_worker.mongo.db[
+                    alert_worker.collection_alerts_aux
+                ].count_documents
+            )({"_id": object_id}, limit=1)
             == 0
         ):
             with timer(
@@ -111,7 +113,7 @@ def process_alert(alert: Mapping, topic: str):
             }
 
             with timer(f"Aux ingesting {object_id} {candid}", alert_worker.verbose > 1):
-                alert_worker.mongo.insert_one(
+                retry(alert_worker.mongo.insert_one)(
                     collection=alert_worker.collection_alerts_aux, document=alert_aux
                 )
 
@@ -119,7 +121,9 @@ def process_alert(alert: Mapping, topic: str):
         else:
             with timer(
                 f"Aux updating of {object_id} {candid}", alert_worker.verbose > 1
             ):
-                alert_worker.mongo.db[alert_worker.collection_alerts_aux].update_one(
+                retry(
+                    alert_worker.mongo.db[alert_worker.collection_alerts_aux].update_one
+                )(
                     {"_id": object_id},
                     {"$addToSet": {"prv_candidates": {"$each": prv_candidates}}},
                     upsert=True,
@@ -202,7 +206,9 @@ def get_active_filters(self):
         # todo: query SP to make sure the filters still exist there and we're not out of sync;
         # clean up if necessary
         return list(
-            self.mongo.db[config["database"]["collections"]["filters"]].aggregate(
+            retry(
+                self.mongo.db[config["database"]["collections"]["filters"]].aggregate
+            )(
                 [
                     {
                         "$match": {
diff --git a/kowalski/utils.py b/kowalski/utils.py
index f124ca22..ae94710b 100644
--- a/kowalski/utils.py
+++ b/kowalski/utils.py
@@ -32,6 +32,7 @@
     "TimeoutHTTPAdapter",
     "uid",
     "ZTFAlert",
+    "retry",
 ]
 
 import base64
@@ -70,6 +71,24 @@
 
 DEFAULT_TIMEOUT = 5  # seconds
 
+# create a decorator that retries a function call until there is no exception
+# up to max_retries times with a timeout of timeout seconds
+
+
+def retry(func, max_retries=20, timeout=5):
+    def wrapper_retry(*args, **kwargs):
+        n_retries = 0
+        while n_retries < max_retries:
+            try:
+                return func(*args, **kwargs)
+            except Exception as e:
+                if n_retries == max_retries - 1:
+                    raise e
+                time.sleep(timeout)
+                n_retries += 1
+
+    return wrapper_retry
+
 
 @contextmanager
 def status(message):
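
Usage note: `retry` is applied at call time rather than as a `@retry` decoration, so each call site wraps the bound method and then invokes the wrapper with the original arguments. Below is a minimal sketch of that pattern, assuming a plain `pymongo` client in place of Kowalski's `Mongo` wrapper; the URI, collection names, `candid`, and object id values are illustrative, not taken from the patch:

    import pymongo

    from kowalski.utils import retry  # the helper added in kowalski/utils.py above

    client = pymongo.MongoClient("mongodb://localhost:27017")  # illustrative URI
    db = client["kowalski"]

    # retry(func) returns a wrapper; calling the wrapper runs func(*args, **kwargs),
    # re-trying on any exception up to max_retries times and sleeping `timeout`
    # seconds between attempts before re-raising the last error.
    n_alerts = retry(db["ZTF_alerts"].count_documents)(
        {"candid": 1234567890}, limit=1
    )

    # Non-default retry parameters are passed to retry() itself,
    # not to the wrapped call:
    aux_doc = retry(db["ZTF_alerts_aux"].find_one, max_retries=3, timeout=1)(
        {"_id": "ZTF23aaaaaaa"}
    )

One consequence of wrapping per call site is that every bare `retry(...)` in the patch uses the defaults, 20 attempts spaced 5 seconds apart, so a persistently failing query can block a worker for up to about 100 seconds before the exception finally propagates.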