diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0628270e7..f299e84d9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@
   - implement jump target selector and jump to references ([#739])
   - implement settings UI using native JupyterLab 3.3 UI ([#778])
   - add option to show hover tooltip automatically ([#864], thanks @yamaton)
+  - add support for language servers that (only) communicate over TCP rather than stdio; connecting to a server already running on another machine and/or port is not yet supported ([#636])
 - bug fixes:
   - use correct websocket URL if configured as different from base URL ([#820], thanks @MikeSem)
   - clean up all completer styles when completer feature is disabled ([#829]).
@@ -61,6 +62,7 @@
 [#860]: https://github.com/jupyter-lsp/jupyterlab-lsp/pull/860
 [#864]: https://github.com/jupyter-lsp/jupyterlab-lsp/pull/864
 [#882]: https://github.com/jupyter-lsp/jupyterlab-lsp/pull/882
+[#636]: https://github.com/jupyter-lsp/jupyterlab-lsp/pull/636
 
 ### `@krassowski/jupyterlab-lsp 3.10.1` (2022-03-21)
 
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index e46e0d86a..c7598172b 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -388,8 +388,11 @@ otherwise an empty dictionary (`{}`) should be returned.
 ##### Common Concerns
 
 - some language servers need to have their connection mode specified
-  - the `stdio` interface is the only one supported by `jupyter_lsp`
-  - PRs welcome to support other modes!
+  - `jupyter_lsp` currently supports the `stdio` and `tcp` interfaces
+  - the mode `jupyter_lsp` uses to connect to the language server is selected by setting `"mode": "stdio"` or `"mode": "tcp"` in the language server `spec` dictionary (see the example below)
+  - connecting to an externally running language server over TCP is not yet possible; `jupyter_lsp` can only connect to servers it spawns itself from the `argv` spec entry
+  - PRs welcome to support externally running language servers!
+  - use the `{port}` placeholder in the `argv` entry so that `jupyter_lsp` can pick the port on which the language server is launched
 - because of its VSCode heritage, many language servers use `nodejs`
   - `LanguageServerManager.nodejs` will provide the location of our best
     guess at where a user's `nodejs` might be found
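+
+For example, a TCP-mode entry in the spec dictionary might look like the sketch
+below (modeled on the `pylsp-tcp` spec added in this PR; the exact command,
+MIME types and display name are illustrative and assume `pylsp` is on the `PATH`):
+
+```python
+from jupyter_lsp.schema import SPEC_VERSION
+
+spec = {
+    "argv": ["pylsp", "--tcp", "--port", "{port}"],  # "{port}" is substituted by jupyter_lsp
+    "languages": ["python"],
+    "version": SPEC_VERSION,
+    "mime_types": ["text/x-python"],  # illustrative
+    "display_name": "python-lsp-server (pylsp) over tcp",
+    "mode": "tcp",  # defaults to "stdio" when omitted
+}
+```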
+""" +from abc import ABC, ABCMeta, abstractmethod +from typing import Text + +# pylint: disable=broad-except +import anyio +from anyio.streams.buffered import BufferedByteReceiveStream +from anyio.streams.text import TextSendStream +from anyio.streams.stapled import StapledObjectStream +from tornado.httputil import HTTPHeaders +from traitlets import Instance, Int +from traitlets.config import LoggingConfigurable +from traitlets.traitlets import MetaHasTraits + + +class LspStreamMeta(MetaHasTraits, ABCMeta): + pass + + +class LspStreamBase(LoggingConfigurable, ABC, metaclass=LspStreamMeta): + """Non-blocking, queued base for communicating with Language Servers through anyio + streams + """ + + queue = Instance(StapledObjectStream, help="queue to get/put") + + def __repr__(self): # pragma: no cover + return "<{}(parent={})>".format(self.__class__.__name__, self.parent) + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.log.debug("%s initialized", self) + + @abstractmethod + async def close(self): + pass # pragma: no cover + + +class LspStreamReader(LspStreamBase): + """Language Server Reader""" + + receive_max_bytes = Int( + 65536, + help="the maximum size a header line send by the language server may have", + ).tag(config=True) + + stream = Instance( # type:ignore[assignment] + BufferedByteReceiveStream, help="the stream to read from" + ) # type: BufferedByteReceiveStream + + def __init__(self, stream: anyio.abc.ByteReceiveStream, **kwargs): + super().__init__(**kwargs) + self.stream = BufferedByteReceiveStream(stream) + + async def close(self): + await self.stream.aclose() + self.log.debug("%s closed", self) + + async def read(self) -> None: + """Read from a Language Server until it is closed""" + while True: + message = None + try: + message = await self.read_one() + await self.queue.send(message) + except anyio.ClosedResourceError: + # stream was closed -> terminate + self.log.debug("Stream closed while a read was still in progress") + break + except Exception as e: # pragma: no cover + self.log.exception( + "%s couldn't enqueue message: %s (%s)", self, message, e + ) + + async def _read_content(self, length: int) -> bytes: + """Read the full length of the message. + + Args: + - length: the content length + """ + try: + return await self.stream.receive_exactly(length) + except anyio.IncompleteRead: # pragma: no cover + # resource has been closed before the requested bytes could be retrieved + # -> signal recource closed + raise anyio.ClosedResourceError + + async def read_one(self) -> Text: + """Read a single message""" + message = "" + headers = HTTPHeaders() + + line = await self._readline() + + if line: + while line and line.strip(): + headers.parse_line(line) + line = await self._readline() + + content_length = int(headers.get("content-length", "0")) + + if content_length: + raw = await self._read_content(length=content_length) + message = raw.decode("utf-8").strip() + + return message + + async def _readline(self) -> Text: + """Read a line""" + try: + # use same max_bytes as is default for receive for now. 
+            # way of getting the bytes read until max_bytes is reached, so we cannot
+            # iterate the receive_until call with smaller max_bytes values
+            line = await self.stream.receive_until(b"\r\n", self.receive_max_bytes)
+            return line.decode("utf-8").strip()
+        except anyio.IncompleteRead:
+            # resource has been closed before the requested bytes could be retrieved
+            # -> signal resource closed
+            raise anyio.ClosedResourceError
+        except anyio.DelimiterNotFound:  # pragma: no cover
+            self.log.error(
+                "Readline hit max_bytes before newline character was encountered"
+            )
+            return ""
+
+
+class LspStreamWriter(LspStreamBase):
+    """Language Server Writer"""
+
+    stream = Instance(  # type:ignore[assignment]
+        TextSendStream, help="the stream to write to"
+    )  # type: TextSendStream
+
+    def __init__(self, stream: anyio.abc.ByteSendStream, **kwargs):
+        super().__init__(**kwargs)
+        self.stream = TextSendStream(stream, encoding="utf-8")
+
+    async def close(self):
+        await self.stream.aclose()
+        self.log.debug("%s closed", self)
+
+    async def write(self) -> None:
+        """Write to a Language Server until it closes"""
+        while True:
+            message = await self.queue.receive()
+            try:
+                n_bytes = len(message.encode("utf-8"))
+                response = "Content-Length: {}\r\n\r\n{}".format(n_bytes, message)
+                await self._write_one(response)
+            except (
+                anyio.ClosedResourceError,
+                anyio.BrokenResourceError,
+            ):  # pragma: no cover
+                # stream was closed -> terminate
+                self.log.debug("Stream closed while a write was still in progress")
+                break
+            except Exception:  # pragma: no cover
+                self.log.exception("%s couldn't write message: %s", self, response)
+
+    async def _write_one(self, message) -> None:
+        await self.stream.send(message)
diff --git a/python_packages/jupyter_lsp/jupyter_lsp/manager.py b/python_packages/jupyter_lsp/jupyter_lsp/manager.py
index d6874e43c..10c8ee878 100644
--- a/python_packages/jupyter_lsp/jupyter_lsp/manager.py
+++ b/python_packages/jupyter_lsp/jupyter_lsp/manager.py
@@ -1,4 +1,4 @@
-""" A configurable frontend for stdio-based Language Servers
+""" A configurable frontend for stream-based Language Servers
 """
 import asyncio
 import os
@@ -35,7 +35,11 @@
     EP_SPEC_V1,
 )
 from .schema import LANGUAGE_SERVER_SPEC_MAP
-from .session import LanguageServerSession
+from .session import (
+    LanguageServerSessionBase,
+    LanguageServerSessionStdio,
+    LanguageServerSessionTCP,
+)
 from .trait_types import LoadableCallable, Schema
 from .types import (
     KeyedLanguageServerSpecs,
@@ -68,10 +72,10 @@ class LanguageServerManager(LanguageServerManagerAPI):
     )  # type: bool
 
     sessions = Dict_(  # type:ignore[assignment]
-        trait=Instance(LanguageServerSession),
+        trait=Instance(LanguageServerSessionBase),
         default_value={},
         help="sessions keyed by language server name",
-    )  # type: Dict[Tuple[Text], LanguageServerSession]
+    )  # type: Dict[Tuple[Text], LanguageServerSessionBase]
 
     virtual_documents_dir = Unicode(
         help="""Path to virtual documents relative to the content manager root
@@ -160,9 +164,21 @@ def init_sessions(self):
         """create, but do not initialize all sessions"""
         sessions = {}
         for language_server, spec in self.language_servers.items():
-            sessions[language_server] = LanguageServerSession(
-                language_server=language_server, spec=spec, parent=self
-            )
+            mode = spec.get("mode", "stdio")
+            if mode == "stdio":
+                sessions[language_server] = LanguageServerSessionStdio(
+                    language_server=language_server, spec=spec, parent=self
+                )
+            elif mode == "tcp":
+                sessions[language_server] = LanguageServerSessionTCP(
language_server=language_server, spec=spec, parent=self + ) + else: # pragma: no cover + raise ValueError( + "Unknown session mode {} for language server '{}'".format( + mode, language_server + ) + ) self.sessions = sessions def init_listeners(self): diff --git a/python_packages/jupyter_lsp/jupyter_lsp/non_blocking.py b/python_packages/jupyter_lsp/jupyter_lsp/non_blocking.py deleted file mode 100644 index ef6f9de71..000000000 --- a/python_packages/jupyter_lsp/jupyter_lsp/non_blocking.py +++ /dev/null @@ -1,45 +0,0 @@ -""" -Derived from - -> https://github.com/rudolfwalter/pygdbmi/blob/0.7.4.2/pygdbmi/gdbcontroller.py -> MIT License https://github.com/rudolfwalter/pygdbmi/blob/master/LICENSE -> Copyright (c) 2016 Chad Smith gmail.com> -""" -import os - -if os.name == "nt": # pragma: no cover - import msvcrt - from ctypes import POINTER, WinError, byref, windll, wintypes # type: ignore - from ctypes.wintypes import BOOL, DWORD, HANDLE # type: ignore -else: # pragma: no cover - import fcntl - - -def make_non_blocking(file_obj): # pragma: no cover - """ - make file object non-blocking - - Windows doesn't have the fcntl module, but someone on - stack overflow supplied this code as an answer, and it works - http://stackoverflow.com/a/34504971/2893090 - """ - - if os.name == "nt": - LPDWORD = POINTER(DWORD) - PIPE_NOWAIT = wintypes.DWORD(0x00000001) - - SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState - SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD] - SetNamedPipeHandleState.restype = BOOL - - h = msvcrt.get_osfhandle(file_obj.fileno()) - - res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None) - if res == 0: - raise ValueError(WinError()) - - else: - # Set the file status flag (F_SETFL) on the pipes to be non-blocking - # so we can attempt to read from a pipe with no new data without locking - # the program up - fcntl.fcntl(file_obj, fcntl.F_SETFL, os.O_NONBLOCK) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/schema/schema.json b/python_packages/jupyter_lsp/jupyter_lsp/schema/schema.json index 61f31d1cb..7d3e1fa9d 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/schema/schema.json +++ b/python_packages/jupyter_lsp/jupyter_lsp/schema/schema.json @@ -139,6 +139,13 @@ "description": "list of MIME types supported by the language server", "title": "MIME Types" }, + "mode": { + "description": "connection mode used, e.g. 
stdio (default), tcp", + "title": "Mode", + "type": "string", + "enum": ["stdio", "tcp"], + "default": "stdio" + }, "troubleshoot": { "type": "string", "description": "information on troubleshooting the installation or auto-detection of the language server", diff --git a/python_packages/jupyter_lsp/jupyter_lsp/session.py b/python_packages/jupyter_lsp/jupyter_lsp/session.py index 082142164..902ca558c 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/session.py @@ -1,43 +1,76 @@ """ A session for managing a language server process """ -import asyncio import atexit +import math import os import string import subprocess +import sys +from abc import ABC, ABCMeta, abstractmethod from copy import copy from datetime import datetime, timezone +from threading import Event, Thread +from typing import List +import anyio +from anyio import CancelScope +from anyio.abc import Process, SocketStream +from anyio.streams.stapled import StapledObjectStream from tornado.ioloop import IOLoop -from tornado.queues import Queue from tornado.websocket import WebSocketHandler -from traitlets import Bunch, Instance, Set, Unicode, UseEnum, observe +from traitlets import Bunch, Float, Instance, Set, Unicode, UseEnum, observe from traitlets.config import LoggingConfigurable +from traitlets.traitlets import MetaHasTraits -from . import stdio +from .connection import LspStreamReader, LspStreamWriter from .schema import LANGUAGE_SERVER_SPEC from .specs.utils import censored_spec from .trait_types import Schema from .types import SessionStatus +from .utils import get_unused_port -class LanguageServerSession(LoggingConfigurable): +class LanguageServerSessionMeta(MetaHasTraits, ABCMeta): + pass + + +class LanguageServerSessionBase( + LoggingConfigurable, ABC, metaclass=LanguageServerSessionMeta +): """Manage a session for a connection to a language server""" language_server = Unicode(help="the language server implementation name") spec = Schema(LANGUAGE_SERVER_SPEC) # run-time specifics - process = Instance( - subprocess.Popen, help="the language server subprocess", allow_none=True + process = Instance(Process, help="the language server subprocess", allow_none=True) + cancelscope = Instance( + CancelScope, help="scope used for stopping the session", allow_none=True + ) + started = Instance( + Event, + args=(), + help="event signaling that the session has finished starting", + allow_none=False, + ) + thread = Instance( + Thread, help="worker thread for running an event loop", allow_none=True ) - writer = Instance(stdio.LspStdIoWriter, help="the JSON-RPC writer", allow_none=True) - reader = Instance(stdio.LspStdIoReader, help="the JSON-RPC reader", allow_none=True) + main_loop = Instance( + IOLoop, help="the event loop of the main thread", allow_none=True) + thread_loop = Instance( + IOLoop, help="the event loop of the worker thread", allow_none=True) + writer = Instance(LspStreamWriter, help="the JSON-RPC writer", allow_none=True) + reader = Instance(LspStreamReader, help="the JSON-RPC reader", allow_none=True) from_lsp = Instance( - Queue, help="a queue for string messages from the server", allow_none=True + StapledObjectStream, + help="a queue for string messages from the server", + allow_none=True ) to_lsp = Instance( - Queue, help="a queue for string message to the server", allow_none=True + StapledObjectStream, + help="a queue for string messages to the server", + allow_none=True ) handlers = Set( trait=Instance(WebSocketHandler), @@ -48,7 +81,15 @@ class 
LanguageServerSession(LoggingConfigurable): last_handler_message_at = Instance(datetime, allow_none=True) last_server_message_at = Instance(datetime, allow_none=True) - _tasks = None + stop_timeout_s = Float( + 5, + help="timeout in seconds after which a process will be terminated forcefully", + ).tag(config=True) + queue_size = Float( + -1, + help="the maximum number of messages that can be buffered in the queue or -1 " + "for an unbounded queue" + ).tag(config=True) _skip_serialize = ["argv", "debug_argv"] @@ -75,40 +116,76 @@ def to_json(self): spec=censored_spec(self.spec), ) - def initialize(self): - """(re)initialize a language server session""" - self.stop() + def start(self): + """run a language server session asynchronously inside a worker thread + + will return as soon as the session is ready for communication + """ + self.main_loop = IOLoop.current() + self.started.clear() + self.thread = Thread(target=anyio.run, kwargs={"func": self.run}) + self.thread.start() + self.started.wait() + + def stop(self): + """shut down the session""" + if self.cancelscope is not None: + self.thread_loop.add_callback(self.cancelscope.cancel) + + # wait for the session to get cleaned up + if self.thread and self.thread.is_alive(): + self.thread.join() + self.main_loop = None + + async def run(self): + """run this session in a task group and clean everything up on cancellation""" + self.thread_loop = IOLoop.current() + + try: + async with anyio.create_task_group() as tg: + self.cancelscope = tg.cancel_scope + await self.initialize() + self.started.set() + tg.start_soon(self._read_lsp) + tg.start_soon(self._write_lsp) + tg.start_soon(self._broadcast_from_lsp) + except Exception as e: # pragma: no cover + self.log.exception("Execption while listening {}", e) + finally: + await self.cleanup() + self.cancelscope = None + self.thread_loop = None + + async def initialize(self): + """initialize a language server session""" self.status = SessionStatus.STARTING + self.init_queues() - self.init_process() + await self.init_process() self.init_writer() self.init_reader() - loop = asyncio.get_event_loop() - self._tasks = [ - loop.create_task(coro()) - for coro in [self._read_lsp, self._write_lsp, self._broadcast_from_lsp] - ] - self.status = SessionStatus.STARTED - def stop(self): + async def cleanup(self): """clean up all of the state of the session""" - self.status = SessionStatus.STOPPING - if self.process: - self.process.terminate() - self.process = None - if self.reader: - self.reader.close() + if self.reader is not None: + await self.reader.close() self.reader = None - if self.writer: - self.writer.close() + if self.writer is not None: + await self.writer.close() self.writer = None - - if self._tasks: - [task.cancel() for task in self._tasks] + if self.process is not None: + await self.stop_process(self.stop_timeout_s) + self.process = None + if self.from_lsp is not None: + await self.from_lsp.aclose() + self.from_lsp = None + if self.to_lsp is not None: + await self.to_lsp.aclose() + self.to_lsp = None self.status = SessionStatus.STOPPED @@ -116,44 +193,76 @@ def stop(self): def _on_handlers(self, change: Bunch): """re-initialize if someone starts listening, or stop if nobody is""" if change["new"] and not self.process: - self.initialize() + self.start() elif not change["new"] and self.process: self.stop() def write(self, message): """wrapper around the write queue to keep it mostly internal""" self.last_handler_message_at = self.now() - IOLoop.current().add_callback(self.to_lsp.put_nowait, message) + 
self.thread_loop.add_callback(self.to_lsp.send, message) def now(self): return datetime.now(timezone.utc) - def init_process(self): - """start the language server subprocess""" - self.process = subprocess.Popen( - self.spec["argv"], + async def start_process(self, argv: List[str]): + """start the language server subprocess given in argv""" + self.process = await anyio.open_process( + argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE, env=self.substitute_env(self.spec.get("env", {}), os.environ), - bufsize=0, ) - def init_queues(self): - """create the queues""" - self.from_lsp = Queue() - self.to_lsp = Queue() + async def stop_process(self, timeout: int = 5): + """stop the language server subprocess - def init_reader(self): - """create the stdout reader (from the language server)""" - self.reader = stdio.LspStdIoReader( - stream=self.process.stdout, queue=self.from_lsp, parent=self - ) + If the process does not terminate within timeout seconds it will be killed + forcefully. + """ + if self.process is None: # pragma: no cover + return + + if timeout < 0.0: # pragma: no cover + raise ValueError("Timeout must not be negative!") + + # try to stop the process gracefully + self.process.terminate() + with anyio.move_on_after(timeout + 0.1): # avoid timeout of 0s + self.log.debug("Waiting for process to terminate") + await self.process.wait() + return + + if sys.platform.startswith("win32"): # pragma: no cover + # On Windows Process.kill() is an alias to Process.terminate() so we cannot + # force the process to stop. if you know of a better way to handle this on + # Windows please consider contributing + self.log.warning( + ( + "The language server process (PID: {}) did not terminate within {} " + "seconds. Beware, it might continue running as a zombie process." + ).format(self.process.pid, timeout) + ) + else: # pragma: no cover + self.log.debug( + ( + "Process did not terminate within {} seconds. " + "Bringing it down the hard way!" 
+ ).format(timeout) + ) + try: # pragma: no cover + self.process.kill() + except ProcessLookupError: + # process terminated on its own in the meantime + pass - def init_writer(self): - """create the stdin writer (to the language server)""" - self.writer = stdio.LspStdIoWriter( - stream=self.process.stdin, queue=self.to_lsp, parent=self - ) + def init_queues(self): + """create the queues""" + queue_size = math.inf if self.queue_size < 0 else self.queue_size + self.from_lsp = StapledObjectStream( + *anyio.create_memory_object_stream(max_buffer_size=queue_size)) + self.to_lsp = StapledObjectStream( + *anyio.create_memory_object_stream(max_buffer_size=queue_size)) def substitute_env(self, env, base): final_env = copy(os.environ) @@ -163,6 +272,25 @@ def substitute_env(self, env, base): return final_env + @abstractmethod + async def init_process(self): + """start the language server subprocess and store it in self.process""" + pass # pragma: no cover + + @abstractmethod + def init_reader(self): + """create the stream reader (from the language server) and store it in + self.reader + """ + pass # pragma: no cover + + @abstractmethod + def init_writer(self): + """create the stream writer (to the language server) and store it in + self.writer + """ + pass # pragma: no cover + async def _read_lsp(self): await self.reader.read() @@ -175,5 +303,85 @@ async def _broadcast_from_lsp(self): """ async for message in self.from_lsp: self.last_server_message_at = self.now() - await self.parent.on_server_message(message, self) - self.from_lsp.task_done() + # handle message in the main thread's event loop + self.main_loop.add_callback(self.parent.on_server_message, message, self) + + +class LanguageServerSessionStdio(LanguageServerSessionBase): + async def init_process(self): + await self.start_process(self.spec["argv"]) + + def init_reader(self): + self.reader = LspStreamReader( + stream=self.process.stdout, queue=self.from_lsp, parent=self + ) + + def init_writer(self): + self.writer = LspStreamWriter( + stream=self.process.stdin, queue=self.to_lsp, parent=self + ) + + +class LanguageServerSessionTCP(LanguageServerSessionBase): + + tcp_con = Instance(SocketStream, help="the tcp connection", allow_none=True) + + async def init_process(self): + """start the language server subprocess""" + argv = self.spec["argv"] + + host = "127.0.0.1" + port = get_unused_port() + + # substitute arguments for host and port into the environment + argv = [arg.format(host=host, port=port) for arg in argv] + + # start the process + await self.start_process(argv) + + # finally open the tcp connection to the now running process + self.tcp_con = await self.init_tcp_connection(host, port) + + async def stop_process(self, timeout: int = 5): + await self.tcp_con.aclose() + self.tcp_con = None + + await super().stop_process(timeout) + + async def init_tcp_connection(self, host, port, retries=12, sleep=5.0): + server = "{}:{}".format(host, port) + + tries = 0 + while tries < retries: + tries = tries + 1 + try: + return await anyio.connect_tcp(host, port) + except OSError: # pragma: no cover + if tries < retries: + self.log.warning( + ( + "Connection to server {} refused! " + "Attempt {}/{}. " + "Retrying in {}s" + ).format(server, tries, retries, sleep) + ) + await anyio.sleep(sleep) + else: + self.log.warning( + "Connection to server {} refused! 
Attempt {}/{}.".format( + server, tries, retries + ) + ) + raise OSError( + "Unable to connect to server {}".format(server) + ) # pragma: no cover + + def init_reader(self): + self.reader = LspStreamReader( + stream=self.tcp_con, queue=self.from_lsp, parent=self + ) + + def init_writer(self): + self.writer = LspStreamWriter( + stream=self.tcp_con, queue=self.to_lsp, parent=self + ) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/specs/__init__.py b/python_packages/jupyter_lsp/jupyter_lsp/specs/__init__.py index edd15f613..c96164579 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/specs/__init__.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/specs/__init__.py @@ -9,7 +9,7 @@ from .julia_language_server import JuliaLanguageServer from .pyls import PalantirPythonLanguageServer from .pyright import PyrightLanguageServer -from .python_lsp_server import PythonLSPServer +from .python_lsp_server import PythonLSPServer, PythonLSPServerTCP from .r_languageserver import RLanguageServer from .sql_language_server import SQLLanguageServer from .texlab import Texlab @@ -30,6 +30,7 @@ md = UnifiedLanguageServer() py_palantir = PalantirPythonLanguageServer() py_lsp_server = PythonLSPServer() +py_lsp_server_tcp = PythonLSPServerTCP() pyright = PyrightLanguageServer() r = RLanguageServer() tex = Texlab() diff --git a/python_packages/jupyter_lsp/jupyter_lsp/specs/python_lsp_server.py b/python_packages/jupyter_lsp/jupyter_lsp/specs/python_lsp_server.py index 799222b2d..6dc266f67 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/specs/python_lsp_server.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/specs/python_lsp_server.py @@ -47,3 +47,11 @@ class PythonLSPServer(PythonModuleSpec): config_schema=load_config_schema(key), env=dict(PYTHONUNBUFFERED="1"), ) + + +class PythonLSPServerTCP(PythonLSPServer): + key = "pylsp-tcp" + args = ["--tcp", "--port", "{port}"] + spec = PythonLSPServer.spec.copy() + spec["display_name"] = "python-lsp-server (pylsp) over tcp" + spec["mode"] = "tcp" diff --git a/python_packages/jupyter_lsp/jupyter_lsp/stdio.py b/python_packages/jupyter_lsp/jupyter_lsp/stdio.py deleted file mode 100644 index e9af4d5ae..000000000 --- a/python_packages/jupyter_lsp/jupyter_lsp/stdio.py +++ /dev/null @@ -1,202 +0,0 @@ -""" Language Server stdio-mode readers - -Parts of this code are derived from: - -> https://github.com/palantir/python-jsonrpc-server/blob/0.2.0/pyls_jsonrpc/streams.py#L83 # noqa -> https://github.com/palantir/python-jsonrpc-server/blob/45ed1931e4b2e5100cc61b3992c16d6f68af2e80/pyls_jsonrpc/streams.py # noqa -> > MIT License https://github.com/palantir/python-jsonrpc-server/blob/0.2.0/LICENSE -> > Copyright 2018 Palantir Technologies, Inc. 
-""" -# pylint: disable=broad-except -import asyncio -import io -import os -from concurrent.futures import ThreadPoolExecutor -from typing import List, Optional, Text - -from tornado.concurrent import run_on_executor -from tornado.gen import convert_yielded -from tornado.httputil import HTTPHeaders -from tornado.ioloop import IOLoop -from tornado.queues import Queue -from traitlets import Float, Instance, default -from traitlets.config import LoggingConfigurable - -from .non_blocking import make_non_blocking - - -class LspStdIoBase(LoggingConfigurable): - """Non-blocking, queued base for communicating with stdio Language Servers""" - - executor = None - - stream = Instance( # type:ignore[assignment] - io.RawIOBase, help="the stream to read/write" - ) # type: io.RawIOBase - queue = Instance(Queue, help="queue to get/put") - - def __repr__(self): # pragma: no cover - return "<{}(parent={})>".format(self.__class__.__name__, self.parent) - - def __init__(self, **kwargs): - super().__init__(**kwargs) - self.log.debug("%s initialized", self) - self.executor = ThreadPoolExecutor(max_workers=1) - - def close(self): - self.stream.close() - self.log.debug("%s closed", self) - - -class LspStdIoReader(LspStdIoBase): - """Language Server stdio Reader - - Because non-blocking (but still synchronous) IO is used, rudimentary - exponential backoff is used. - """ - - max_wait = Float(help="maximum time to wait on idle stream").tag(config=True) - min_wait = Float(0.05, help="minimum time to wait on idle stream").tag(config=True) - next_wait = Float(0.05, help="next time to wait on idle stream").tag(config=True) - - @default("max_wait") - def _default_max_wait(self): - return 0.1 if os.name == "nt" else self.min_wait * 2 - - async def sleep(self): - """Simple exponential backoff for sleeping""" - if self.stream.closed: # pragma: no cover - return - self.next_wait = min(self.next_wait * 2, self.max_wait) - try: - await asyncio.sleep(self.next_wait) - except Exception: # pragma: no cover - pass - - def wake(self): - """Reset the wait time""" - self.wait = self.min_wait - - async def read(self) -> None: - """Read from a Language Server until it is closed""" - make_non_blocking(self.stream) - - while not self.stream.closed: - message = None - try: - message = await self.read_one() - - if not message: - await self.sleep() - continue - else: - self.wake() - - IOLoop.current().add_callback(self.queue.put_nowait, message) - except Exception as e: # pragma: no cover - self.log.exception( - "%s couldn't enqueue message: %s (%s)", self, message, e - ) - await self.sleep() - - async def _read_content( - self, length: int, max_parts=1000, max_empties=200 - ) -> Optional[bytes]: - """Read the full length of the message unless exceeding max_parts or - max_empties empty reads occur. - - See https://github.com/jupyter-lsp/jupyterlab-lsp/issues/450 - - Crucial docs or read(): - "If the argument is positive, and the underlying raw - stream is not interactive, multiple raw reads may be issued - to satisfy the byte count (unless EOF is reached first)" - - Args: - - length: the content length - - max_parts: prevent absurdly long messages (1000 parts is several MBs): - 1 part is usually sufficient but not enough for some long - messages 2 or 3 parts are often needed. 
- """ - raw = None - raw_parts: List[bytes] = [] - received_size = 0 - while received_size < length and len(raw_parts) < max_parts and max_empties > 0: - part = None - try: - part = self.stream.read(length - received_size) - except OSError: # pragma: no cover - pass - if part is None: - max_empties -= 1 - await self.sleep() - continue - received_size += len(part) - raw_parts.append(part) - - if raw_parts: - raw = b"".join(raw_parts) - if len(raw) != length: # pragma: no cover - self.log.warning( - f"Readout and content-length mismatch: {len(raw)} vs {length};" - f"remaining empties: {max_empties}; remaining parts: {max_parts}" - ) - - return raw - - async def read_one(self) -> Text: - """Read a single message""" - message = "" - headers = HTTPHeaders() - - line = await convert_yielded(self._readline()) - - if line: - while line and line.strip(): - headers.parse_line(line) - line = await convert_yielded(self._readline()) - - content_length = int(headers.get("content-length", "0")) - - if content_length: - raw = await self._read_content(length=content_length) - if raw is not None: - message = raw.decode("utf-8").strip() - else: # pragma: no cover - self.log.warning( - "%s failed to read message of length %s", - self, - content_length, - ) - - return message - - @run_on_executor - def _readline(self) -> Text: - """Read a line (or immediately return None)""" - try: - return self.stream.readline().decode("utf-8").strip() - except OSError: # pragma: no cover - return "" - - -class LspStdIoWriter(LspStdIoBase): - """Language Server stdio Writer""" - - async def write(self) -> None: - """Write to a Language Server until it closes""" - while not self.stream.closed: - message = await self.queue.get() - try: - body = message.encode("utf-8") - response = "Content-Length: {}\r\n\r\n{}".format(len(body), message) - await convert_yielded(self._write_one(response.encode("utf-8"))) - except Exception: # pragma: no cover - self.log.exception("%s couldn't write message: %s", self, response) - finally: - self.queue.task_done() - - @run_on_executor - def _write_one(self, message) -> None: - self.stream.write(message) - self.stream.flush() diff --git a/python_packages/jupyter_lsp/jupyter_lsp/tests/conftest.py b/python_packages/jupyter_lsp/jupyter_lsp/tests/conftest.py index fd8e0b93a..4ece104fe 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/tests/conftest.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/tests/conftest.py @@ -20,6 +20,7 @@ "dockerfile-language-server-nodejs", "typescript-language-server", "pylsp", + "pylsp-tcp", "unified-language-server", "sql-language-server", "vscode-css-languageserver-bin", @@ -112,6 +113,12 @@ def jsonrpc_init_msg(): ) +# only run tests with asyncio +@fixture +def anyio_backend(): + return "asyncio" + + @fixture def app(): return MockServerApp() diff --git a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_bad_spec.py b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_bad_spec.py index 528be4034..09ab2c319 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_bad_spec.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_bad_spec.py @@ -1,7 +1,8 @@ import pytest import traitlets -from jupyter_lsp.session import LanguageServerSession +from jupyter_lsp.schema import SPEC_VERSION +from jupyter_lsp.session import LanguageServerSessionStdio @pytest.mark.parametrize( @@ -12,8 +13,14 @@ {"languages": None}, {"languages": 1}, {"languages": [1, "two"]}, + { + "argv": ["command"], + "languages": ["some language"], + "version": SPEC_VERSION, + "mode": 
"unknown", + }, ], ) def test_bad_spec(spec): with pytest.raises(traitlets.TraitError): - LanguageServerSession(spec=spec) + LanguageServerSessionStdio(spec=spec) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_reader.py b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_reader.py new file mode 100644 index 000000000..89c8d4b48 --- /dev/null +++ b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_reader.py @@ -0,0 +1,160 @@ +import subprocess + +import anyio +from anyio.streams.stapled import StapledObjectStream +import math +import pytest + +from jupyter_lsp.connection import LspStreamReader +from jupyter_lsp.utils import get_unused_port + +WRITER_TEMPLATE = """ +from time import sleep + +# the LSP states that each header field must be terminated by \\r\\n +print('Content-Length: {length}', end='\\r\\n') +# and the header must be terminated again by \\r\\n +print(end='\\r\\n') + +for repeat in range({repeats}): + sleep({interval}) + print('{message}', end='') + +if {add_excess}: + print("extra", end='') + +print() +""" + +TCP_WRITER_TEMPLATE = """ +from anyio import create_tcp_listener, create_task_group, run, sleep, Event + +async def serve_once(listener): + async def handle(client): + async with client: + # the LSP states that each header field must be terminated by \\r\\n + await client.send(b'Content-Length: {length}\\r\\n') + # and the header must be terminated again by \\r\\n + await client.send(b'\\r\\n') + + for repeat in range({repeats}): + await sleep({interval}) + await client.send(b'{message}') + + if {add_excess}: + await client.send(b"extra") + + await client.send(b'\\n') + + stop.set() + + async def cancel_on_event(scope): + await stop.wait() + scope.cancel() + + stop = Event() + async with create_task_group() as tg: + tg.start_soon(listener.serve, handle) + tg.start_soon(cancel_on_event, tg.cancel_scope) + +async def main(): + listener = await create_tcp_listener(local_port={port}) + await serve_once(listener) + +run(main) +""" + + +class CommunicatorSpawner: + def __init__(self, tmp_path): + self.tmp_path = tmp_path + + async def spawn_writer( + self, message: str, repeats: int = 1, interval=None, add_excess=False, port=None + ): + template = WRITER_TEMPLATE if port is None else TCP_WRITER_TEMPLATE + length = len(message) * repeats + commands_file = self.tmp_path / "writer.py" + commands_file.write_text( + template.format( + length=length, + repeats=repeats, + interval=interval or 0, + message=message, + add_excess=add_excess, + port=port, + ) + ) + return await anyio.open_process( + ["python", "-u", str(commands_file)], stdout=subprocess.PIPE + ) + + +@pytest.fixture +def communicator_spawner(tmp_path): + return CommunicatorSpawner(tmp_path) + + +async def join_process(process: anyio.abc.Process, headstart=1, timeout=1): + await anyio.sleep(headstart) + # wait for timeout second for the process to terminate before raising a TimeoutError + with anyio.fail_after(timeout): + result = await process.wait() + # close any streams attached to stdout + if process.stdout: + await process.stdout.aclose() + return result + + +@pytest.mark.parametrize( + "message,repeats,interval,add_excess", + [ + ["short", 1, None, False], + ["ab" * 100_000, 1, None, False], + ["ab", 2, 0.01, False], + ["ab", 45, 0.01, False], + ["message", 2, 0.01, True], + ], + ids=[ + "short", + "long", + "intermittent", + "intensive-intermittent", + "with-excess", + ], +) +@pytest.mark.parametrize("mode", ["stdio", "tcp"], ids=["stdio", "tcp"]) +@pytest.mark.anyio +async def test_reader( + 
message, repeats, interval, add_excess, mode, communicator_spawner +): + queue = StapledObjectStream( + *anyio.create_memory_object_stream(max_buffer_size=math.inf)) + + port = get_unused_port() if mode == "tcp" else None + process = await communicator_spawner.spawn_writer( + message=message, + repeats=repeats, + interval=interval, + add_excess=add_excess, + port=port, + ) + stream = None + if port is None: + stream = process.stdout + else: + # give the server some time to start + await anyio.sleep(2) + stream = await anyio.connect_tcp("127.0.0.1", port) + + reader = LspStreamReader(stream=stream, queue=queue) + + async with anyio.create_task_group() as tg: + tg.start_soon(join_process, process, 3, 1) + tg.start_soon(reader.read) + + if port is not None: + await stream.aclose() + + result = await queue.receive() + assert result == message * repeats diff --git a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py index 5e704550b..c73bcdea4 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py @@ -1,4 +1,6 @@ import asyncio +import os +from sys import platform import pytest @@ -100,3 +102,46 @@ async def test_ping(handlers): assert ws_handler._ping_sent is True ws_handler.on_close() + + +def exists_process_with_pid(pid): + try: + os.kill(pid, 0) + except OSError: + return False + else: + return True + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "timeout", [0, 5], ids=["terminate immediately", "after 5 seconds"] +) +async def test_stop(handlers, timeout): + """Test whether process will stop gracefully or forcefully""" + a_server = "pylsp" + + handler, ws_handler = handlers + manager = handler.manager + + manager.initialize() + + await ws_handler.open(a_server) + + session = manager.sessions[ws_handler.language_server] + session.stop_timeout = timeout + + process_pid = session.process.pid + assert exists_process_with_pid(process_pid) is True + + ws_handler.on_close() + + if platform.startswith("win32"): # pragma: no cover + # currently we cannot forcefully terminate the process on windows, so we just + # give it a little more extra time to finish on its own + await asyncio.sleep(timeout + 10) + else: # pragma: no cover + # linux and darwin + await asyncio.sleep(timeout + 2) + + assert exists_process_with_pid(process_pid) is False diff --git a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_stdio.py b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_stdio.py deleted file mode 100644 index 8df7e2c57..000000000 --- a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_stdio.py +++ /dev/null @@ -1,85 +0,0 @@ -import asyncio -import subprocess - -import pytest -from tornado.queues import Queue - -from jupyter_lsp.stdio import LspStdIoReader - -WRITER_TEMPLATE = """ -from time import sleep - -print('Content-Length: {length}') -print() - -for repeat in range({repeats}): - sleep({interval}) - print('{message}', end='') - -if {add_excess}: - print("extra", end='') - -print() -""" - - -class CommunicatorSpawner: - def __init__(self, tmp_path): - self.tmp_path = tmp_path - - def spawn_writer( - self, message: str, repeats: int = 1, interval=None, add_excess=False - ): - length = len(message) * repeats - commands_file = self.tmp_path / "writer.py" - commands_file.write_text( - WRITER_TEMPLATE.format( - length=length, - repeats=repeats, - interval=interval or 0, - message=message, - add_excess=add_excess, - ) - ) - return 
subprocess.Popen( - ["python", "-u", str(commands_file)], stdout=subprocess.PIPE, bufsize=0 - ) - - -@pytest.fixture -def communicator_spawner(tmp_path): - return CommunicatorSpawner(tmp_path) - - -async def join_process(process: subprocess.Popen, headstart=1, timeout=1): - await asyncio.sleep(headstart) - result = process.wait(timeout=timeout) - if process.stdout: - process.stdout.close() - return result - - -@pytest.mark.parametrize( - "message,repeats,interval,add_excess", - [ - ["short", 1, None, False], - ["ab" * 10_0000, 1, None, False], - ["ab", 2, 0.01, False], - ["ab", 45, 0.01, False], - ["message", 2, 0.01, True], - ], - ids=["short", "long", "intermittent", "intensive-intermittent", "with-excess"], -) -@pytest.mark.asyncio -async def test_reader(message, repeats, interval, add_excess, communicator_spawner): - queue = Queue() - - process = communicator_spawner.spawn_writer( - message=message, repeats=repeats, interval=interval, add_excess=add_excess - ) - reader = LspStdIoReader(stream=process.stdout, queue=queue) - - await asyncio.gather(join_process(process, headstart=3, timeout=1), reader.read()) - - result = queue.get_nowait() - assert result == message * repeats diff --git a/python_packages/jupyter_lsp/jupyter_lsp/utils.py b/python_packages/jupyter_lsp/jupyter_lsp/utils.py new file mode 100644 index 000000000..dac9307ed --- /dev/null +++ b/python_packages/jupyter_lsp/jupyter_lsp/utils.py @@ -0,0 +1,16 @@ +""" get a random port +""" +import socket + + +def get_unused_port(): + """Get an unused port by trying to listen to any random port. + + Probably could introduce race conditions if inside a tight loop. + """ + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.bind(("127.0.0.1", 0)) + sock.listen(1) + port = sock.getsockname()[1] + sock.close() + return port diff --git a/python_packages/jupyter_lsp/setup.cfg b/python_packages/jupyter_lsp/setup.cfg index f52cd4212..0b0dad51a 100644 --- a/python_packages/jupyter_lsp/setup.cfg +++ b/python_packages/jupyter_lsp/setup.cfg @@ -41,6 +41,7 @@ jupyter_lsp_spec_v1 = julia-language-server = jupyter_lsp.specs:julia python-language-server = jupyter_lsp.specs:py_palantir python-lsp-server = jupyter_lsp.specs:py_lsp_server + python-lsp-server-tcp = jupyter_lsp.specs:py_lsp_server_tcp pyright = jupyter_lsp.specs:pyright r-languageserver = jupyter_lsp.specs:r texlab = jupyter_lsp.specs:tex