Skip to content

Commit

Permalink
Progress. Improves stability. Switch to chronos recommend way to thre…
Browse files Browse the repository at this point in the history
…ad in lsTransport
  • Loading branch information
jmgomez committed Aug 25, 2024
1 parent 336604d commit 8533722
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 68 deletions.
49 changes: 32 additions & 17 deletions ls.nim
Original file line number Diff line number Diff line change
Expand Up @@ -211,12 +211,19 @@ proc parseWorkspaceConfiguration*(conf: JsonNode): NlsConfig =
debug "Failed to parse the configuration.", error = getCurrentExceptionMsg()
result = NlsConfig()


proc getWorkspaceConfiguration*(ls: LanguageServer): Future[NlsConfig] {.async: (raises:[]).} =
try:
let conf = await ls.workspaceConfiguration
return parseWorkspaceConfiguration(conf)
except CatchableError:
discard
#this is the root of a lot a problems as there are multiple race conditions here.
#since most request doenst really rely on the configuration, we can just go ahead and
#return a default one until we have the right one.
#TODO review and handle project specific confs when received instead of reliying in this func
if ls.workspaceConfiguration.finished:
return parseWorkspaceConfiguration(ls.workspaceConfiguration.read)
else:
return NlsConfig()
except CatchableError as ex:
writeStackTrace(ex)

