[BugFix] Update scale down workers to handle exceptions #4417

Open · wants to merge 2 commits into base: master
124 changes: 97 additions & 27 deletions scripts/monitoring/auto_scale_workers.py
@@ -1,10 +1,11 @@
import json
import os
import pytz
import warnings

from datetime import datetime

import pytz
from auto_stop_workers import delete_worker, start_worker, stop_worker
from dateutil.parser import parse
from auto_stop_workers import start_worker, stop_worker
from evalai_interface import EvalAI_Interface

warnings.filterwarnings("ignore")
@@ -24,27 +25,82 @@ def get_pending_submission_count(challenge_metrics):
return pending_submissions


def scale_down_workers(challenge, num_workers):
def empty_challenge_workers(challenge_id, evalai_interface):
data = {
"challenge_pk": challenge_id,
"workers": None,
"task_def_arn": None,
}
payload = json.dumps(data)
response = evalai_interface.update_challenge_attributes(payload)


def scale_down_workers(challenge, num_workers, evalai_interface):
if challenge["remote_evaluation"] or challenge["uses_ec2_worker"]:
print("Challenge either is remote evaluation or uses EC2 instance. Scaling down worker if up, regardless of submissions.")

if num_workers > 0:
response = stop_worker(challenge["id"])
print("AWS API Response: {}".format(response))
print(
"Stopped worker for Challenge ID: {}, Title: {}".format(
challenge["id"], challenge["title"]
try:
response = stop_worker(challenge["id"])
response_json = response.json()
print(f"AWS API Response: {response_json}")

action = response_json.get("action", None)
if action:
error = response_json.get("error", {})
error_message = error.get("Message", "")
error_code = error.get("Code", "")
else:
error = response_json.get("error", "")

if action == "Failure":
if error_code == "ServiceNotFoundException":
print(f"AWS API Response: {response_json}")
print(
f"Service not found for Challenge ID: {challenge['id']}."
)

elif error_message == "TaskDefinition not found.":
print(f"AWS API Response: {response_json}")
print(
f"Task Definition not found for Challenge ID: {challenge['id']}."
)
print("Emptying challenge workers at backend.")
empty_challenge_workers(challenge["id"], evalai_interface)

elif (
"error" in response_json
and "Action stop worker is not supported for an inactive challenge."
in response_json["error"]
):
print(
f"Challenge has ended for Challenge ID: {challenge['id']}. Deleting workers."
)
delete_response = delete_worker(challenge["id"])
print(f"Delete response: {delete_response.json()}")
empty_challenge_workers(challenge["id"], evalai_interface)

else:
print(
f"Stopped worker for Challenge ID: {challenge['id']}, Title: {challenge['title']}"
)

except Exception as e:
print(
f"Error handling response for Challenge ID: {challenge['id']}, Title: {challenge['title']}."
)
)
print(f"Error Details: {str(e)}")

else:
print(
"No workers and pending messages found for Challenge ID: {}, Title: {}. Skipping.".format(
challenge["id"], challenge["title"]
)
f"No workers found for Challenge ID: {challenge['id']}, Title: {challenge['title']}. Skipping."
)


def scale_up_workers(challenge, num_workers):
def scale_up_workers(challenge, num_workers, evalai_interface):
if num_workers == 0:
response = start_worker(challenge["id"])
print("AWS API Response: {}".format(response))
print("AWS API Response: {}".format(response.json()))
print(
"Started worker for Challenge ID: {}, Title: {}.".format(
challenge["id"], challenge["title"]
@@ -58,7 +114,7 @@ def scale_up_workers(challenge, num_workers):
)


def scale_up_or_down_workers(challenge, challenge_metrics):
def scale_up_or_down_workers(challenge, challenge_metrics, evalai_interface):
try:
pending_submissions = get_pending_submission_count(challenge_metrics)
except Exception: # noqa: F841
@@ -74,39 +130,53 @@ def scale_up_or_down_workers(challenge, challenge_metrics):
)

print(
"Num Workers: {}, Pending Submissions: {}".format(num_workers, pending_submissions)
"Num Workers: {}, Pending Submissions: {}".format(
num_workers, pending_submissions
)
)

if (
pending_submissions == 0
or parse(challenge["end_date"])
< pytz.UTC.localize(datetime.utcnow())
or parse(challenge["end_date"]) < pytz.UTC.localize(datetime.utcnow())
or challenge["remote_evaluation"]
or challenge["uses_ec2_worker"]
):
scale_down_workers(challenge, num_workers)
scale_down_workers(challenge, num_workers, evalai_interface)
else:
scale_up_workers(challenge, num_workers)
scale_up_workers(challenge, num_workers, evalai_interface)


# TODO: Factor in limits for the APIs
def scale_up_or_down_workers_for_challenge(challenge, challenge_metrics):
def scale_up_or_down_workers_for_challenge(
challenge, challenge_metrics, evalai_interface
):
if ENV == "prod":
try:
if challenge["remote_evaluation"] is False:
scale_up_or_down_workers(challenge, challenge_metrics)
scale_up_or_down_workers(
challenge, challenge_metrics, evalai_interface
)
except Exception as e:
print(e)
else:
try:
scale_up_or_down_workers(challenge, challenge_metrics)
scale_up_or_down_workers(
challenge, challenge_metrics, evalai_interface
)
except Exception as e:
print(e)


def scale_up_or_down_workers_for_challenges(response, evalai_interface):
for challenge in response["results"]:
try:
challenge_metrics = evalai_interface.get_challenge_submission_metrics_by_pk(challenge["id"])
scale_up_or_down_workers_for_challenge(challenge, challenge_metrics)
challenge_metrics = (
evalai_interface.get_challenge_submission_metrics_by_pk(
challenge["id"]
)
)
scale_up_or_down_workers_for_challenge(
challenge, challenge_metrics, evalai_interface
)
except Exception as e:
print(e)

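For reviewers, below is a small, self-contained sketch (not part of the PR) that mirrors the branching added to scale_down_workers(). The sample payloads are assumptions inferred from the new checks, not captured output from the worker-management API, and the helper name classify_stop_worker_response is hypothetical, introduced only for illustration.

import json


def classify_stop_worker_response(response_json):
    # Approximate the branch the new scale_down_workers() error handling would take
    # for a given stop_worker() JSON payload (the payload shapes below are assumptions).
    action = response_json.get("action", None)
    error = response_json.get("error", {}) if action else response_json.get("error", "")
    error_code = error.get("Code", "") if isinstance(error, dict) else ""
    error_message = error.get("Message", "") if isinstance(error, dict) else ""

    if action == "Failure" and error_code == "ServiceNotFoundException":
        return "service not found: log and skip"
    elif action == "Failure" and error_message == "TaskDefinition not found.":
        return "task definition not found: empty challenge workers at backend"
    elif (
        isinstance(response_json.get("error"), str)
        and "Action stop worker is not supported for an inactive challenge."
        in response_json["error"]
    ):
        return "inactive challenge: delete worker, then empty challenge workers"
    return "worker stopped normally"


if __name__ == "__main__":
    samples = [
        {"action": "Success", "message": "Worker stopped"},
        {"action": "Failure", "error": {"Code": "ServiceNotFoundException", "Message": "Service not found"}},
        {"action": "Failure", "error": {"Code": "ClientException", "Message": "TaskDefinition not found."}},
        {"error": "Action stop worker is not supported for an inactive challenge."},
    ]
    for payload in samples:
        print(json.dumps(payload), "->", classify_stop_worker_response(payload))

Running the sketch prints one line per sample, which makes it easy to confirm that each error shape lands in the intended branch before exercising the real AWS-backed path.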