tests/i: fix flaky tui/test_updater tests #5849

Merged
merged 1 commit on Nov 30, 2023
49 changes: 28 additions & 21 deletions cylc/flow/tui/updater.py
@@ -155,7 +155,11 @@ async def run(self, filters):
"""
with suppress_logging():
self._update_filters(filters)
await self._update()
while True:
ret = await self._update()
if ret == self.SIGNAL_TERMINATE:
break
self.update_queue.put(ret)

def _subscribe(self, w_id):
if w_id not in self._clients:
@@ -183,27 +187,30 @@ def _update_filters(self, filters):
self.filters = filters

async def _update(self):
last_scan_time = 0
while True:
# process any pending commands
if not self._command_queue.empty():
(command, payload) = self._command_queue.get()
if command == self.SIGNAL_TERMINATE:
break
getattr(self, command)(payload)
continue
"""Run one iteration of the updater.

# do a workflow scan if it's due
update_start_time = time()
if update_start_time - last_scan_time > self.BASE_SCAN_INTERVAL:
data = await self._scan()

# get the next snapshot from workflows we are subscribed to
self.update_queue.put(await self._run_update(data))

# schedule the next update
update_time = time() - update_start_time
await sleep(self.BASE_UPDATE_INTERVAL - update_time)
Either returns the next update or "self.SIGNAL_TERMINATE".
"""
last_scan_time = 0
# process any pending commands
while not self._command_queue.empty():
(command, payload) = self._command_queue.get()
if command == self.SIGNAL_TERMINATE:
return command
getattr(self, command)(payload)

# do a workflow scan if it's due
update_start_time = time()
if update_start_time - last_scan_time > self.BASE_SCAN_INTERVAL:
data = await self._scan()

# get the next snapshot from workflows we are subscribed to
update = await self._run_update(data)

# schedule the next update
update_time = time() - update_start_time
await sleep(self.BASE_UPDATE_INTERVAL - update_time)
return update

async def _run_update(self, data):
# copy the scanned data so it can be reused for future updates
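A minimal sketch, not part of the diff above, of what this change enables: because `_update()` now performs a single iteration and returns its result (instead of looping forever and feeding `update_queue` itself), a caller can await one update at a time and inspect the return value directly.

# hypothetical caller code, for illustration only
update = await updater._update()
if update == updater.SIGNAL_TERMINATE:
    pass  # a queued "terminate" command was processed this iteration
else:
    assert update['id_'] == 'root'  # root of the refreshed data tree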
171 changes: 55 additions & 116 deletions tests/integration/tui/test_updater.py
@@ -15,12 +15,13 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import asyncio
from copy import deepcopy
from pathlib import Path
from queue import Queue
import re

from async_timeout import timeout
import pytest

from cylc.flow.cycling.integer import IntegerPoint
from cylc.flow.id import Tokens
@@ -31,57 +32,35 @@
from cylc.flow.workflow_status import WorkflowStatus


async def await_update(updater):
"""Wait for the latest update from the Updater.

Note:
If multiple updates are waiting, this returns the most recent.

Returns:
The latest update from the Updater's "update_queue".

"""
while updater.update_queue.empty():
await asyncio.sleep(0.1)
while not updater.update_queue.empty():
# flush out any older updates to avoid race conditions in tests
update = updater.update_queue.get()
return update


async def wait_workflow_connected(updater, tokens, connected=True, time=3):
"""Wait for the Updater to connect to a workflow.

This will return once the updater has connected to a workflow and returned
the first data from it.
@pytest.fixture
def updater(monkeypatch, test_dir):
"""Return an updater ready for testing."""
# patch the update intervals so that everything runs for every update
monkeypatch.setattr(
'cylc.flow.tui.updater.Updater.BASE_UPDATE_INTERVAL',
0,
)
monkeypatch.setattr(
'cylc.flow.tui.updater.Updater.BASE_SCAN_INTERVAL',
0,
)

# create the updater
updater = Updater()

Arguments:
tokens:
The tokens of the workflow you're waiting for.
connected:
If True this waits for the updater to connect to the workflow, if
False it waits for the updater to disconnect from it.
time:
The maximum time to wait for this to happen.
# swap multiprocessing.Queue for queue.Queue
# (this means queued operations are instant making tests more stable)
updater.update_queue = Queue()
updater._command_queue = Queue()

Returns:
The first update from the Updater which contains the workflow data.
# set up the filters
# (these filter for the workflows created in this test only)
filters = get_default_filters()
id_base = str(test_dir.relative_to(Path("~/cylc-run").expanduser()))
filters['workflows']['id'] = f'^{re.escape(id_base)}/.*'
updater._update_filters(filters)

"""
async with timeout(time):
while True:
root_node = await await_update(updater)
workflow_node = root_node['children'][0]
for workflow_node in root_node['children']:
if (
workflow_node['id_'] == tokens.id
and (
workflow_node['children'][0]['id_'] != '#spring'
) == connected
):
# if the spring node is still there then we haven't
# received the first update from the workflow yet
return root_node
return updater


def get_child_tokens(root_node, types, relative=False):
@@ -111,19 +90,16 @@ def get_child_tokens(root_node, types, relative=False):
return ret


async def test_subscribe(one_conf, flow, scheduler, run, test_dir):
async def test_subscribe(one_conf, flow, scheduler, run, updater):
"""It should subscribe and unsubscribe from workflows."""
id_ = flow(one_conf)
schd = scheduler(id_)

updater = Updater()

async def the_test():
nonlocal updater

try:
async with run(schd):
# run the updater and the test
async with timeout(10):
# wait for the first update
root_node = await await_update(updater)
root_node = await updater._update()

# there should be a root root_node
assert root_node['id_'] == 'root'
@@ -135,9 +111,7 @@ async def the_test():

# subscribe to the workflow
updater.subscribe(schd.tokens.id)

# wait for it to connect to the workflow
root_node = await wait_workflow_connected(updater, schd.tokens)
root_node = await updater._update()

# check the workflow contains one cycle with one task in it
workflow_node = root_node['children'][0]
@@ -150,31 +124,13 @@

# unsubscribe from the workflow
updater.unsubscribe(schd.tokens.id)
root_node = await updater._update()

# wait for it to disconnect from the workflow
root_node = await wait_workflow_connected(
updater,
schd.tokens,
connected=False,
)

finally:
# shut down the updater
updater.terminate()

async with run(schd):
filters = get_default_filters()
filters['workflows']['id'] = f'{re.escape(str(test_dir.relative_to(Path("~/cylc-run").expanduser())))}/.*'

# run the updater and the test
async with timeout(10):
await asyncio.gather(
asyncio.create_task(updater.run(filters)),
asyncio.create_task(the_test()),
)
# the workflow should be replaced by a "spring" node again
assert root_node['children'][0]['children'][0]['id_'] == '#spring'


async def test_filters(one_conf, flow, scheduler, run, test_dir):
async def test_filters(one_conf, flow, scheduler, run, updater):
"""It should filter workflow and task states.

Note:
@@ -196,16 +152,20 @@ async def test_filters(one_conf, flow, scheduler, run, test_dir):
two = scheduler(flow(one_conf, name='two'))
tre = scheduler(flow(one_conf, name='tre'))

filters = get_default_filters()
id_base = str(test_dir.relative_to(Path("~/cylc-run").expanduser()))
filters['workflows']['id'] = f'^{re.escape(id_base)}/.*'
# start workflow "one"
async with run(one):
# mark "1/a" as running and "1/b" as succeeded
one_a = one.pool.get_task(IntegerPoint('1'), 'a')
one_a.state_reset('running')
one.data_store_mgr.delta_task_state(one_a)
one.pool.get_task(IntegerPoint('1'), 'b').state_reset('succeeded')

updater = Updater()
# start workflow "two"
async with run(two):
# run the updater and the test
filters = deepcopy(updater.filters)

async def the_test():
nonlocal filters
try:
root_node = await await_update(updater)
root_node = await updater._update()
assert {child['id_'] for child in root_node['children']} == {
one.tokens.id,
two.tokens.id,
@@ -219,7 +179,7 @@ async def the_test():
updater.update_filters(filters)

# "one" and "two" should now be filtered out
root_node = await await_update(updater)
root_node = await updater._update()
assert {child['id_'] for child in root_node['children']} == {
tre.tokens.id,
}
@@ -231,15 +191,15 @@ async def the_test():
updater.update_filters(filters)

# "tre" should now be filtered out
root_node = await await_update(updater)
root_node = await updater._update()
assert {child['id_'] for child in root_node['children']} == {
one.tokens.id,
two.tokens.id,
}

# subscribe to "one"
updater.subscribe(one.tokens.id)
root_node = await wait_workflow_connected(updater, one.tokens)
updater._subscribe(one.tokens.id)
root_node = await updater._update()
assert get_child_tokens(
root_node, types={'task'}, relative=True
) == {
@@ -254,7 +214,7 @@
# filters['tasks'][TASK_STATUS_RUNNING] = False
# updater.update_filters(filters)

# root_node = await await_update(updater)
# root_node = await updater._update()
# assert get_child_tokens(
# root_node,
# types={'task'},
@@ -263,24 +223,3 @@
# '1/b',
# '1/c',
# }

finally:
# shut down the updater
updater.terminate()

# start workflow "one"
async with run(one):
# mark "1/a" as running and "1/b" as succeeded
one_a = one.pool.get_task(IntegerPoint('1'), 'a')
one_a.state_reset('running')
one.data_store_mgr.delta_task_state(one_a)
one.pool.get_task(IntegerPoint('1'), 'b').state_reset('succeeded')

# start workflow "two"
async with run(two):
# run the updater and the test
async with timeout(10):
await asyncio.gather(
asyncio.create_task(the_test()),
asyncio.create_task(updater.run(filters)),
)
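
Putting the pieces together, a hedged sketch of the test pattern this PR moves to (fixture names as in the diff; the workflow setup is abbreviated and the test name is hypothetical): the `updater` fixture zeroes the scan and update intervals and swaps in `queue.Queue`, so each awaited `_update()` call completes one deterministic scan/snapshot cycle with no background `run()` task left to race against.

# illustrative sketch only; not part of the diff
async def test_example(one_conf, flow, scheduler, run, updater):
    schd = scheduler(flow(one_conf))
    async with run(schd):
        # first iteration: the scan finds the workflow, no subscription yet
        root_node = await updater._update()
        assert root_node['id_'] == 'root'

        # queue a "subscribe" command; it is processed on the next iteration
        updater.subscribe(schd.tokens.id)
        root_node = await updater._update()
        assert root_node['children'][0]['id_'] == schd.tokens.id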