Skip to content

Commit

Permalink
protocol: introduce a custom protocol for cancellation WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
perrinjerome committed Mar 26, 2022
1 parent ef887cb commit 7d4958b
Show file tree
Hide file tree
Showing 5 changed files with 386 additions and 47 deletions.
4 changes: 4 additions & 0 deletions server/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ and this project adheres to [Semantic Versioning][semver].

### Fixed:

- fix cancellation of methods - methods were usually not cancelled when a
cancel notification was received.
- don't compute diagnostics for stale versions.

## [0.6.2] - 2022-01-06

### Fixed:
Expand Down
176 changes: 176 additions & 0 deletions server/buildoutls/protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
import asyncio
import collections
import logging
import time
from typing import Any, Deque, Set, Union

from pygls.exceptions import JsonRpcRequestCancelled
from pygls.lsp.methods import CANCEL_REQUEST
from pygls.lsp.types import (
JsonRpcMessage,
JsonRPCNotification,
JsonRPCRequestMessage,
JsonRPCResponseMessage,
)
from pygls.protocol import LanguageServerProtocol
from pygls.server import LanguageServer
from typing_extensions import TypeAlias

logger = logging.getLogger(__name__)

JsonRPCMessageId: TypeAlias = Union[int, str]


class CancelledJsonRPCRequestMessage(JsonRpcMessage):
"""A request that was cancelled"""
id: JsonRPCMessageId
method: str


JsonRPCMessage: TypeAlias = Union[JsonRPCNotification, JsonRPCRequestMessage,
JsonRPCResponseMessage]

# Type for items
CancellableQueueItem: TypeAlias = Union[JsonRPCMessage,
CancelledJsonRPCRequestMessage]


class CancellableQueue(asyncio.Queue[CancellableQueueItem]):
_queue: Deque[CancellableQueueItem]

# TODO: _early_cancellations might not be needed
_early_cancellations: Set[JsonRPCMessageId]

def cancel(self, msg_id: JsonRPCMessageId) -> None:
"""Mark a message in the queue as cancelled.
"""
for i, item in enumerate(self._queue):
if isinstance(item, JsonRPCRequestMessage) and item.id == msg_id:
self._queue[i] = CancelledJsonRPCRequestMessage(
jsonrpc=item.jsonrpc,
id=msg_id,
method=item.method,
)
break
else:
logger.debug(
'Received cancellation for request %s not found in the queue (queue len: %s, early cancel: %s)',
msg_id, len(self._queue), len(self._early_cancellations))
self._early_cancellations.add(msg_id)

def _init(self, maxsize: int) -> None:
self._queue = collections.deque()
self._early_cancellations = set()

def _get(self) -> CancellableQueueItem:
return self._queue.popleft()

def _put(self, item: CancellableQueueItem) -> None:
if isinstance(item, (JsonRPCRequestMessage, JsonRPCResponseMessage)):
msg_id = item.id
if msg_id in self._early_cancellations:
self._early_cancellations.remove(msg_id)
logger.debug("👍 not putting already cancelled %s", msg_id)
item = CancelledJsonRPCRequestMessage(
jsonrpc=item.jsonrpc,
id=msg_id,
method=getattr(item, 'method', '-'),
)
self._queue.append(item)


class CancellableQueueLanguageServerProtocol(LanguageServerProtocol):
"""Extension to pygls default LanguageServerProtocol with better support for
request cancelled by the client.
The general approach is that we use a queue of messages and a worker
coroutine to process the messages.
When a new request, notification or response message is received, instead
of processing it directly, we put it in the queue and keep reading next
messages.
If the notification is a cancel notification, we go through the queue to
mark the message as cancelled before it get processed.
"""
_server: LanguageServer
_diagnostic_queue: Any

def __init__(self, server: LanguageServer):
super().__init__(server) # type: ignore
self._job_queue = CancellableQueue()
self._worker_started = False

