From a839eedf3c1cff4014acea6334c35652a63a3603 Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Mon, 11 Nov 2024 17:21:46 +0000 Subject: [PATCH 1/2] cat-log: add timeout to processes * Partially addresses #643 * Add a timeout to `cylc cat-log` processes: * Ensures we don't accumulate them by accident. * Improves efficiency by closing long-lived log streams (the user probably isn't looking at the log file any more and the log file has probably stopped growing anyway). * At present, `cylc cat-log` processes may be left behind if the websocket is closed abruptly. This has been observed as the result of a proxy timeout. --- changes.d/645.fix.md | 2 + cylc/uiserver/app.py | 21 ++++++++ cylc/uiserver/resolvers.py | 56 ++++++++++++++++++-- cylc/uiserver/tests/test_resolvers.py | 73 ++++++++++++++++++++++++--- 4 files changed, 141 insertions(+), 11 deletions(-) create mode 100644 changes.d/645.fix.md diff --git a/changes.d/645.fix.md b/changes.d/645.fix.md new file mode 100644 index 00000000..0a9da840 --- /dev/null +++ b/changes.d/645.fix.md @@ -0,0 +1,2 @@ +Add a default timeout for the `cylc cat-log` command which is used to provide access to log files in the cylc-ui. +This timeout can be adjusted with the `log_timeout` option. diff --git a/cylc/uiserver/app.py b/cylc/uiserver/app.py index 3e41dc60..e9b075cc 100644 --- a/cylc/uiserver/app.py +++ b/cylc/uiserver/app.py @@ -339,6 +339,26 @@ class CylcUIServer(ExtensionApp): default_value=False, ) + log_timeout = Float( + # Note: This timeout is intended to clean up log streams that are no + # longer being actively monitored and prevent the associated "cat-log" + # processes from persisting in situations where they should not be + # (e.g. if the websocket connection unexpectedly closes) + config=True, + help=''' + The maximum length of time Cylc will stream a log file for in + seconds. + + The "Log" view in the Cylc GUI streams log files allowing you to + monitor the file while it grows. 
+ + After the configured timeout, the stream will close. The log + view in the GUI will display a "reconnect" button allowing you + to restart the stream if desired. + ''', + default_value=(60 * 60 * 4), # 4 hours + ) + @validate('ui_build_dir') def _check_ui_build_dir_exists(self, proposed): if proposed['value'].exists(): @@ -407,6 +427,7 @@ def __init__(self, *args, **kwargs): # sub_status dictionary storing status of subscriptions self.sub_statuses = {} self.resolvers = Resolvers( + self, self.data_store_mgr, log=self.log, executor=self.executor, diff --git a/cylc/uiserver/resolvers.py b/cylc/uiserver/resolvers.py index 721ff3a4..328f1ba0 100644 --- a/cylc/uiserver/resolvers.py +++ b/cylc/uiserver/resolvers.py @@ -27,6 +27,7 @@ PIPE, Popen, ) +from time import time from typing import ( TYPE_CHECKING, Any, @@ -57,6 +58,7 @@ from cylc.flow.option_parsers import Options from graphql import ResolveInfo + from cylc.uiserver.app import CylcUIServer from cylc.uiserver.workflows_mgr import WorkflowsManager @@ -205,6 +207,9 @@ def _clean(workflow_ids, opts): class Services: """Cylc services provided by the UI Server.""" + # log file stream lag + CAT_LOG_SLEEP = 1 + @staticmethod def _error(message: Union[Exception, str]): """Format error case response.""" @@ -351,7 +356,7 @@ async def enqueue(stream, queue): await queue.put(line.decode()) @classmethod - async def cat_log(cls, id_: Tokens, log, info, file=None): + async def cat_log(cls, id_: Tokens, app: 'CylcUIServer', info, file=None): """Calls `cat log`. Used for log subscriptions. 
@@ -366,7 +371,7 @@ async def cat_log(cls, id_: Tokens, log, info, file=None): ] if file: cmd += ['-f', file] - log.info(f'$ {" ".join(cmd)}') + app.log.info(f'$ {" ".join(cmd)}') # For info, below subprocess is safe (uses shell=false by default) proc = await asyncio.subprocess.create_subprocess_exec( @@ -380,26 +385,51 @@ async def cat_log(cls, id_: Tokens, log, info, file=None): # This is to get around problem where stream is not EOF until # subprocess ends enqueue_task = asyncio.create_task(cls.enqueue(proc.stdout, queue)) + + # GraphQL operation ID op_id = info.root_value + + # track the number of lines received so far line_count = 0 + + # the time we started the cylc cat-loo process + start_time = time() + + # configured cat-log process timeout + timeout = float(app.log_timeout) + try: while info.context['sub_statuses'].get(op_id) != 'stop': + if time() - start_time > timeout: + # timeout exceeded -> kill the cat-log process + break + if queue.empty(): + # there are *no* lines to read from the cat-log process if buffer: + # yield everything in the buffer yield {'lines': list(buffer)} buffer.clear() + if proc.returncode is not None: + # process exited + # -> pass any stderr text to the client (_, stderr) = await proc.communicate() - # pass any error onto ui msg = process_cat_log_stderr(stderr) or ( f"cylc cat-log exited {proc.returncode}" ) yield {'error': msg} + + # stop reading log lines break + # sleep set at 1, which matches the `tail` default interval - await asyncio.sleep(1) + await asyncio.sleep(cls.CAT_LOG_SLEEP) + else: + # there *are* lines to read from the cat-log process if line_count > MAX_LINES: + # we have read beyond the line count yield {'lines': buffer} yield { 'error': ( @@ -408,25 +438,39 @@ async def cat_log(cls, id_: Tokens, log, info, file=None): ) } break + elif line_count == 0: + # this is the first line + # (this is a special line contains the file path) line_count += 1 yield { 'connected': True, 'path': (await 
queue.get())[2:].strip(), } continue + + # read in the log lines and add them to the buffer line = await queue.get() line_count += 1 buffer.append(line) if len(buffer) >= 75: yield {'lines': list(buffer)} buffer.clear() + # there is more text to read so don't sleep (but + # still "sleep(0)" to yield control to other + # coroutines) await asyncio.sleep(0) + finally: + # kill the cat-log process kill_process_tree(proc.pid) + + # terminate the queue enqueue_task.cancel() with suppress(asyncio.CancelledError): await enqueue_task + + # tell the client we have disconnected yield {'connected': False} @classmethod @@ -467,6 +511,7 @@ class Resolvers(BaseResolvers): def __init__( self, + app: 'CylcUIServer', data: 'DataStoreMgr', log: 'Logger', workflows_mgr: 'WorkflowsManager', @@ -474,6 +519,7 @@ def __init__( **kwargs ): super().__init__(data) + self.app = app self.log = log self.workflows_mgr = workflows_mgr self.executor = executor @@ -561,7 +607,7 @@ async def subscription_service( ): async for ret in Services.cat_log( ids[0], - self.log, + self.app, info, file ): diff --git a/cylc/uiserver/tests/test_resolvers.py b/cylc/uiserver/tests/test_resolvers.py index 10a3aa9d..49ed7326 100644 --- a/cylc/uiserver/tests/test_resolvers.py +++ b/cylc/uiserver/tests/test_resolvers.py @@ -1,11 +1,27 @@ +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ import asyncio -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, List, Tuple from async_timeout import timeout import logging import os import pytest from unittest.mock import MagicMock, Mock from subprocess import Popen, TimeoutExpired +from types import SimpleNamespace from cylc.flow import CYLC_LOG from cylc.flow.id import Tokens @@ -228,7 +244,23 @@ def wait(timeout): ] -async def test_cat_log(workflow_run_dir): +@pytest.fixture +def app(): + return SimpleNamespace( + log=logging.getLogger(CYLC_LOG), + log_timeout=10, + ) + + +@pytest.fixture +def fast_sleep(monkeypatch): + monkeypatch.setattr( + 'cylc.uiserver.resolvers.Services.CAT_LOG_SLEEP', + 0.1, + ) + + +async def test_cat_log(workflow_run_dir, app, fast_sleep): """This is a functional test for cat_log subscription resolver. It creates a log file and then runs the cat_log service. Checking it @@ -265,18 +297,17 @@ async def test_cat_log(workflow_run_dir): # mock the context info.context = {'sub_statuses': {2: "start"}} workflow = Tokens(id_) - log = logging.getLogger(CYLC_LOG) - # note - timeout tests that the cat-log process is being stopped correctly + # note - timeout tests that the cat-log process is being stopped correctly first_response = None async with timeout(20): - ret = services.cat_log(workflow, log, info) + ret = services.cat_log(workflow, app, info) actual = '' is_first = True async for response in ret: if err := response.get('error'): # Surface any unexpected errors for better visibility - log.exception(err) + app.log.exception(err) if is_first: first_response = response is_first = False @@ -298,6 +329,36 @@ async def test_cat_log(workflow_run_dir): assert actual.rstrip() == log_file_content.rstrip() +async def test_cat_log_timeout(workflow_run_dir, app, fast_sleep): + """Test that the cat_log subscription resolver honours "log_timeout". + + It creates a log file, sets the timeout to zero and runs the cat_log + service, checking that the stream closes immediately. 
The only + response should be the final {'connected': False} message, with no error. + """ + (id_, log_dir) = workflow_run_dir + log_file = log_dir / '01-start-01.log' + log_file.write_text('forty two') + info = MagicMock() + info.root_value = 2 + # mock the context + info.context = {'sub_statuses': {2: "start"}} + workflow = Tokens(id_) + + app.log_timeout = 0 + + ret = services.cat_log(workflow, app, info) + responses = [] + async with timeout(5): + async for response in ret: + responses.append(response) + await asyncio.sleep(0) + + assert len(responses) == 1 + assert responses[0]['connected'] is False + assert 'error' not in responses[0] + + @pytest.mark.parametrize( 'text, expected', [ From b1371376ff1c17f8c4cce8b89d0e25dfb90de327 Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Tue, 12 Nov 2024 11:26:29 +0000 Subject: [PATCH 2/2] Update cylc/uiserver/resolvers.py Co-authored-by: Tim Pillinger <26465611+wxtim@users.noreply.github.com> --- cylc/uiserver/resolvers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cylc/uiserver/resolvers.py b/cylc/uiserver/resolvers.py index 328f1ba0..1db37614 100644 --- a/cylc/uiserver/resolvers.py +++ b/cylc/uiserver/resolvers.py @@ -392,7 +392,7 @@ async def cat_log(cls, id_: Tokens, app: 'CylcUIServer', info, file=None): # track the number of lines received so far line_count = 0 - # the time we started the cylc cat-loo process + # the time we started the cylc cat-log process start_time = time() # configured cat-log process timeout