Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
oliver-sanders committed Feb 19, 2025
1 parent 5a1fcb2 commit 35384bc
Showing 1 changed file with 155 additions and 28 deletions.
183 changes: 155 additions & 28 deletions cylc/uiserver/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,15 @@
_mut_field,
get_nodes_all
)
from cylc.flow.task_state import (
TASK_STATUS_FAILED,
TASK_STATUS_RUNNING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_SUBMIT_FAILED,
TASK_STATUS_SUCCEEDED,
)
from cylc.flow.util import sstrip

from cylc.uiserver.resolvers import (
Resolvers,
list_log_files,
Expand Down Expand Up @@ -302,9 +310,18 @@ async def list_elements(args):
)
with CylcWorkflowDAO(db_file, is_public=True) as dao:
conn = dao.connect()
if 'tasks' in args:
# if 'tasks' in args:
if True: # TODO
elements.extend(
run_jobs_query(conn, workflow, args.get('tasks'), args.get('run_status'))
run_jobs_query(
conn,
workflow,
ids=args.get('ids'),
exids=args.get('exids'),
states=args.get('states'),
exstates=args.get('exstates'),
tasks=args.get('tasks'),
)
)
else:
elements.extend(run_task_query(conn, workflow))
Expand Down Expand Up @@ -437,22 +454,129 @@ def run_task_query(conn, workflow):
return tasks


def run_jobs_query(conn, workflow, tasks, run_status=None):

# TODO: support all arguments including states
# https://github.com/cylc/cylc-uiserver/issues/440
_JOB_STATE_TO_STATUS = {
# task_state: (submit_status, run_status, time_run)
TASK_STATUS_SUBMITTED: (0, None, None),
TASK_STATUS_SUBMIT_FAILED: (1, None, None),
TASK_STATUS_RUNNING: (0, None, True),
TASK_STATUS_SUCCEEDED: (0, 0, None),
TASK_STATUS_FAILED: (0, 1, None),
}


def run_jobs_query(
conn,
workflow,
ids=None,
exids=None,
states=None,
exstates=None,
tasks=None,
):
"""Query jobs from the database."""
# TODO: support all arguments:
# * [x] ids
# * [ ] sort
# * [x] exids
# * [x] states
# * [x] exstates
# See https://github.com/cylc/cylc-uiserver/issues/440
jobs = []
where = []

# Create sql snippet used to limit which tasks are returned by query
where_stmts = []
where_args = []

# filter by cycle/task/job ID
if ids:
items = []
for id_ in ids:
item = []
for token, column in (
('cycle', 'cycle'),
('task', 'name'),
('job', 'submit_num'),
):
value = id_[token]
if value:
if token == 'job':
value = int(value)
item.append(rf'{column} GLOB ?')
where_args.append(value)
items.append(r'(' + ' AND '.join(item) + r')')

if items:
where_stmts.append(
r'(' + ' OR '.join(items) + ')'
)

# filter out cycle/task/job IDs
if exids:
for id_ in exids:
items = []
for token, column in (
('cycle', 'cycle'),
('task', 'name'),
('job', 'submit_num'),
):
value = id_[token]
if value:
if token == 'job':
value = int(value)
items.append(rf'{column} GLOB ?')
where_args.append(value)
if items:
where_stmts.append(r'NOT (' + ' AND '.join(items) + r')')

# filter by job state
if states:
items = []
for state in states:
submit_status, run_status, time_run = _JOB_STATE_TO_STATUS.get(
state, (None, None, None)
)
if submit_status is None and run_status is None:
continue
item = []
if submit_status is not None:
item.append(r'submit_status = ?')
where_args.append(submit_status)
if run_status is not None:
item.append(r'run_status = ?')
where_args.append(run_status)
if time_run is not None:
item.append(r'time_run NOT NULL')
items.append(r'(' + ' AND '.join(item) + r')')
if items:
where_stmts.append(r'(' + r' OR '.join(items) + r')')

# filter out job states
if exstates:
for state in exstates:
values = _JOB_STATE_TO_STATUS.get(
state, (None, None, None)
)
if all(value is None for value in values):
continue
submit_status, run_status, time_run = values
item = []
if submit_status is not None:
item.append(r'submit_status = ?')
where_args.append(submit_status)
if run_status is not None:
item.append(r'run_status = ?')
where_args.append(run_status)
if time_run is not None:
item.append(r'time_run = NULL')
if item:
where_stmts.append(r'NOT (' + ' and '.join(item) + r')')

# filter by task name (special UIS argument for namespace queries)
if tasks:
where.append(
where_stmts.append(
r'(name = '
+ r"' OR name = '".join(tasks)
+ r"' OR name = '".join('?' for task in tasks)
+ r')'
)
if run_status:
where.append(rf'run_status = "{run_status}"')
where_args.extend(tasks)

query = r'''
SELECT
Expand All @@ -465,23 +589,21 @@ def run_jobs_query(conn, workflow, tasks, run_status=None):
job_id,
platform_name,
time_submit,
STRFTIME('%s', time_run_exit) - STRFTIME('%s', time_submit) AS total_time,
STRFTIME('%s', time_run_exit) - STRFTIME('%s', time_run) AS run_time,
STRFTIME('%s', time_run) - STRFTIME('%s', time_submit) AS queue_time,
STRFTIME('%s', time_run_exit) - STRFTIME('%s', time_submit)
AS total_time,
STRFTIME('%s', time_run_exit) - STRFTIME('%s', time_run)
AS run_time,
STRFTIME('%s', time_run) - STRFTIME('%s', time_submit)
AS queue_time,
run_status
FROM
task_jobs
'''
if where:
query += 'WHERE ' + '\n AND '.join(where)
from cylc.flow.task_state import (
TASK_STATUS_SUCCEEDED,
TASK_STATUS_FAILED,
TASK_STATUS_SUBMITTED,
TASK_STATUS_SUBMIT_FAILED,
)
for row in conn.execute(query):
submit_status, run_status = row[3], row[12]
if where_stmts:
query += 'WHERE\n ' + '\n AND '.join(where_stmts)

for row in conn.execute(query, where_args):
submit_status, run_status, time_run = row[3], row[12], row[4]
if run_status is not None:
if run_status == 0:
status = TASK_STATUS_SUCCEEDED
Expand Down Expand Up @@ -610,8 +732,13 @@ class LogFiles(graphene.ObjectType):
mindepth=graphene.Int(default_value=-1),
maxdepth=graphene.Int(default_value=-1),
sort=SortArgs(default_value=None),
tasks=graphene.List(graphene.ID, default_value=[]),
run_status=graphene.Int(default_value=None),
tasks=graphene.List(
graphene.ID,
default_value=[],
description='Deprecated, use ids: ["*/<task>"].',
),
states=graphene.List(graphene.ID, default_value=[]),
exstates=graphene.List(graphene.ID, default_value=[]),
)


Expand Down

0 comments on commit 35384bc

Please sign in to comment.