BITMAKER-1932: API control max number of simultaneous jobs #63

Open · wants to merge 9 commits into main
Changes from all commits
17 changes: 17 additions & 0 deletions database_adapters/db_adapters.py
@@ -154,6 +154,23 @@ def get_collection_size(self, database_name, collection):
        ]
        return collection_size

    def get_database_size(self, database_name, data_type):
        database = self.client[database_name]
        collections = database.list_collection_names()
        total_size_bytes = 0
        for collection in collections:
            if data_type in collection:
                total_size_bytes += self.get_collection_size(database_name, collection)

        return total_size_bytes

    def get_collection_size(self, database_name, collection):
        database = self.client[database_name]
        collection_size = database.command("dataSize", f"{database_name}.{collection}")[
            "size"
        ]
        return collection_size


def get_database_interface(engine, connection, production, certificate_path):
    database_interfaces = {
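
The helpers above size a project's MongoDB footprint by substring-matching collection names. A minimal usage sketch, assuming the adapter comes from the get_database_interface factory shown above and that collections follow a "<job>-items"-style naming scheme (the engine key, connection string, and database name here are illustrative, not from this diff):

    # assuming db_adapters is importable from this package
    adapter = get_database_interface(
        engine="mongodb",                        # assumed engine key
        connection="mongodb://localhost:27017",  # hypothetical connection string
        production=False,
        certificate_path=None,
    )
    # data_type is matched as a substring, so "items" aggregates every
    # collection whose name contains it, e.g. "101-items" and "102-items".
    items_bytes = adapter.get_database_size("project_db", "items")
    logs_bytes = adapter.get_database_size("project_db", "logs")
    print(f"items: {items_bytes} B, logs: {logs_bytes} B")
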
4 changes: 4 additions & 0 deletions estela-api/config/celery.py
@@ -23,4 +23,8 @@
        "task": "core.tasks.delete_expired_jobs_data",
        "schedule": 3600,
    },
    "record-projects-usage": {
        "task": "core.tasks.record_projects_usage",
        "schedule": 3600,
    },
}
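
The new beat entry schedules core.tasks.record_projects_usage hourly; the task body itself is not part of this diff. A hypothetical sketch of what such a task could do with the adapter helpers added above (Project, UsageRecord, spiderdata_db_client, and the project-to-database mapping are all assumed names, not confirmed by this PR):

    # assuming the usual imports: celery_app from config, models from core
    @celery_app.task
    def record_projects_usage():
        # Persist an hourly snapshot of each project's storage usage.
        for project in Project.objects.all():
            database_name = str(project.pid)  # assumed mapping to a Mongo database
            UsageRecord.objects.create(
                project=project,
                items_data_size=spiderdata_db_client.get_database_size(database_name, "items"),
                requests_data_size=spiderdata_db_client.get_database_size(database_name, "requests"),
                logs_data_size=spiderdata_db_client.get_database_size(database_name, "logs"),
            )
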
2 changes: 1 addition & 1 deletion estela-api/config/settings/local.py
@@ -2,7 +2,7 @@

DEBUG = True

RUN_JOBS_PER_LOT = 10
CHECK_JOB_ERRORS_BATCH_SIZE = 10
MAX_CONCURRENT_JOBS = 10

SPIDERDATA_DB_PRODUCTION = False
2 changes: 1 addition & 1 deletion estela-api/config/settings/prod.py
@@ -2,8 +2,8 @@

DEBUG = False

RUN_JOBS_PER_LOT = 1000
CHECK_JOB_ERRORS_BATCH_SIZE = 1000
MAX_CONCURRENT_JOBS = 40

MULTI_NODE_MODE = True

10 changes: 8 additions & 2 deletions estela-api/core/tasks.py
@@ -22,10 +22,16 @@ def get_default_token(job):

@celery_app.task
def run_spider_jobs():
    running_jobs_count = SpiderJob.objects.filter(
        status=SpiderJob.RUNNING_STATUS
    ).count()
    number_of_jobs_to_run = (
        settings.MAX_CONCURRENT_JOBS * job_manager.get_scale_size()
    ) - running_jobs_count

    jobs = SpiderJob.objects.filter(status=SpiderJob.IN_QUEUE_STATUS)[
        : settings.RUN_JOBS_PER_LOT
        :number_of_jobs_to_run
    ]

    for job in jobs:
        job_args = {arg.name: arg.value for arg in job.args.all()}
        job_env_vars = {env_var.name: env_var.value for env_var in job.env_vars.all()}
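
A worked example of the new cap: with MAX_CONCURRENT_JOBS = 40 (the prod setting) and get_scale_size() reporting 3 nodes, up to 120 jobs may run cluster-wide, so with 115 already running only 5 queued jobs are started this tick. One caveat worth noting: Django querysets reject negative slice bounds, and the difference goes negative whenever running jobs exceed capacity (e.g. right after MAX_CONCURRENT_JOBS is lowered), so a max(0, ...) guard would be prudent:

    max_concurrent_jobs = 40   # settings.MAX_CONCURRENT_JOBS in prod
    scale_size = 3             # job_manager.get_scale_size(), i.e. the node count
    running_jobs_count = 115

    capacity = max_concurrent_jobs * scale_size                    # 120
    number_of_jobs_to_run = max(0, capacity - running_jobs_count)  # never negative
    print(number_of_jobs_to_run)                                   # 5
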
8 changes: 0 additions & 8 deletions estela-api/docs/api.yaml
@@ -1620,47 +1620,39 @@ definitions:
    properties:
      created_at:
        title: Created at
        description: Usage record creation date.
        type: string
        format: date-time
        readOnly: true
      processing_time:
        title: Processing time
        description: Time of CPU use.
        type: number
      network_usage:
        title: Network usage
        description: Amount of network bytes used.
        type: integer
        maximum: 18446744073709551615
        minimum: 0
      item_count:
        title: Item count
        description: Amount of items extracted.
        type: integer
        maximum: 18446744073709551615
        minimum: 0
      request_count:
        title: Request count
        description: Amount of requests made.
        type: integer
        maximum: 18446744073709551615
        minimum: 0
      items_data_size:
        title: Items data size
        description: Amount in bytes occupied by items in the database.
        type: integer
        maximum: 18446744073709551615
        minimum: 0
      requests_data_size:
        title: Requests data size
        description: Amount in bytes occupied by requests in the database.
        type: integer
        maximum: 18446744073709551615
        minimum: 0
      logs_data_size:
        title: Logs data size
        description: Amount in bytes occupied by logs in the database.
        type: integer
        maximum: 18446744073709551615
        minimum: 0
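
For reference, a UsageRecord payload shaped like the schema above (all values invented):

    usage_record = {
        "created_at": "2023-02-01T10:00:00Z",  # read-only, set by the API
        "processing_time": 12.5,               # CPU time used (unit assumed to be seconds)
        "network_usage": 1048576,              # bytes over the network
        "item_count": 3200,
        "request_count": 4100,
        "items_data_size": 524288,             # bytes occupied in the database
        "requests_data_size": 262144,
        "logs_data_size": 131072,
    }
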
3 changes: 3 additions & 0 deletions estela-api/engines/example_engine.py
@@ -24,3 +24,6 @@ def read_job(self, job_name):

    def read_job_status(self, job_name):
        return self.Status()

    def get_scale_size(self):
        # Single-node default, so MAX_CONCURRENT_JOBS is used unscaled;
        # returning None here would break the multiplication in run_spider_jobs.
        return 1
12 changes: 12 additions & 0 deletions estela-api/engines/kubernetes.py
@@ -188,3 +188,15 @@ def read_job_status(self, name, namespace="default", api_instance=None):
            return None

        return self.Status(api_response.status)

    def get_scale_size(self, api_instance=None):
        # Fall back to a fresh API client when none is injected,
        # mirroring read_job_status above.
        if api_instance is None:
            api_instance = self.get_api_instance()

        try:
            api_response = api_instance.list_node()
        except ApiException:
            return None

        number_of_nodes = len(api_response.items)
        return number_of_nodes
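
get_scale_size counts cluster nodes, which run_spider_jobs multiplies by MAX_CONCURRENT_JOBS; note that the None it returns on ApiException would still break that multiplication, so callers may want a fallback. A standalone check of the same node count with the official kubernetes client, assuming a reachable cluster and kubeconfig credentials:

    from kubernetes import client, config

    config.load_kube_config()  # use config.load_incluster_config() inside the cluster
    api_instance = client.CoreV1Api()
    nodes = api_instance.list_node().items
    print(len(nodes))  # the node count get_scale_size() returns
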
@@ -47,7 +47,31 @@ export interface SpiderJobCreate {
     */
    readonly name?: string;
    /**
     *
     * @type {number}
     * @memberof SpiderJobCreate
     */
    lifespan?: number;
    /**
     *
     * @type {number}
     * @memberof SpiderJobCreate
     */
    totalResponseBytes?: number;
    /**
     *
     * @type {number}
     * @memberof SpiderJobCreate
     */
    itemCount?: number;
    /**
     *
     * @type {number}
     * @memberof SpiderJobCreate
     */
    requestCount?: number;
    /**
     * Job arguments.
     *
     * @type {Array<SpiderJobArg>}
     * @memberof SpiderJobCreate
     */
@@ -102,6 +126,10 @@ export function SpiderJobCreateFromJSONTyped(json: any, ignoreDiscriminator: boolean)

        'jid': !exists(json, 'jid') ? undefined : json['jid'],
        'name': !exists(json, 'name') ? undefined : json['name'],
        'lifespan': !exists(json, 'lifespan') ? undefined : json['lifespan'],
        'totalResponseBytes': !exists(json, 'total_response_bytes') ? undefined : json['total_response_bytes'],
        'itemCount': !exists(json, 'item_count') ? undefined : json['item_count'],
        'requestCount': !exists(json, 'request_count') ? undefined : json['request_count'],
        'args': !exists(json, 'args') ? undefined : ((json['args'] as Array<any>).map(SpiderJobArgFromJSON)),
        'envVars': !exists(json, 'env_vars') ? undefined : ((json['env_vars'] as Array<any>).map(SpiderJobEnvVarFromJSON)),
        'tags': !exists(json, 'tags') ? undefined : ((json['tags'] as Array<any>).map(SpiderJobTagFromJSON)),
@@ -121,6 +149,11 @@ export function SpiderJobCreateToJSON(value?: SpiderJobCreate | null): any {
    }
    return {

        'spider': value.spider,
        'lifespan': value.lifespan,
        'total_response_bytes': value.totalResponseBytes,
        'item_count': value.itemCount,
        'request_count': value.requestCount,
        'args': value.args === undefined ? undefined : ((value.args as Array<any>).map(SpiderJobArgToJSON)),
        'env_vars': value.envVars === undefined ? undefined : ((value.envVars as Array<any>).map(SpiderJobEnvVarToJSON)),
        'tags': value.tags === undefined ? undefined : ((value.tags as Array<any>).map(SpiderJobTagToJSON)),