Skip to content

Commit

Permalink
try to add a retry decorator/wrapper for mongo calls
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodlz committed Jul 14, 2023
1 parent 9dbf0ac commit 500220c
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 19 deletions.
11 changes: 6 additions & 5 deletions kowalski/alert_brokers/alert_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
radec2lb,
time_stamp,
timer,
retry,
)

# TensorFlow is currently problematic on Macs, so we add an option to disable it
Expand Down Expand Up @@ -233,7 +234,7 @@ def poll(self):

for record in msg_decoded:
if (
self.mongo.db[self.collection_alerts].count_documents(
retry(self.mongo.db[self.collection_alerts].count_documents)(
{"candid": record["candid"]}, limit=1
)
== 0
Expand Down Expand Up @@ -920,7 +921,7 @@ def alert_filter__xmatch_no_distance(
]
}
}
s = self.mongo.db[catalog].find(
s = retry(self.mongo.db[catalog].find)(
{**object_position_query, **catalog_filter}, {**catalog_projection}
)
matches = list(s)
Expand Down Expand Up @@ -980,7 +981,7 @@ def alert_filter__xmatch_distance(
}
}
galaxies = list(
self.mongo.db[catalog].find(
retry(self.mongo.db[catalog].find)(
{**object_position_query, **catalog_filter}, {**catalog_projection}
)
)
Expand Down Expand Up @@ -1122,7 +1123,7 @@ def alert_filter__user_defined(
_filter["pipeline"][0]["$match"]["candid"] = alert["candid"]

filtered_data = list(
self.mongo.db[self.collection_alerts].aggregate(
retry(self.mongo.db[self.collection_alerts].aggregate)(
_filter["pipeline"], allowDiskUse=False, maxTimeMS=max_time_ms
)
)
Expand Down Expand Up @@ -1434,7 +1435,7 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters):
# post full light curve
try:
alert["prv_candidates"] = list(
self.mongo.db[self.collection_alerts_aux].find(
retry(self.mongo.db[self.collection_alerts_aux].find)(
{"_id": alert["objectId"]}, {"prv_candidates": 1}, limit=1
)
)[0]["prv_candidates"]
Expand Down
34 changes: 20 additions & 14 deletions kowalski/alert_brokers/alert_broker_ztf.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import dask.distributed
from kowalski.alert_brokers.alert_broker import AlertConsumer, AlertWorker, EopError
from bson.json_util import loads as bson_loads
from kowalski.utils import init_db_sync, timer
from kowalski.utils import init_db_sync, timer, retry
from kowalski.config import load_config
from kowalski.log import log

Expand Down Expand Up @@ -49,9 +49,9 @@ def process_alert(alert: Mapping, topic: str):

# return if this alert packet has already been processed and ingested into collection_alerts:
if (
alert_worker.mongo.db[alert_worker.collection_alerts].count_documents(
{"candid": candid}, limit=1
)
retry(
alert_worker.mongo.db[alert_worker.collection_alerts].count_documents
)({"candid": candid}, limit=1)
== 1
):
return
Expand All @@ -67,9 +67,9 @@ def process_alert(alert: Mapping, topic: str):
alert_worker.verbose > 1,
):
# get all prv_candidates for this objectId:
existing_aux = alert_worker.mongo.db[
alert_worker.collection_alerts_aux
].find_one({"_id": object_id}, {"prv_candidates": 1})
existing_aux = retry(
alert_worker.mongo.db[alert_worker.collection_alerts_aux].find_one
)({"_id": object_id}, {"prv_candidates": 1})
if (
existing_aux is not None
and len(existing_aux.get("prv_candidates", [])) > 0
Expand All @@ -82,7 +82,7 @@ def process_alert(alert: Mapping, topic: str):
alert["classifications"] = scores

with timer(f"Ingesting {object_id} {candid}", alert_worker.verbose > 1):
alert_worker.mongo.insert_one(
retry(alert_worker.mongo.insert_one)(
collection=alert_worker.collection_alerts, document=alert
)

Expand All @@ -94,9 +94,11 @@ def process_alert(alert: Mapping, topic: str):

# cross-match with external catalogs if objectId not in collection_alerts_aux:
if (
alert_worker.mongo.db[alert_worker.collection_alerts_aux].count_documents(
{"_id": object_id}, limit=1
)
retry(
alert_worker.mongo.db[
alert_worker.collection_alerts_aux
].count_documents
)({"_id": object_id}, limit=1)
== 0
):
with timer(
Expand All @@ -111,15 +113,17 @@ def process_alert(alert: Mapping, topic: str):
}

with timer(f"Aux ingesting {object_id} {candid}", alert_worker.verbose > 1):
alert_worker.mongo.insert_one(
retry(alert_worker.mongo.insert_one)(
collection=alert_worker.collection_alerts_aux, document=alert_aux
)

else:
with timer(
f"Aux updating of {object_id} {candid}", alert_worker.verbose > 1
):
alert_worker.mongo.db[alert_worker.collection_alerts_aux].update_one(
retry(
alert_worker.mongo.db[alert_worker.collection_alerts_aux].update_one
)(
{"_id": object_id},
{"$addToSet": {"prv_candidates": {"$each": prv_candidates}}},
upsert=True,
Expand Down Expand Up @@ -202,7 +206,9 @@ def get_active_filters(self):
# todo: query SP to make sure the filters still exist there and we're not out of sync;
# clean up if necessary
return list(
self.mongo.db[config["database"]["collections"]["filters"]].aggregate(
retry(
self.mongo.db[config["database"]["collections"]["filters"]].aggregate
)(
[
{
"$match": {
Expand Down
19 changes: 19 additions & 0 deletions kowalski/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"TimeoutHTTPAdapter",
"uid",
"ZTFAlert",
"retry",
]

import base64
Expand Down Expand Up @@ -70,6 +71,24 @@

DEFAULT_TIMEOUT = 5 # seconds

# retry wrapper: re-invokes a callable until it succeeds, up to max_retries
# attempts, sleeping `timeout` seconds between attempts (note: despite the
# name, `timeout` is a delay between retries, not a per-call time limit)


def retry(func, max_retries=20, timeout=5):
    """Wrap *func* so that failed calls are automatically retried.

    The call is attempted up to ``max_retries`` times; after each failure the
    wrapper sleeps ``timeout`` seconds before trying again.  Note that despite
    its name, ``timeout`` is a delay *between* attempts, not a per-call time
    limit.  The exception raised by the final failed attempt is propagated to
    the caller with its original traceback.

    :param func: callable to wrap (e.g. a pymongo collection method)
    :param max_retries: maximum number of attempts; at least one call is
        always made, even if ``max_retries`` < 1
    :param timeout: seconds to sleep between consecutive attempts
    :return: a callable with the same signature and metadata as *func*
    """
    import functools  # local import so this block is self-contained

    @functools.wraps(func)
    def wrapper_retry(*args, **kwargs):
        # Guard against max_retries < 1: the original loop would silently
        # return None without ever calling func.
        attempts = max(max_retries, 1)
        for attempt in range(attempts):
            try:
                return func(*args, **kwargs)
            except Exception:
                if attempt == attempts - 1:
                    # Bare raise preserves the original traceback, unlike
                    # `raise e`, which would re-anchor it at this line.
                    raise
                time.sleep(timeout)

    return wrapper_retry


@contextmanager
def status(message):
Expand Down

0 comments on commit 500220c

Please sign in to comment.