diff --git a/server/CHANGELOG.md b/server/CHANGELOG.md index a67f054..8c8b3c8 100644 --- a/server/CHANGELOG.md +++ b/server/CHANGELOG.md @@ -51,6 +51,8 @@ and this project adheres to [Semantic Versioning][semver]. - significantly improve performance by implementing .copy() method and using an intermediate cache. - fix crash when completing option made only of space - fix crash when completing after ${} +- fix cancellation of methods - methods were usually not cancelled when a cancel notification was received. +- don't compute diagnostics for stale versions. ## [0.6.2] - 2022-01-06 diff --git a/server/buildoutls/protocol.py b/server/buildoutls/protocol.py new file mode 100644 index 0000000..a51835f --- /dev/null +++ b/server/buildoutls/protocol.py @@ -0,0 +1,181 @@ +import asyncio +import collections +import logging +import time +from typing import TYPE_CHECKING, Any, Deque, Set, Union + +from pygls.exceptions import JsonRpcRequestCancelled +from pygls.lsp.methods import CANCEL_REQUEST +from pygls.lsp.types import ( + JsonRpcMessage, + JsonRPCNotification, + JsonRPCRequestMessage, + JsonRPCResponseMessage, +) +from pygls.protocol import LanguageServerProtocol +from pygls.server import LanguageServer +from typing_extensions import TypeAlias + +logger = logging.getLogger(__name__) + +JsonRPCMessageId: TypeAlias = Union[int, str] + + +class CancelledJsonRPCRequestMessage(JsonRpcMessage): + """A request that was cancelled""" + id: JsonRPCMessageId + method: str + + +JsonRPCMessage: TypeAlias = Union[JsonRPCNotification, JsonRPCRequestMessage, + JsonRPCResponseMessage] + +# Type for items +CancellableQueueItem: TypeAlias = Union[JsonRPCMessage, + CancelledJsonRPCRequestMessage] + + +if TYPE_CHECKING: + BaseQueue: TypeAlias = asyncio.Queue[CancellableQueueItem] +else: + BaseQueue = asyncio.Queue + +class CancellableQueue(BaseQueue): + _queue: Deque[CancellableQueueItem] + + # TODO: _early_cancellations might not be needed + _early_cancellations: Set[JsonRPCMessageId] + + def cancel(self, msg_id: JsonRPCMessageId) -> None: + """Mark a message in the queue as cancelled. + """ + for i, item in enumerate(self._queue): + if isinstance(item, JsonRPCRequestMessage) and item.id == msg_id: + self._queue[i] = CancelledJsonRPCRequestMessage( + jsonrpc=item.jsonrpc, + id=msg_id, + method=item.method, + ) + break + else: + logger.debug( + 'Received cancellation for request %s not found in the queue (queue len: %s, early cancel: %s)', + msg_id, len(self._queue), len(self._early_cancellations)) + self._early_cancellations.add(msg_id) + + def _init(self, maxsize: int) -> None: + self._queue = collections.deque() + self._early_cancellations = set() + + def _get(self) -> CancellableQueueItem: + return self._queue.popleft() + + def _put(self, item: CancellableQueueItem) -> None: + if isinstance(item, (JsonRPCRequestMessage, JsonRPCResponseMessage)): + msg_id = item.id + if msg_id in self._early_cancellations: + self._early_cancellations.remove(msg_id) + logger.debug("👍 not putting already cancelled %s", msg_id) + item = CancelledJsonRPCRequestMessage( + jsonrpc=item.jsonrpc, + id=msg_id, + method=getattr(item, 'method', '-'), + ) + self._queue.append(item) + + +class CancellableQueueLanguageServerProtocol(LanguageServerProtocol): + """Extension to pygls default LanguageServerProtocol with better support for + request cancelled by the client. + + The general approach is that we use a queue of messages and a worker + coroutine to process the messages. + When a new request, notification or response message is received, instead + of processing it directly, we put it in the queue and keep reading next + messages. + + If the notification is a cancel notification, we go through the queue to + mark the message as cancelled before it get processed. + """ + _server: LanguageServer + _diagnostic_queue: Any + + def __init__(self, server: LanguageServer): + super().__init__(server) # type: ignore + self._job_queue = CancellableQueue() + self._worker_started = False + + def _procedure_handler( + self, + message: Union[JsonRPCNotification, JsonRPCRequestMessage, + JsonRPCResponseMessage], + ) -> None: + if not self._worker_started: + self._server.loop.create_task(self._worker()) + self._worker_started = True + + if isinstance(message, + JsonRPCNotification) and message.method == CANCEL_REQUEST: + self._job_queue.cancel(message.params.id) + self._job_queue.put_nowait(message) + + async def _worker(self) -> None: + def job_desc(job: CancellableQueueItem) -> str: + job_id = getattr(job, 'id', '-') + job_method = getattr(job, 'method', '-') + return f'{job_id:>3} {job_method}' + + while not self._server._stop_event.is_set(): + # process protocol messages first + while True: + await asyncio.sleep(0.1) + start = time.perf_counter_ns() + try: + # TODO: we don't need wait for here ? + job = await asyncio.wait_for(self._job_queue.get(), 0.3) + except asyncio.TimeoutError: + # logger.info("timeout getting a job in in %0.4f", (time.perf_counter_ns() - start) / 1e9) + break + logger.info( + "got %s in in %0.4f %s", + "cancelled 👍 job" + if isinstance(job, CancelledJsonRPCRequestMessage) else "job", + (time.perf_counter_ns() - start) / 1e9, + job_desc(job), + ) + + start = time.perf_counter_ns() + if isinstance(job, CancelledJsonRPCRequestMessage): + # according to https://microsoft.github.io/language-server-protocol/specifications/specification-current/#cancelRequest + # A request that got canceled still needs to return from the server + # and send a response back. It can not be left open / hanging. This + # is in line with the JSON RPC protocol that requires that every + # request sends a response back. In addition it allows for returning + # partial results on cancel. If the request returns an error response + # on cancellation it is advised to set the error code to + # ErrorCodes.RequestCancelled. + self._send_response( # type: ignore + job.id, + result=None, + error=JsonRpcRequestCancelled(), # type: ignore + ) + else: + super()._procedure_handler(job) # type: ignore + self._job_queue.task_done() + + logger.debug("done in %0.4f %s", + (time.perf_counter_ns() - start) / 1e9, job_desc(job)) + + # once all protocol messages are done, process the tasks from notifications + while True: + start = time.perf_counter_ns() + try: + uri = self._diagnostic_queue.get_nowait() + except asyncio.QueueEmpty: + break + + # TODO + d = await self._server.do_diagnostic(uri) # type: ignore + self._diagnostic_queue.task_done() + logger.debug("🔖(%d) done in %0.4f %s", d, + (time.perf_counter_ns() - start) / 1e9, uri) diff --git a/server/buildoutls/server.py b/server/buildoutls/server.py index b2f6902..0f78f02 100644 --- a/server/buildoutls/server.py +++ b/server/buildoutls/server.py @@ -1,10 +1,11 @@ +import asyncio import itertools import logging import os import pathlib import re import urllib.parse -from typing import Iterable, List, Optional, Tuple, Union +from typing import Iterable, List, Optional, Set, Tuple, Type, Union from pygls.lsp.methods import ( CODE_ACTION, @@ -47,6 +48,7 @@ TextEdit, ) from pygls.lsp.types.window import ShowDocumentParams +from pygls.protocol import LanguageServerProtocol from pygls.server import LanguageServer from pygls.workspace import Document @@ -57,11 +59,54 @@ diagnostic, md5sum, profiling, + protocol, recipes, types, ) -server = LanguageServer() + +class DiagnosticQueue(asyncio.Queue[str]): + _queue: Set[str] + + def _init(self, maxsize: int) -> None: + self._queue = set() + + def _get(self) -> str: + return self._queue.pop() + + def _put(self, item: str) -> None: + self._queue.add(item) + + +class BuildoutLanguageServer(LanguageServer): + lsp: protocol.CancellableQueueLanguageServerProtocol + + def __init__( + self, + loop: Optional[asyncio.AbstractEventLoop] = None, + protocol_cls: Type[LanguageServerProtocol] = protocol. + CancellableQueueLanguageServerProtocol, + max_workers: int = 2, + ): + super().__init__( + loop=loop, + protocol_cls=protocol_cls, + max_workers=max_workers, + ) + self.lsp._diagnostic_queue = DiagnosticQueue() + + async def schedule_diagnostic(self, uri: str) -> None: + await self.lsp._diagnostic_queue.put(uri) + + async def do_diagnostic(self, uri: str) -> None: + diagnostics = [] + async for diag in diagnostic.getDiagnostics(self, uri): + diagnostics.append(diag) + self.publish_diagnostics(uri, diagnostics) + return len(diagnostics) # type: ignore + + +server = BuildoutLanguageServer() server.command(commands.COMMAND_START_PROFILING)(profiling.start_profiling) server.command(commands.COMMAND_STOP_PROFILING)(profiling.stop_profiling) @@ -127,19 +172,20 @@ async def lsp_code_action( @server.feature(TEXT_DOCUMENT_DID_OPEN) async def did_open( - ls: LanguageServer, + ls: BuildoutLanguageServer, params: DidOpenTextDocumentParams, ) -> None: - await parseAndSendDiagnostics(ls, params.text_document.uri) + await ls.schedule_diagnostic(params.text_document.uri) + # await parseAndSendDiagnostics(ls, params.text_document.uri) @server.feature(TEXT_DOCUMENT_DID_CHANGE) async def did_change( - ls: LanguageServer, + ls: BuildoutLanguageServer, params: DidChangeTextDocumentParams, ) -> None: buildout.clearCache(params.text_document.uri) - await parseAndSendDiagnostics(ls, params.text_document.uri) + await ls.schedule_diagnostic(params.text_document.uri) @server.feature(WORKSPACE_DID_CHANGE_WATCHED_FILES) diff --git a/server/buildoutls/tests/test_protocol.py b/server/buildoutls/tests/test_protocol.py new file mode 100644 index 0000000..a9b4653 --- /dev/null +++ b/server/buildoutls/tests/test_protocol.py @@ -0,0 +1,153 @@ +import asyncio +import os +import pathlib +import threading + +import unittest.mock +import pygls.protocol +import pygls.server +import pytest + +from pygls.lsp.methods import (CANCEL_REQUEST, COMPLETION, INITIALIZE, + TEXT_DOCUMENT_DID_OPEN) +from pygls.lsp.types import ClientCapabilities, InitializeParams +from pygls.lsp.types.basic_structures import (Position, TextDocumentIdentifier, + TextDocumentItem) +from pygls.lsp.types.language_features.completion import CompletionParams +from pygls.lsp.types.workspace import DidOpenTextDocumentParams + +from buildoutls.server import server + +CALL_TIMEOUT = 3 + +root_path = pathlib.Path( + os.path.abspath( + os.path.join( + os.path.dirname(__file__), + '..', + '..', + '..', + 'profiles', + ))) + + +class ClientServer: + def __init__(self): + # Client to Server pipe + csr, csw = os.pipe() + # Server to client pipe + scr, scw = os.pipe() + + self._read_fds = ( + csr, + scr, + ) + + # Setup Server: use the initialized server with methods registered, + # but give it a new loop because this loop will be closed at server + # shutdown. + self.server = server + self.server.loop = asyncio.new_event_loop() + self.server_thread = threading.Thread( + target=self.server.start_io, + args=(os.fdopen(csr, 'rb'), os.fdopen(scw, 'wb')), + name="Server", + ) + self.server_thread.daemon = True + + # Setup client + self.client = pygls.server.LanguageServer(asyncio.new_event_loop()) + self.client_thread = threading.Thread( + target=self.client.start_io, + args=(os.fdopen(scr, 'rb'), os.fdopen(csw, 'wb')), + name="Client", + ) + self.client_thread.daemon = True + + def start(self): + self.server_thread.start() + self.server.thread_id = self.server_thread.ident + + self.client_thread.start() + + self.initialize() + + def stop(self): + # XXX setting a shutdown message does not seem to stop properly, + # for now we close the file input fds + for fd in self._read_fds: + os.close(fd) + self.server_thread.join() + self.client_thread.join() + + def initialize(self): + response = self.client.lsp.send_request( + INITIALIZE, + InitializeParams( + process_id=os.getpid(), + root_uri=root_path.as_uri(), + capabilities=ClientCapabilities())).result(timeout=CALL_TIMEOUT) + + assert 'capabilities' in response + + def __iter__(self): + yield self.client + yield self.server + + +@pytest.fixture +def client_server(): + """ A fixture to setup a client/server """ + client_server = ClientServer() + client_server.start() + client, server = client_server + yield client, server + client_server.stop() + + +def test_completion_cancellation(client_server): + client, _ = client_server + + fname = root_path / 'buildout.cfg' + uri = fname.as_uri() + language_id = 'zc-buildout' + version = 1 + with open(fname) as f: + text = f.read() + + client.lsp.notify( + method=TEXT_DOCUMENT_DID_OPEN, + params=DidOpenTextDocumentParams(text_document=TextDocumentItem( + uri=uri, + language_id=language_id, + version=version, + text=text, + ))) + with unittest.mock.patch('pygls.protocol.uuid.uuid4', + return_value='first-completion'): + req1 = client.lsp.send_request( + method=COMPLETION, + params=CompletionParams( + position=Position(line=14, character=42), + text_document=TextDocumentIdentifier(uri=uri), + )) + + client.lsp.notify( + method=CANCEL_REQUEST, + params=dict(id='first-completion'), + ) + + req2 = client.lsp.send_request( + method=COMPLETION, + params=CompletionParams( + position=Position(line=14, character=42), + text_document=TextDocumentIdentifier(uri=uri), + )) + + with pytest.raises( + pygls.exceptions.JsonRpcRequestCancelled, + match="Request Cancelled", + ): + assert req1.result(timeout=CALL_TIMEOUT) + + assert req2.result(timeout=CALL_TIMEOUT)