From 21b01a51d55c287fda1bc2f6ddded54bf0f3b77e Mon Sep 17 00:00:00 2001 From: Mateo Gonzales Navarrete Date: Wed, 17 Aug 2022 02:52:45 -0500 Subject: [PATCH] API only allow MAX_CONCURRENT_JOB*NODE_COUNT concurrent jobs --- estela-api/config/settings/local.py | 2 +- estela-api/config/settings/prod.py | 2 +- estela-api/config/settings/test.py | 2 +- estela-api/core/tasks.py | 10 ++++++++-- estela-api/engines/example_engine.py | 3 +++ estela-api/engines/kubernetes.py | 12 ++++++++++++ 6 files changed, 26 insertions(+), 5 deletions(-) diff --git a/estela-api/config/settings/local.py b/estela-api/config/settings/local.py index bd1e9b45..62722e70 100644 --- a/estela-api/config/settings/local.py +++ b/estela-api/config/settings/local.py @@ -2,7 +2,7 @@ DEBUG = True -RUN_JOBS_PER_LOT = 10 CHECK_JOB_ERRORS_BATCH_SIZE = 10 +MAX_CONCURRENT_JOBS = 10 SPIDERDATA_DB_PRODUCTION = False diff --git a/estela-api/config/settings/prod.py b/estela-api/config/settings/prod.py index 578e7970..32452ab4 100644 --- a/estela-api/config/settings/prod.py +++ b/estela-api/config/settings/prod.py @@ -2,8 +2,8 @@ DEBUG = False -RUN_JOBS_PER_LOT = 1000 CHECK_JOB_ERRORS_BATCH_SIZE = 1000 +MAX_CONCURRENT_JOBS = 40 MULTI_NODE_MODE = True diff --git a/estela-api/config/settings/test.py b/estela-api/config/settings/test.py index 4445eb52..5a5d26f6 100644 --- a/estela-api/config/settings/test.py +++ b/estela-api/config/settings/test.py @@ -2,7 +2,7 @@ DEBUG = True -RUN_JOBS_PER_LOT = 10 CHECK_JOB_ERRORS_BATCH_SIZE = 10 +MAX_CONCURRENT_JOBS = 10 MONGO_PRODUCTION = False diff --git a/estela-api/core/tasks.py b/estela-api/core/tasks.py index b6bf348e..d8bc6691 100644 --- a/estela-api/core/tasks.py +++ b/estela-api/core/tasks.py @@ -22,10 +22,16 @@ def get_default_token(job): @celery_app.task def run_spider_jobs(): + running_jobs_count = SpiderJob.objects.filter( + status=SpiderJob.RUNNING_STATUS + ).count() + number_of_jobs_to_run = ( + settings.MAX_CONCURRENT_JOBS * job_manager.get_scale_size() + ) - running_jobs_count + jobs = SpiderJob.objects.filter(status=SpiderJob.IN_QUEUE_STATUS)[ - : settings.RUN_JOBS_PER_LOT + :number_of_jobs_to_run ] - for job in jobs: job_args = {arg.name: arg.value for arg in job.args.all()} job_env_vars = {env_var.name: env_var.value for env_var in job.env_vars.all()} diff --git a/estela-api/engines/example_engine.py b/estela-api/engines/example_engine.py index fcf940cc..d2bec54a 100644 --- a/estela-api/engines/example_engine.py +++ b/estela-api/engines/example_engine.py @@ -24,3 +24,6 @@ def read_job(self, job_name): def read_job_status(self, job_name): return self.Status() + + def get_scale_size(self): + pass diff --git a/estela-api/engines/kubernetes.py b/estela-api/engines/kubernetes.py index a99f8098..e32bd817 100644 --- a/estela-api/engines/kubernetes.py +++ b/estela-api/engines/kubernetes.py @@ -188,3 +188,15 @@ def read_job_status(self, name, namespace="default", api_instance=None): return None return self.Status(api_response.status) + + def get_scale_size(self): + if api_instance is None: + api_instance = self.get_api_instance() + + try: + api_response = api_instance.list_node() + except ApiException: + return None + + number_of_nodes = len(api_response.items) + return number_of_nodes