proc showMessage*(ls: LanguageServer, message: string, typ: MessageType) {.raises: [].} =
try:
Expand Down Expand Up @@ -605,13 +612,21 @@ proc createOrRestartNimsuggest*(ls: LanguageServer, projectFile: string, uri = "
discard

proc getNimsuggest*(ls: LanguageServer, uri: string): Future[Nimsuggest] {.async.} =
assert uri in ls.openFiles, "File not open"

let projectFile = await ls.openFiles[uri].projectFile
if not ls.projectFiles.hasKey(projectFile):
ls.createOrRestartNimsuggest(projectFile, uri)

ls.lastNimsuggest = ls.projectFiles[projectFile]
return await ls.projectFiles[projectFile]

proc tryGetNimsuggest*(ls: LanguageServer, uri: string): Future[Option[Nimsuggest]] {.async.} =
if uri notin ls.openFiles:
none(NimSuggest)
else:
some await getNimsuggest(ls, uri)

proc restartAllNimsuggestInstances(ls: LanguageServer) =
debug "Restarting all nimsuggest instances"
for projectFile in ls.projectFiles.keys:
Expand Down Expand Up @@ -698,19 +713,19 @@ proc getProjectFile*(fileUri: string, ls: LanguageServer): Future[string] {.asyn
else:
trace "getProjectFile does not match", uri = fileUri, matchedRegex = mapping.fileRegex

once: #once we refactor the project to chronos, we may move this code into init. Right now it hangs for some odd reason
let rootPath = ls.initializeParams.getRootPath
if rootPath != "":
let nimbleFiles = walkFiles(rootPath / "*.nimble").toSeq
if nimbleFiles.len > 0:
let nimbleFile = nimbleFiles[0]
let nimbleDumpInfo = ls.getNimbleDumpInfo(nimbleFile)
ls.entryPoints = nimbleDumpInfo.getNimbleEntryPoints(ls.initializeParams.getRootPath)
# ls.showMessage(fmt "Found entry point {ls.entryPoints}?", MessageType.Info)
for entryPoint in ls.entryPoints:
debug "Starting nimsuggest for entry point ", entry = entryPoint
if entryPoint notin ls.projectFiles:
ls.createOrRestartNimsuggest(entryPoint)
# once: #once we refactor the project to chronos, we may move this code into init. Right now it hangs for some odd reason
# let rootPath = ls.initializeParams.getRootPath
# if rootPath != "":
# let nimbleFiles = walkFiles(rootPath / "*.nimble").toSeq
# if nimbleFiles.len > 0:
# let nimbleFile = nimbleFiles[0]
# let nimbleDumpInfo = ls.getNimbleDumpInfo(nimbleFile)
# ls.entryPoints = nimbleDumpInfo.getNimbleEntryPoints(ls.initializeParams.getRootPath)
# # ls.showMessage(fmt "Found entry point {ls.entryPoints}?", MessageType.Info)
# for entryPoint in ls.entryPoints:
# debug "Starting nimsuggest for entry point ", entry = entryPoint
# if entryPoint notin ls.projectFiles:
# ls.createOrRestartNimsuggest(entryPoint)

result = ls.getProjectFileAutoGuess(fileUri)
if result in ls.projectFiles:
Expand Down
97 changes: 62 additions & 35 deletions lstransports.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import
oids,
]
import ls, routes, suggestapi, protocol/enums, utils
import protocol/types
import protocol/types, chronos/threadsync

type LspClientResponse* = object
jsonrpc*: JsonRPC2
Expand Down Expand Up @@ -76,10 +76,12 @@ proc addRpcToCancellable*(ls: LanguageServer, rpc: Rpc): Rpc =
let idRequest = get[int](params, "idRequest")
ls.cancelableRequests[idRequest] = fut
return fut
except KeyError:
except KeyError as ex:
error "IdRequest not found in the request params"
except Exception:
writeStackTrace(ex)
except Exception as ex:
error "Error adding request to cancellable requests"
writeStackTrace(ex)


proc processContentLength*(inputStream: FileStream): string =
Expand All @@ -94,7 +96,7 @@ proc processContentLength*(inputStream: FileStream): string =
else:
error "No content length \n"

proc processContentLength*(transport: StreamTransport): Future[string] {.async:(raises:[]).} =
proc processContentLength*(transport: StreamTransport): Future[string] {.async:(raises:[]).} =
try:
result = await transport.readLine()
if result.startsWith(CONTENT_LENGTH):
Expand All @@ -108,23 +110,30 @@ proc processContentLength*(transport: StreamTransport): Future[string] {.async:(
except TransportError as ex:
error "Error reading content length", msg = ex.msg
except CatchableError as ex:
error "Error reading content length", msg = ex.msg
error "Error reading content length", msg = ex.msg


proc readStdin*(transport: StreamTransport) {.thread.} =
var inputStream = newFileStream(stdin)
var value = processContentLength(inputStream)
discard waitFor transport.write(value & CRLF)

readStdin(transport)
proc readStdin*(transport: StreamTransport) {.thread.} =
while true:
let inputStream = newFileStream(stdin)
var value = processContentLength(inputStream)
try:
discard waitFor transport.write(value & CRLF)
except TransportError as ex:
error "Error reading stdin", msg = ex.msg

proc writeStackTrace*(ex = getCurrentException()) =
try:
stderr.write ex.getStackTrace()
except IOError:
discard
type
ReadStdinContext = object
signal: ThreadSignalPtr
value: string

proc writeOutput*(ls: LanguageServer, content: JsonNode) =
proc readStdin2*(ctx: ptr ReadStdinContext) {.thread.} =
let inputStream = newFileStream(stdin)
while true:
ctx.value = processContentLength(inputStream) & CRLF
discard ctx.signal.fireSync()

proc writeOutput*(ls: LanguageServer, content: JsonNode) =
let responseStr = $content
let contentLenght = responseStr.len + 1
let res = &"{CONTENT_LENGTH}{contentLenght}{CRLF}{CRLF}{responseStr}\n"
Expand All @@ -138,7 +147,7 @@ proc writeOutput*(ls: LanguageServer, content: JsonNode) =
except CatchableError as ex:
error "Error writing output", msg = ex.msg

proc runRpc(ls: LanguageServer, req: RequestRx, rpc: RpcProc): Future[void] {.async.} =
proc runRpc(ls: LanguageServer, req: RequestRx, rpc: RpcProc): Future[void] {.async.} =
try:
let res = await rpc(req.params)
if res.string in ["", "{}"]:
Expand All @@ -149,11 +158,13 @@ proc runRpc(ls: LanguageServer, req: RequestRx, rpc: RpcProc): Future[void] {.as
json["id"] = %*req.id.num
json["result"] = parseJson(res.string)
ls.writeOutput(json)
except CancelledError as ex:
debug "[RunRPC]Request cancelled", meth = req.meth
except CatchableError as ex:
error "[RunRPC] ", msg = ex.msg
writeStackTrace(ex = ex)

proc processMessage(ls: LanguageServer, message: string) {.raises:[].} =
proc processMessage(ls: LanguageServer, message: string) {.raises:[].} =
try:
let contentJson = parseJson(message) #OPT oportunity reuse the same JSON already parsed
let isReq = "method" in contentJson
Expand All @@ -172,18 +183,24 @@ proc processMessage(ls: LanguageServer, message: string) {.raises:[].} =
else: #Response
let response = JrpcSys.decode(message, LspClientResponse)
let id = response.id
if id notin ls.responseMap:
error "Id not found in responseMap", id = id #TODO we should store the call name we are trying to responde to here

if response.result == nil:
ls.responseMap[id].complete(newJObject())
ls.responseMap.del id
else:
let r = response.result
ls.responseMap[id].complete(r)
ls.responseMap.del id
except JsonParsingError as ex:
error "[Processsing Message] Error parsing message", message = message
writeStackTrace(ex)
except CatchableError as ex:
error "[Processsing Message] "
writeStackTrace(ex)

except CatchableError:
error "[Processsing Message] ", msg = getCurrentExceptionMsg(), trace = getStackTrace()

proc initActions*(ls: LanguageServer,) =
proc initActions*(ls: LanguageServer,) =
let onExit: OnExitCallback = proc() {.async.} =
case ls.transportMode:
of stdio:
Expand All @@ -192,8 +209,8 @@ proc initActions*(ls: LanguageServer,) =
ls.outStream.close()
of socket:
ls.srv.stop() #TODO check if stop also close the transport, which it should
template genJsonAction() {.dirty.} =

template genJsonAction() {.dirty.} =
var json = newJObject()
json["jsonrpc"] = %*"2.0"
json["method"] = %*name
Expand All @@ -210,16 +227,21 @@ proc initActions*(ls: LanguageServer,) =
ls.writeOutput(json)
result = newFuture[JsonNode]()
#We store the future in the responseMap so we can complete it in processMessage
ls.responseMap[id] = result
ls.responseMap[id] = result

ls.call = callAction
ls.notify = notifyAction
ls.onExit = onExit


var context {.global.} = createShared(ReadStdinContext)

#start and loop functions belows are the only difference between transports
proc startStdioLoop*(ls: LanguageServer): Future[void] {.async.} =
while true:
let msg = await ls.rTranspStdin.readLine(sep = CRLF)
# let msg = await ls.rTranspStdin.readLine(sep = CRLF)
await context.signal.wait()
let msg = context.value
debug "[Stdio Transport] Processing Message", msg = msg
ls.processMessage(msg)

Expand All @@ -233,10 +255,15 @@ proc startStdioServer*(ls: LanguageServer) =
ls.wTranspStdin = fromPipe(wfd)
ls.srv = newRpcSocketServer()
ls.initActions()
var stdioThread {.global.}: Thread[StreamTransport]
createThread(stdioThread, readStdin, ls.wTranspStdin)
# var stdioThread {.global.}: Thread[StreamTransport]
var stdinThread {.global.}: Thread[ptr ReadStdinContext]
var signal = ThreadSignalPtr.new().expect("asas")
context.signal = signal
# createThread(stdioThread, readStdin, ls.wTranspStdin)
createThread(stdinThread, readStdin2, context)
asyncSpawn ls.startStdioLoop()


proc processClientLoop*(ls: LanguageServer, server: StreamServer, transport: StreamTransport) {.async: (raises: []), gcsafe.} =
ls.socketTransport = transport
while true:
Expand All @@ -247,10 +274,10 @@ proc processClientLoop*(ls: LanguageServer, server: StreamServer, transport: Str
break
debug "[Socket Transport] Processing message ", address = transport.remoteAddress()
ls.processMessage(msg)
proc startSocketServer*(ls: LanguageServer, port: Port) =
ls.srv = newRpcSocketServer(partial(processClientLoop, ls))

proc startSocketServer*(ls: LanguageServer, port: Port) =
ls.srv = newRpcSocketServer(partial(processClientLoop, ls))
ls.initActions()
ls.srv.addStreamServer("localhost", port)
ls.srv.start()
ls.srv.addStreamServer("localhost", port)
ls.srv.start

4 changes: 2 additions & 2 deletions nimlangserver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ proc main() =
let cmdLineParams = handleParams()
let storageDir = ensureStorageDir()
debug "Starting nimlangserver", params = commandLineParams()
#TODO handle transport in handleParams
#TODO properly handle transport in handleParams
let transportMode = parseEnum[TransportMode](
commandLineParams().filterIt("stdio" in it).head.map(it => it.replace("--", "")).get("socket")
commandLineParams().filterIt("stdio" in it or "socket" in it).head.map(it => it.replace("--", "")).get("stdio")
)
debug "Transport mode is ", transportMode = transportMode
#[
Expand Down
Loading

0 comments on commit 8533722

Please sign in to comment.