def _procedure_handler(
self,
message: Union[JsonRPCNotification, JsonRPCRequestMessage,
JsonRPCResponseMessage],
) -> None:
if not self._worker_started:
self._server.loop.create_task(self._worker())
self._worker_started = True

if isinstance(message,
JsonRPCNotification) and message.method == CANCEL_REQUEST:
self._job_queue.cancel(message.params.id)
self._job_queue.put_nowait(message)

async def _worker(self) -> None:
def job_desc(job: CancellableQueueItem) -> str:
job_id = getattr(job, 'id', '-')
job_method = getattr(job, 'method', '-')
return f'{job_id:>3} {job_method}'

while not self._server._stop_event.is_set():
# process protocol messages first
while True:
await asyncio.sleep(0.1)
start = time.perf_counter_ns()
try:
# TODO: we don't need wait for here ?
job = await asyncio.wait_for(self._job_queue.get(), 0.3)
except asyncio.TimeoutError:
# logger.info("timeout getting a job in in %0.4f", (time.perf_counter_ns() - start) / 1e9)
break
logger.info(
"got %s in in %0.4f %s",
"cancelled 👍 job"
if isinstance(job, CancelledJsonRPCRequestMessage) else "job",
(time.perf_counter_ns() - start) / 1e9,
job_desc(job),
)

start = time.perf_counter_ns()
if isinstance(job, CancelledJsonRPCRequestMessage):
# according to https://microsoft.github.io/language-server-protocol/specifications/specification-current/#cancelRequest
# A request that got canceled still needs to return from the server
# and send a response back. It can not be left open / hanging. This
# is in line with the JSON RPC protocol that requires that every
# request sends a response back. In addition it allows for returning
# partial results on cancel. If the request returns an error response
# on cancellation it is advised to set the error code to
# ErrorCodes.RequestCancelled.
self._send_response( # type: ignore
job.id,
result=None,
error=JsonRpcRequestCancelled(), # type: ignore
)
else:
super()._procedure_handler(job) # type: ignore
self._job_queue.task_done()

logger.debug("done in %0.4f %s",
(time.perf_counter_ns() - start) / 1e9, job_desc(job))

# once all protocol messages are done, process the tasks from notifications
while True:
start = time.perf_counter_ns()
try:
uri = self._diagnostic_queue.get_nowait()
except asyncio.QueueEmpty:
break

# TODO
d = await self._server.do_diagnostic(uri) # type: ignore
self._diagnostic_queue.task_done()
logger.debug("🔖(%d) done in %0.4f %s", d,
(time.perf_counter_ns() - start) / 1e9, uri)
68 changes: 53 additions & 15 deletions server/buildoutls/server.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import asyncio
import itertools
import logging
import os
import pathlib
import re
import urllib.parse
from typing import Iterable, List, Optional, Tuple, Union
from typing import Iterable, List, Optional, Set, Tuple, Type, Union

from pygls.lsp.methods import (
CODE_ACTION,
Expand Down Expand Up @@ -46,6 +47,7 @@
TextEdit,
)
from pygls.lsp.types.window import ShowDocumentParams
from pygls.protocol import LanguageServerProtocol
from pygls.server import LanguageServer
from pygls.workspace import Document

Expand All @@ -56,12 +58,54 @@
diagnostic,
md5sum,
profiling,
protocol,
recipes,
types,
utils,
)

server = LanguageServer()

class DiagnosticQueue(asyncio.Queue[str]):
_queue: Set[str]

def _init(self, maxsize: int) -> None:
self._queue = set()

def _get(self) -> str:
return self._queue.pop()

def _put(self, item: str) -> None:
self._queue.add(item)


class BuildoutLanguageServer(LanguageServer):
lsp: protocol.CancellableQueueLanguageServerProtocol

def __init__(
self,
loop: Optional[asyncio.AbstractEventLoop] = None,
protocol_cls: Type[LanguageServerProtocol] = protocol.
CancellableQueueLanguageServerProtocol,
max_workers: int = 2,
):
super().__init__(
loop=loop,
protocol_cls=protocol_cls,
max_workers=max_workers,
)
self.lsp._diagnostic_queue = DiagnosticQueue()

