From 4cf753281849b667b4ed2d2c208a1c97da1918df Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Tue, 28 Nov 2023 17:02:35 +0000 Subject: [PATCH] tests/i: fix flaky tui/test_updater tests * These tests were trying to be a bit too clever, testing the updater whilst it was running. * This wasn't really needed for what the tests were tring to assert, there are other tests which run Tui for real (live updater and all), those tests didn't exist when these ones were written. * This commit freezes the updater so we can run it in lock-step with the tests. --- cylc/flow/tui/updater.py | 49 ++++---- tests/integration/tui/test_updater.py | 171 +++++++++----------------- 2 files changed, 83 insertions(+), 137 deletions(-) diff --git a/cylc/flow/tui/updater.py b/cylc/flow/tui/updater.py index 5321a4d6c1f..f9bd0cf2c00 100644 --- a/cylc/flow/tui/updater.py +++ b/cylc/flow/tui/updater.py @@ -155,7 +155,11 @@ async def run(self, filters): """ with suppress_logging(): self._update_filters(filters) - await self._update() + while True: + ret = await self._update() + if ret == self.SIGNAL_TERMINATE: + break + self.update_queue.put(ret) def _subscribe(self, w_id): if w_id not in self._clients: @@ -183,27 +187,30 @@ def _update_filters(self, filters): self.filters = filters async def _update(self): - last_scan_time = 0 - while True: - # process any pending commands - if not self._command_queue.empty(): - (command, payload) = self._command_queue.get() - if command == self.SIGNAL_TERMINATE: - break - getattr(self, command)(payload) - continue + """Run one iteration of the updater. - # do a workflow scan if it's due - update_start_time = time() - if update_start_time - last_scan_time > self.BASE_SCAN_INTERVAL: - data = await self._scan() - - # get the next snapshot from workflows we are subscribed to - self.update_queue.put(await self._run_update(data)) - - # schedule the next update - update_time = time() - update_start_time - await sleep(self.BASE_UPDATE_INTERVAL - update_time) + Either returns the next update or "self.SIGNAL_TERMINATE". + """ + last_scan_time = 0 + # process any pending commands + while not self._command_queue.empty(): + (command, payload) = self._command_queue.get() + if command == self.SIGNAL_TERMINATE: + return command + getattr(self, command)(payload) + + # do a workflow scan if it's due + update_start_time = time() + if update_start_time - last_scan_time > self.BASE_SCAN_INTERVAL: + data = await self._scan() + + # get the next snapshot from workflows we are subscribed to + update = await self._run_update(data) + + # schedule the next update + update_time = time() - update_start_time + await sleep(self.BASE_UPDATE_INTERVAL - update_time) + return update async def _run_update(self, data): # copy the scanned data so it can be reused for future updates diff --git a/tests/integration/tui/test_updater.py b/tests/integration/tui/test_updater.py index 39a9a606eff..b3daac5a328 100644 --- a/tests/integration/tui/test_updater.py +++ b/tests/integration/tui/test_updater.py @@ -15,12 +15,13 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import asyncio from copy import deepcopy from pathlib import Path +from queue import Queue import re from async_timeout import timeout +import pytest from cylc.flow.cycling.integer import IntegerPoint from cylc.flow.id import Tokens @@ -31,57 +32,35 @@ from cylc.flow.workflow_status import WorkflowStatus -async def await_update(updater): - """Wait for the latest update from the Updater. - - Note: - If multiple updates are waiting, this returns the most recent.Q - - Returns: - The latest update from the Updater's "update_queue". - - """ - while updater.update_queue.empty(): - await asyncio.sleep(0.1) - while not updater.update_queue.empty(): - # flush out any older updates to avoid race conditions in tests - update = updater.update_queue.get() - return update - - -async def wait_workflow_connected(updater, tokens, connected=True, time=3): - """Wait for the Updater to connect to a workflow. - - This will return once the updater has connected to a workflow and returned - the first data from it. +@pytest.fixture +def updater(monkeypatch, test_dir): + """Return an updater ready for testing.""" + # patch the update intervals so that everything runs for every update + monkeypatch.setattr( + 'cylc.flow.tui.updater.Updater.BASE_UPDATE_INTERVAL', + 0, + ) + monkeypatch.setattr( + 'cylc.flow.tui.updater.Updater.BASE_SCAN_INTERVAL', + 0, + ) + + # create the updater + updater = Updater() - Arugments: - tokens: - The tokens of the workflow you're waiting for. - connected: - If True this waits for the updater to connect to the workflow, if - False it waits for the updater to disconnect from it. - time: - The maximum time to wait for this to happen. + # swap multiprocessing.Queue for queue.Queue + # (this means queued operations are instant making tests more stable) + updater.update_queue = Queue() + updater._command_queue = Queue() - Returns: - The first update from the Updater which contains the workflow data. + # set up the filters + # (these filter for the workflows created in this test only) + filters = get_default_filters() + id_base = str(test_dir.relative_to(Path("~/cylc-run").expanduser())) + filters['workflows']['id'] = f'^{re.escape(id_base)}/.*' + updater._update_filters(filters) - """ - async with timeout(time): - while True: - root_node = await await_update(updater) - workflow_node = root_node['children'][0] - for workflow_node in root_node['children']: - if ( - workflow_node['id_'] == tokens.id - and ( - workflow_node['children'][0]['id_'] != '#spring' - ) == connected - ): - # if the spring node is still there then we haven't - # recieved the first update from the workflow yet - return root_node + return updater def get_child_tokens(root_node, types, relative=False): @@ -111,19 +90,16 @@ def get_child_tokens(root_node, types, relative=False): return ret -async def test_subscribe(one_conf, flow, scheduler, run, test_dir): +async def test_subscribe(one_conf, flow, scheduler, run, updater): """It should subscribe and unsubscribe from workflows.""" id_ = flow(one_conf) schd = scheduler(id_) - updater = Updater() - - async def the_test(): - nonlocal updater - - try: + async with run(schd): + # run the updater and the test + async with timeout(10): # wait for the first update - root_node = await await_update(updater) + root_node = await updater._update() # there should be a root root_node assert root_node['id_'] == 'root' @@ -135,9 +111,7 @@ async def the_test(): # subscribe to the workflow updater.subscribe(schd.tokens.id) - - # wait for it to connect to the workflow - root_node = await wait_workflow_connected(updater, schd.tokens) + root_node = await updater._update() # check the workflow contains one cycle with one task in it workflow_node = root_node['children'][0] @@ -150,31 +124,13 @@ async def the_test(): # unsubscribe from the workflow updater.unsubscribe(schd.tokens.id) + root_node = await updater._update() - # wait for it to disconnect from the workflow - root_node = await wait_workflow_connected( - updater, - schd.tokens, - connected=False, - ) - - finally: - # shut down the updater - updater.terminate() - - async with run(schd): - filters = get_default_filters() - filters['workflows']['id'] = f'{re.escape(str(test_dir.relative_to(Path("~/cylc-run").expanduser())))}/.*' - - # run the updater and the test - async with timeout(10): - await asyncio.gather( - asyncio.create_task(updater.run(filters)), - asyncio.create_task(the_test()), - ) + # the workflow should be replaced by a "spring" node again + assert root_node['children'][0]['children'][0]['id_'] == '#spring' -async def test_filters(one_conf, flow, scheduler, run, test_dir): +async def test_filters(one_conf, flow, scheduler, run, updater): """It should filter workflow and task states. Note: @@ -196,16 +152,20 @@ async def test_filters(one_conf, flow, scheduler, run, test_dir): two = scheduler(flow(one_conf, name='two')) tre = scheduler(flow(one_conf, name='tre')) - filters = get_default_filters() - id_base = str(test_dir.relative_to(Path("~/cylc-run").expanduser())) - filters['workflows']['id'] = f'^{re.escape(id_base)}/.*' + # start workflow "one" + async with run(one): + # mark "1/a" as running and "1/b" as succeeded + one_a = one.pool.get_task(IntegerPoint('1'), 'a') + one_a.state_reset('running') + one.data_store_mgr.delta_task_state(one_a) + one.pool.get_task(IntegerPoint('1'), 'b').state_reset('succeeded') - updater = Updater() + # start workflow "two" + async with run(two): + # run the updater and the test + filters = deepcopy(updater.filters) - async def the_test(): - nonlocal filters - try: - root_node = await await_update(updater) + root_node = await updater._update() assert {child['id_'] for child in root_node['children']} == { one.tokens.id, two.tokens.id, @@ -219,7 +179,7 @@ async def the_test(): updater.update_filters(filters) # "one" and "two" should now be filtered out - root_node = await await_update(updater) + root_node = await updater._update() assert {child['id_'] for child in root_node['children']} == { tre.tokens.id, } @@ -231,15 +191,15 @@ async def the_test(): updater.update_filters(filters) # "tre" should now be filtered out - root_node = await await_update(updater) + root_node = await updater._update() assert {child['id_'] for child in root_node['children']} == { one.tokens.id, two.tokens.id, } # subscribe to "one" - updater.subscribe(one.tokens.id) - root_node = await wait_workflow_connected(updater, one.tokens) + updater._subscribe(one.tokens.id) + root_node = await updater._update() assert get_child_tokens( root_node, types={'task'}, relative=True ) == { @@ -254,7 +214,7 @@ async def the_test(): # filters['tasks'][TASK_STATUS_RUNNING] = False # updater.update_filters(filters) - # root_node = await await_update(updater) + # root_node = await updater._update() # assert get_child_tokens( # root_node, # types={'task'}, @@ -263,24 +223,3 @@ async def the_test(): # '1/b', # '1/c', # } - - finally: - # shut down the updater - updater.terminate() - - # start workflow "one" - async with run(one): - # mark "1/a" as running and "1/b" as succeeded - one_a = one.pool.get_task(IntegerPoint('1'), 'a') - one_a.state_reset('running') - one.data_store_mgr.delta_task_state(one_a) - one.pool.get_task(IntegerPoint('1'), 'b').state_reset('succeeded') - - # start workflow "two" - async with run(two): - # run the updater and the test - async with timeout(10): - await asyncio.gather( - asyncio.create_task(the_test()), - asyncio.create_task(updater.run(filters)), - )