exp
morsecodist committed Jul 5, 2024
1 parent 5c80653 commit c08cedd
Showing 1 changed file with 10 additions and 12 deletions.
--- a/lib/idseq_utils/idseq_utils/batch_run_helpers.py
+++ b/lib/idseq_utils/idseq_utils/batch_run_helpers.py
@@ -7,6 +7,7 @@
 import requests
 import shutil
 import time
+import sys
 from os import listdir
 from multiprocessing import Pool
 from subprocess import run
@@ -20,9 +21,6 @@
 from botocore.exceptions import ClientError
 from botocore.config import Config
 
-log = logging.getLogger(__name__)
-log.setLevel(logging.DEBUG)
-
 MAX_CHUNKS_IN_FLIGHT = 30  # TODO: remove this constant, currently does nothing since we have at most 30 index chunks
 
 # mitigation for TooManyRequestExceptions
@@ -65,7 +63,7 @@ def _get_job_status(job_id, use_batch_api=False):
     if use_batch_api:
         jobs = _batch_client.describe_jobs(jobs=[job_id])["jobs"]
         if not jobs:
-            log.debug(f"missing_job_description_from_api: {job_id}")
+            print(f"missing_job_description_from_api: {job_id}", file=sys.stderr)
             return "SUBMITTED"
         return jobs[0]["status"]
     batch_job_desc_bucket = boto3.resource("s3").Bucket(
@@ -78,7 +76,7 @@ def _get_job_status(job_id, use_batch_api=False):
     except ClientError as e:
         if e.response["Error"]["Code"] == "NoSuchKey":
             # Warn that the object is missing so any issue with the s3 mechanism can be identified
-            log.debug(f"missing_job_description_object key: {key}")
+            print(f"missing_job_description_object key: {key}", file=sys.stderr)
             # Return submitted because a missing job status probably means it hasn't been added yet
             return "SUBMITTED"
         else:
@@ -148,9 +146,7 @@ def _run_batch_job(
     cache.put(submit_args, job_id)
 
     def _log_status(status: str):
-        level = logging.INFO if status != "FAILED" else logging.ERROR
-        log.log(
-            level,
+        print(
             "batch_job_status " + json.dumps(
                 {
                     "job_id": job_id,
@@ -161,6 +157,7 @@ def _log_status(status: str):
                     "environment": environment,
                 }
             ),
+            file=sys.stderr,
         )
 
     _log_status("SUBMITTED")
@@ -177,11 +174,12 @@ def _log_status(status: str):
         except ClientError as e:
             # If we get throttled, randomly wait to de-synchronize the requests
             if e.response["Error"]["Code"] == "TooManyRequestsException":
-                log.warn(f"describe_jobs_rate_limit_error for job_id: {job_id}")
+                print(f"describe_jobs_rate_limit_error for job_id: {job_id}", file=sys.stderr)
                 # Possibly implement a backoff here if throttling becomes an issue
             else:
-                log.error(
+                print(
                     f"unexpected_client_error_while_polling_job_status for job_id: {job_id}",
+                    file=sys.stderr,
                 )
                 raise e
 
@@ -284,7 +282,7 @@ def _job_queue(provisioning_model: str):
 def _db_chunks(bucket: str, prefix):
     s3_client = boto3.client("s3")
     paginator = s3_client.get_paginator("list_objects_v2")
-    log.debug("db chunks")
+    print("db chunks", file=sys.stderr)
 
     for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
         for obj in page["Contents"]:
@@ -325,7 +323,7 @@ def run_alignment(
     for fn in listdir("chunks"):
         if fn.endswith("json"):
             os.remove(os.path.join("chunks", fn))
-            log.debug(f"deleting from S3: {os.path.join(chunk_dir, fn)} ({chunk_dir}, {fn})")
+            print(f"deleting from S3: {os.path.join(chunk_dir, fn)} ({chunk_dir}, {fn})", file=sys.stderr)
             _s3_client.put_object_tagging(
                 Bucket=bucket,
                 Key=os.path.join(chunk_dir, fn),
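Note: the commit swaps the logging module for direct stderr prints. A minimal, self-contained sketch of the resulting pattern (the function name and example values here are illustrative, not from the repo):

    import json
    import sys

    def log_status(job_id: str, status: str) -> None:
        # Mirror the commit's pattern: one grep-able line per status change,
        # written to stderr as "batch_job_status " + a JSON payload.
        print(
            "batch_job_status " + json.dumps({"job_id": job_id, "status": status}),
            file=sys.stderr,
        )

    log_status("example-job-id", "SUBMITTED")

One trade-off worth noting: print(..., file=sys.stderr) drops the level handling the old log.log(level, ...) call provided, so FAILED statuses are no longer emitted at a distinct severity.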

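The diff's comment "Possibly implement a backoff here if throttling becomes an issue" is left as future work; a common approach would be jittered exponential backoff around the throttled describe_jobs call. A hypothetical sketch, not part of this commit:

    import random
    import time

    from botocore.exceptions import ClientError

    def describe_with_backoff(call, max_attempts: int = 8):
        # Retry a throttled API call with exponential backoff plus jitter;
        # the random sleep de-synchronizes concurrent pollers.
        delay = 1.0
        for attempt in range(max_attempts):
            try:
                return call()
            except ClientError as e:
                if e.response["Error"]["Code"] != "TooManyRequestsException":
                    raise
                if attempt == max_attempts - 1:
                    raise
                time.sleep(random.uniform(0, delay))
                delay *= 2

Usage would look like describe_with_backoff(lambda: _batch_client.describe_jobs(jobs=[job_id])).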