async def schedule_diagnostic(self, uri: str) -> None:
await self.lsp._diagnostic_queue.put(uri)

async def do_diagnostic(self, uri: str) -> None:
diagnostics = []
async for diag in diagnostic.getDiagnostics(self, uri):
diagnostics.append(diag)
self.publish_diagnostics(uri, diagnostics)
return len(diagnostics) # type: ignore


server = BuildoutLanguageServer()

server.command(commands.COMMAND_START_PROFILING)(profiling.start_profiling)
server.command(commands.COMMAND_STOP_PROFILING)(profiling.stop_profiling)
Expand All @@ -82,7 +126,6 @@ def getOptionValue(
return option.value


@utils.singleton_task
async def parseAndSendDiagnostics(
ls: LanguageServer,
uri: str,
Expand Down Expand Up @@ -113,14 +156,14 @@ async def command_update_md5sum(
await md5sum.update_md5sum(ls, args[0])



@server.feature(
CODE_ACTION,
CodeActionOptions(resolve_provider=False,
code_action_kinds=[
CodeActionKind.QuickFix,
]),
)
@utils.singleton_task
async def lsp_code_action(
ls: LanguageServer,
params: CodeActionParams) -> Optional[List[Union[Command, CodeAction]]]:
Expand All @@ -129,19 +172,20 @@ async def lsp_code_action(

@server.feature(TEXT_DOCUMENT_DID_OPEN)
async def did_open(
ls: LanguageServer,
ls: BuildoutLanguageServer,
params: DidOpenTextDocumentParams,
) -> None:
await parseAndSendDiagnostics(ls, params.text_document.uri)
await ls.schedule_diagnostic(params.text_document.uri)
# await parseAndSendDiagnostics(ls, params.text_document.uri)


@server.feature(TEXT_DOCUMENT_DID_CHANGE)
async def did_change(
ls: LanguageServer,
ls: BuildoutLanguageServer,
params: DidChangeTextDocumentParams,
) -> None:
buildout.clearCache(params.text_document.uri)
await parseAndSendDiagnostics(ls, params.text_document.uri)
await ls.schedule_diagnostic(params.text_document.uri)


@server.feature(WORKSPACE_DID_CHANGE_WATCHED_FILES)
Expand All @@ -154,7 +198,6 @@ async def did_change_watched_file(


@server.feature(DOCUMENT_SYMBOL)
@utils.singleton_task
async def lsp_symbols(
ls: LanguageServer,
params: DocumentSymbolParams,
Expand Down Expand Up @@ -216,7 +259,6 @@ async def lsp_symbols(


@server.feature(COMPLETION, CompletionOptions(trigger_characters=["{", ":"]))
@utils.singleton_task
async def lsp_completion(
ls: LanguageServer,
params: CompletionParams,
Expand Down Expand Up @@ -544,7 +586,6 @@ def getDefaultTextEdit(


@server.feature(DEFINITION)
@utils.singleton_task
async def lsp_definition(
ls: LanguageServer,
params: TextDocumentPositionParams,
Expand Down Expand Up @@ -587,7 +628,6 @@ async def lsp_definition(


@server.feature(REFERENCES)
@utils.singleton_task
async def lsp_references(
server: LanguageServer,
params: TextDocumentPositionParams,
Expand Down Expand Up @@ -649,7 +689,6 @@ async def lsp_references(


@server.feature(HOVER)
@utils.singleton_task
async def lsp_hover(
ls: LanguageServer,
params: TextDocumentPositionParams,
Expand All @@ -673,7 +712,6 @@ async def lsp_hover(


@server.feature(DOCUMENT_LINK)
@utils.singleton_task
async def lsp_document_link(
ls: LanguageServer,
params: DocumentLinkParams,
Expand Down
Loading

0 comments on commit 7d4958b

Please sign in to comment.