Skip to content

Commit

Permalink
make sure that start time is written to DB for start tim in sim mode
Browse files Browse the repository at this point in the history
  • Loading branch information
wxtim committed Sep 25, 2024
1 parent 5585e3c commit 6acb70c
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 27 deletions.
1 change: 1 addition & 0 deletions cylc/flow/run_modes/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ def submit_task_job(
task_job_mgr.workflow_db_mgr.put_insert_task_jobs(
itask, {
'time_submit': now[1],
'time_run': now[1],
'try_num': itask.get_try_num(),
'flow_nums': str(list(itask.flow_nums)),
'is_manual_submit': itask.is_manual_submit,
Expand Down
68 changes: 41 additions & 27 deletions tests/integration/run_modes/test_nonlive.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import pytest
from typing import Any, Dict

# Define here to ensure test doesn't just mirror code:
Expand Down Expand Up @@ -47,7 +48,41 @@ def not_time(data: Dict[str, Any]):
return {k: v for k, v in data.items() if 'time' not in k}


async def test_db_task_jobs(flow, scheduler, start, capture_live_submissions):
@pytest.fixture
def submit_and_check_db():
"""Wraps up testing that we want to do repeatedly in
test_db_task_jobs.
"""
def _inner(schd):
# Submit task jobs:
schd.task_job_mgr.submit_task_jobs(
schd.workflow,
schd.pool.get_tasks(),
schd.server.curve_auth,
schd.server.client_pub_key_dir
)
# Make sure that db changes are enacted:
schd.workflow_db_mgr.process_queued_ops()

for mode, kgo in KGO.items():
task_jobs = schd.workflow_db_mgr.pub_dao.select_task_job(1, mode)

# Check all non-datetime items against KGO:
assert not_time(task_jobs) == kgo, (
f'Mode {mode}: incorrect db entries.')

# Check that timestamps have been created:
for timestamp in [
'time_submit', 'time_submit_exit', 'time_run', 'time_run_exit'
]:
assert task_jobs[timestamp] is not None
return _inner


async def test_db_task_jobs(
flow, scheduler, start, capture_live_submissions,
submit_and_check_db
):
"""Ensure that task job data is added to the database correctly
for each run mode.
"""
Expand All @@ -58,37 +93,16 @@ async def test_db_task_jobs(flow, scheduler, start, capture_live_submissions):
mode: {'run mode': mode} for mode in KGO}
}))
async with start(schd):
# Reference all task proxies so we can examine them
# at the end of the test:
task_proxies = schd.pool.get_tasks()

schd.task_job_mgr.submit_task_jobs(
schd.workflow,
schd.pool.get_tasks(),
schd.server.curve_auth,
schd.server.client_pub_key_dir
)
schd.workflow_db_mgr.process_queued_ops()

for mode, kgo in KGO.items():
taskdata = not_time(
schd.workflow_db_mgr.pub_dao.select_task_job(1, mode))
assert taskdata == kgo, (
f'Mode {mode}: incorrect db entries.')
submit_and_check_db(schd)

# Set outputs to failed:
schd.pool.set_prereqs_and_outputs('*', ['failed'], [], [])

schd.task_job_mgr.submit_task_jobs(
schd.workflow,
schd.pool.get_tasks(),
schd.server.curve_auth,
schd.server.client_pub_key_dir
)
schd.workflow_db_mgr.process_queued_ops()

for mode, kgo in KGO.items():
taskdata = not_time(
schd.workflow_db_mgr.pub_dao.select_task_job(1, mode))
assert taskdata == kgo, (
f'Mode {mode}: incorrect db entries.')
submit_and_check_db(schd)

assert task_proxies[0].run_mode == 'simulation'
assert task_proxies[1].run_mode == 'skip'
Expand Down

0 comments on commit 6acb70c

Please sign in to comment.