From 09c8c396d2552416727e91d9331f453fe41a6951 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Sun, 21 Feb 2021 18:14:36 +0100 Subject: [PATCH 01/51] Move sessions to anyio --- .../jupyter_lsp/jupyter_lsp/session.py | 62 ++++++++++------- .../jupyter_lsp/jupyter_lsp/stdio.py | 69 ++++++++++++------- 2 files changed, 81 insertions(+), 50 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/session.py b/python_packages/jupyter_lsp/jupyter_lsp/session.py index 4b9a6aea7..386574bac 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/session.py @@ -1,6 +1,6 @@ """ A session for managing a language server process """ -import asyncio +import anyio import atexit import os import string @@ -8,6 +8,8 @@ from copy import copy from datetime import datetime, timezone +from concurrent.futures import ThreadPoolExecutor +from tornado.concurrent import run_on_executor from tornado.ioloop import IOLoop from tornado.queues import Queue from tornado.websocket import WebSocketHandler @@ -31,8 +33,11 @@ class LanguageServerSession(LoggingConfigurable): # run-time specifics process = Instance( - subprocess.Popen, help="the language server subprocess", allow_none=True + anyio.abc.Process, help="the language server subprocess", allow_none=True ) + executor = None + portal = None + cancelscope = None writer = Instance(stdio.LspStdIoWriter, help="the JSON-RPC writer", allow_none=True) reader = Instance(stdio.LspStdIoReader, help="the JSON-RPC reader", allow_none=True) from_lsp = Instance( @@ -50,14 +55,15 @@ class LanguageServerSession(LoggingConfigurable): last_handler_message_at = Instance(datetime, allow_none=True) last_server_message_at = Instance(datetime, allow_none=True) - _tasks = None - _skip_serialize = ["argv", "debug_argv"] def __init__(self, *args, **kwargs): """set up the required traitlets and exit behavior for a session""" super().__init__(*args, **kwargs) atexit.register(self.stop) + self.executor = ThreadPoolExecutor(max_workers=1) + self.portal = anyio.start_blocking_portal() + def __repr__(self): # pragma: no cover return ( @@ -82,15 +88,12 @@ def initialize(self): self.stop() self.status = SessionStatus.STARTING self.init_queues() - self.init_process() + self.portal.call(self.init_process) self.init_writer() self.init_reader() - loop = asyncio.get_event_loop() - self._tasks = [ - loop.create_task(coro()) - for coro in [self._read_lsp, self._write_lsp, self._broadcast_from_lsp] - ] + # start listening on the executor in a different event loop + self.listen() self.status = SessionStatus.STARTED @@ -103,14 +106,15 @@ def stop(self): self.process.terminate() self.process = None if self.reader: - self.reader.close() + self.portal.call(self.reader.close) self.reader = None if self.writer: - self.writer.close() + self.portal.call(self.writer.close) self.writer = None - if self._tasks: - [task.cancel() for task in self._tasks] + if self.cancelscope: + self.portal.call(self.cancelscope.cancel) + self.cancelscope = None self.status = SessionStatus.STOPPED @@ -130,14 +134,13 @@ def write(self, message): def now(self): return datetime.now(timezone.utc) - def init_process(self): + async def init_process(self): """start the language server subprocess""" - self.process = subprocess.Popen( + self.substitute_env(self.spec.get("env", {}), os.environ) + self.process = await anyio.open_process( self.spec["argv"], stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - env=self.substitute_env(self.spec.get("env", {}), os.environ), - bufsize=0, + stdout=subprocess.PIPE ) def init_queues(self): @@ -158,12 +161,23 @@ def init_writer(self): ) def substitute_env(self, env, base): - final_env = copy(os.environ) - for key, value in env.items(): - final_env.update({key: string.Template(value).safe_substitute(base)}) - - return final_env + os.environ.update({key: string.Template(value).safe_substitute(base)}) + + @run_on_executor + def listen(self): + self.portal.call(self._listen) + + async def _listen(self): + try: + async with anyio.create_task_group() as tg: + async with anyio.open_cancel_scope() as scope: + self.cancelscope = scope + await tg.spawn(self._read_lsp) + await tg.spawn(self._write_lsp) + await tg.spawn(self._broadcast_from_lsp) + except Exception as e: + self.log.exception("Execption while listening {}", e) async def _read_lsp(self): await self.reader.read() diff --git a/python_packages/jupyter_lsp/jupyter_lsp/stdio.py b/python_packages/jupyter_lsp/jupyter_lsp/stdio.py index d9984f31a..567ec259f 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/stdio.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/stdio.py @@ -8,7 +8,9 @@ > > Copyright 2018 Palantir Technologies, Inc. """ # pylint: disable=broad-except -import asyncio +import anyio +from anyio.streams.buffered import BufferedByteReceiveStream +from anyio.streams.text import TextSendStream import io import os from concurrent.futures import ThreadPoolExecutor @@ -24,15 +26,14 @@ from .non_blocking import make_non_blocking - class LspStdIoBase(LoggingConfigurable): """Non-blocking, queued base for communicating with stdio Language Servers""" executor = None stream = Instance( - io.RawIOBase, help="the stream to read/write" - ) # type: io.RawIOBase + anyio.abc.AsyncResource, help="the stream to read/write" + ) # type: anyio.abc.AsyncResource queue = Instance(Queue, help="queue to get/put") def __repr__(self): # pragma: no cover @@ -43,8 +44,8 @@ def __init__(self, **kwargs): self.log.debug("%s initialized", self) self.executor = ThreadPoolExecutor(max_workers=1) - def close(self): - self.stream.close() + async def close(self): + await self.stream.aclose() self.log.debug("%s closed", self) @@ -59,13 +60,17 @@ class LspStdIoReader(LspStdIoBase): min_wait = Float(0.05, help="minimum time to wait on idle stream").tag(config=True) next_wait = Float(0.05, help="next time to wait on idle stream").tag(config=True) + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.stream = BufferedByteReceiveStream(self.stream) + @default("max_wait") def _default_max_wait(self): return 0.1 if os.name == "nt" else self.min_wait * 2 async def sleep(self): """Simple exponential backoff for sleeping""" - if self.stream.closed: # pragma: no cover + if self.stream._closed: # pragma: no cover return self.next_wait = min(self.next_wait * 2, self.max_wait) try: @@ -79,9 +84,7 @@ def wake(self): async def read(self) -> None: """Read from a Language Server until it is closed""" - make_non_blocking(self.stream) - - while not self.stream.closed: + while True: message = None try: message = await self.read_one() @@ -93,6 +96,9 @@ async def read(self) -> None: self.wake() IOLoop.current().add_callback(self.queue.put_nowait, message) + except (anyio.ClosedResourceError, anyio.EndOfStream): + # stream was closed -> terminate + break except Exception as e: # pragma: no cover self.log.exception( "%s couldn't enqueue message: %s (%s)", self, message, e @@ -124,8 +130,8 @@ async def _read_content( while received_size < length and len(raw_parts) < max_parts and max_empties > 0: part = None try: - part = self.stream.read(length - received_size) - except OSError: # pragma: no cover + part = await self.stream.receive_exactly(length - received_size) + except anyio.IncompleteRead: # pragma: no cover pass if part is None: max_empties -= 1 @@ -171,32 +177,43 @@ async def read_one(self) -> Text: return message - @run_on_executor - def _readline(self) -> Text: + async def _readline(self) -> Text: """Read a line (or immediately return None)""" try: - return self.stream.readline().decode("utf-8").strip() - except OSError: # pragma: no cover + # use same max_bytes as is default for receive for now. It seems there is no way of getting + # the bytes read until max_bytes is reached, so we cannot iterate the receive_until call + # with smaller max_bytes values + async with anyio.move_on_after(0.2) as moa: + line = await self.stream.receive_until(b'\r\n', 65536) + return line.decode("utf-8").strip() + except anyio.IncompleteRead: + return "" + except anyio.DelimiterNotFound: + self.log.error("Readline hit max_bytes before newline character was encountered") return "" - class LspStdIoWriter(LspStdIoBase): """Language Server stdio Writer""" + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.stream = TextSendStream(self.stream, encoding='utf-8') + async def write(self) -> None: """Write to a Language Server until it closes""" - while not self.stream.closed: + while True: message = await self.queue.get() try: - body = message.encode("utf-8") - response = "Content-Length: {}\r\n\r\n{}".format(len(body), message) - await convert_yielded(self._write_one(response.encode("utf-8"))) - except Exception: # pragma: no cover + nBytes = len(message.encode("utf-8")) + response = "Content-Length: {}\r\n\r\n{}".format(nBytes, message) + await convert_yielded(self._write_one(response)) + except (anyio.ClosedResourceError, anyio.BrokenResourceError): # pragma: no cover + # stream was closed -> terminate + break + except Exception: self.log.exception("%s couldn't write message: %s", self, response) finally: self.queue.task_done() - @run_on_executor - def _write_one(self, message) -> None: - self.stream.write(message) - self.stream.flush() + async def _write_one(self, message) -> None: + await self.stream.send(message) From 4b3ab1b3c0a13cd3a7c5d4b748d3d664b65ed2c2 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Wed, 17 Mar 2021 17:30:49 +0100 Subject: [PATCH 02/51] Add support for connection to lsp providers through tcp --- .../jupyter_lsp/{stdio.py => connection.py} | 14 ++-- .../jupyter_lsp/jupyter_lsp/manager.py | 2 +- .../jupyter_lsp/schema/schema.json | 30 +++++++ .../jupyter_lsp/jupyter_lsp/session.py | 80 +++++++++++++++++-- .../jupyter_lsp/tests/test_stdio.py | 2 +- .../jupyter_lsp/jupyter_lsp/utils.py | 16 ++++ 6 files changed, 127 insertions(+), 17 deletions(-) rename python_packages/jupyter_lsp/jupyter_lsp/{stdio.py => connection.py} (96%) create mode 100644 python_packages/jupyter_lsp/jupyter_lsp/utils.py diff --git a/python_packages/jupyter_lsp/jupyter_lsp/stdio.py b/python_packages/jupyter_lsp/jupyter_lsp/connection.py similarity index 96% rename from python_packages/jupyter_lsp/jupyter_lsp/stdio.py rename to python_packages/jupyter_lsp/jupyter_lsp/connection.py index 567ec259f..c7ce543a1 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/stdio.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/connection.py @@ -1,4 +1,4 @@ -""" Language Server stdio-mode readers +""" Language Server readers and writers Parts of this code are derived from: @@ -26,8 +26,8 @@ from .non_blocking import make_non_blocking -class LspStdIoBase(LoggingConfigurable): - """Non-blocking, queued base for communicating with stdio Language Servers""" +class LspStreamBase(LoggingConfigurable): + """Non-blocking, queued base for communicating with Language Servers through anyio streams""" executor = None @@ -49,8 +49,8 @@ async def close(self): self.log.debug("%s closed", self) -class LspStdIoReader(LspStdIoBase): - """Language Server stdio Reader +class LspStreamReader(LspStreamBase): + """Language Server Reader Because non-blocking (but still synchronous) IO is used, rudimentary exponential backoff is used. @@ -192,8 +192,8 @@ async def _readline(self) -> Text: self.log.error("Readline hit max_bytes before newline character was encountered") return "" -class LspStdIoWriter(LspStdIoBase): - """Language Server stdio Writer""" +class LspStreamWriter(LspStreamBase): + """Language Server Writer""" def __init__(self, **kwargs): super().__init__(**kwargs) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/manager.py b/python_packages/jupyter_lsp/jupyter_lsp/manager.py index 68b8a4807..0ea315480 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/manager.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/manager.py @@ -1,4 +1,4 @@ -""" A configurable frontend for stdio-based Language Servers +""" A configurable frontend for stream-based Language Servers """ import os import traceback diff --git a/python_packages/jupyter_lsp/jupyter_lsp/schema/schema.json b/python_packages/jupyter_lsp/jupyter_lsp/schema/schema.json index 67fd35b75..78fe525e4 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/schema/schema.json +++ b/python_packages/jupyter_lsp/jupyter_lsp/schema/schema.json @@ -139,6 +139,36 @@ "description": "list of MIME types supported by the language server", "title": "MIME Types" }, + "mode": { + "description": "connection mode used, e.g. stdio (default), tcp", + "title": "Mode", + "type": "string", + "enum": ["stdio", "tcp"] + }, + "port": { + "description": "the port for tcp mode connections. a null value will select a random, unused port", + "title": "Port", + "oneOf": [ + { + "type": "integer" + }, + { + "type": "null" + } + ] + }, + "host": { + "description": "the host for tcp mode connections. a null value will assume '127.0.0.1'", + "title": "Host", + "oneOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ] + }, "urls": { "additionalProperties": { "format": "uri", diff --git a/python_packages/jupyter_lsp/jupyter_lsp/session.py b/python_packages/jupyter_lsp/jupyter_lsp/session.py index 386574bac..f7a387f50 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/session.py @@ -16,10 +16,11 @@ from traitlets import Bunch, Instance, Set, Unicode, UseEnum, observe from traitlets.config import LoggingConfigurable -from . import stdio +from .connection import LspStreamWriter, LspStreamReader from .schema import LANGUAGE_SERVER_SPEC from .trait_types import Schema from .types import SessionStatus +from .utils import get_unused_port # these are not desirable to publish to the frontend SKIP_JSON_SPEC = ["argv", "debug_argv", "env"] @@ -38,8 +39,9 @@ class LanguageServerSession(LoggingConfigurable): executor = None portal = None cancelscope = None - writer = Instance(stdio.LspStdIoWriter, help="the JSON-RPC writer", allow_none=True) - reader = Instance(stdio.LspStdIoReader, help="the JSON-RPC reader", allow_none=True) + writer = Instance(LspStreamWriter, help="the JSON-RPC writer", allow_none=True) + reader = Instance(LspStreamReader, help="the JSON-RPC reader", allow_none=True) + tcp_con = Instance(anyio.abc.SocketStream, help="the tcp connection", allow_none=True) from_lsp = Instance( Queue, help="a queue for string messages from the server", allow_none=True ) @@ -111,6 +113,9 @@ def stop(self): if self.writer: self.portal.call(self.writer.close) self.writer = None + if self.tcp_con: + self.portal.call(self.tcp_con.aclose) + self.tcp_con = None if self.cancelscope: self.portal.call(self.cancelscope.cancel) @@ -137,27 +142,86 @@ def now(self): async def init_process(self): """start the language server subprocess""" self.substitute_env(self.spec.get("env", {}), os.environ) + + argv = self.spec["argv"] + + host = None + port = None + mode = self.spec.get("mode") + if mode == "tcp": + host, port = self.get_tcp_server() + argv = [arg.format(host=host, port=port) for arg in argv] + self.process = await anyio.open_process( - self.spec["argv"], + argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE ) + if mode == "tcp": + self.tcp_con = await self.init_tcp_connection(host, port) + def init_queues(self): """create the queues""" self.from_lsp = Queue() self.to_lsp = Queue() + def get_tcp_server(self): + host = self.spec.get("host", "127.0.0.1") + port = self.spec.get("port") + + if not port: + if host in ["127.0.0.1", "localhost"]: + port = get_unused_port() + else: + raise ValueError("A port must be given explicitly for hosts other than localhost") + return (host, port) + + async def init_tcp_connection(self, host, port, retries=12, sleep=5.0): + server = '{}:{}'.format(host, port) + + tries = 0 + while tries < retries: + tries = tries + 1 + try: + return await anyio.connect_tcp(host, port) + except OSError: + if tries < retries: + self.log.warning('Connection to server {} refused! Attempt {}/{}. Retrying in {}s'.format(server, tries, retries, sleep)) + await anyio.sleep(sleep) + else: + self.log.warning('Connection to server {} refused! Attempt {}/{}.'.format(server, tries, retries)) + + raise OSError("Unable to connect to server {}".format(server)) + def init_reader(self): """create the stdout reader (from the language server)""" - self.reader = stdio.LspStdIoReader( - stream=self.process.stdout, queue=self.from_lsp, parent=self + stream = None + mode = self.spec.get("mode", "stdio") + if mode == "tcp": + stream = self.tcp_con + elif mode == "stdio": + stream = self.process.stdout + else: + raise ValueError("Unknown mode: " + mode) + + self.reader = LspStreamReader( + stream=stream, queue=self.from_lsp, parent=self ) def init_writer(self): """create the stdin writer (to the language server)""" - self.writer = stdio.LspStdIoWriter( - stream=self.process.stdin, queue=self.to_lsp, parent=self + stream = None + mode = self.spec.get("mode", "stdio") + if mode == "tcp": + stream = self.tcp_con + elif mode == "stdio": + stream = self.process.stdin + else: + raise ValueError("Unknown mode: " + mode) + + self.writer = LspStreamWriter( + stream=stream, queue=self.to_lsp, parent=self ) def substitute_env(self, env, base): diff --git a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_stdio.py b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_stdio.py index 8df7e2c57..b7645f4f4 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_stdio.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_stdio.py @@ -4,7 +4,7 @@ import pytest from tornado.queues import Queue -from jupyter_lsp.stdio import LspStdIoReader +from jupyter_lsp.connection import LspStdIoReader WRITER_TEMPLATE = """ from time import sleep diff --git a/python_packages/jupyter_lsp/jupyter_lsp/utils.py b/python_packages/jupyter_lsp/jupyter_lsp/utils.py new file mode 100644 index 000000000..665bf303e --- /dev/null +++ b/python_packages/jupyter_lsp/jupyter_lsp/utils.py @@ -0,0 +1,16 @@ +""" get a random port +""" +import socket + + +def get_unused_port(): + """Get an unused port by trying to listen to any random port. + + Probably could introduce race conditions if inside a tight loop. + """ + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.bind(('localhost', 0)) + sock.listen(1) + port = sock.getsockname()[1] + sock.close() + return port From e480fc84f44d9be39756ccc79f3199e23ef6b6a2 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Sun, 23 May 2021 15:08:40 +0200 Subject: [PATCH 03/51] Fix error where jupyter would not start after page reload --- .../jupyter_lsp/jupyter_lsp/connection.py | 3 ++- .../jupyter_lsp/jupyter_lsp/session.py | 16 +++++++--------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/connection.py b/python_packages/jupyter_lsp/jupyter_lsp/connection.py index c7ce543a1..cc7805de7 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/connection.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/connection.py @@ -187,7 +187,8 @@ async def _readline(self) -> Text: line = await self.stream.receive_until(b'\r\n', 65536) return line.decode("utf-8").strip() except anyio.IncompleteRead: - return "" + # resource has been closed before the requested bytes could be retrieved -> signal recource closed + raise anyio.ClosedResourceError except anyio.DelimiterNotFound: self.log.error("Readline hit max_bytes before newline character was encountered") return "" diff --git a/python_packages/jupyter_lsp/jupyter_lsp/session.py b/python_packages/jupyter_lsp/jupyter_lsp/session.py index f7a387f50..edf6c6ba3 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/session.py @@ -104,6 +104,9 @@ def stop(self): self.status = SessionStatus.STOPPING + if self.cancelscope is not None: + self.portal.call(self.cancelscope.cancel) + self.cancelscope = None if self.process: self.process.terminate() self.process = None @@ -117,10 +120,6 @@ def stop(self): self.portal.call(self.tcp_con.aclose) self.tcp_con = None - if self.cancelscope: - self.portal.call(self.cancelscope.cancel) - self.cancelscope = None - self.status = SessionStatus.STOPPED @observe("handlers") @@ -235,11 +234,10 @@ def listen(self): async def _listen(self): try: async with anyio.create_task_group() as tg: - async with anyio.open_cancel_scope() as scope: - self.cancelscope = scope - await tg.spawn(self._read_lsp) - await tg.spawn(self._write_lsp) - await tg.spawn(self._broadcast_from_lsp) + self.cancelscope = tg.cancel_scope + await tg.spawn(self._read_lsp) + await tg.spawn(self._write_lsp) + await tg.spawn(self._broadcast_from_lsp) except Exception as e: self.log.exception("Execption while listening {}", e) From ac7be832567323965cc04fbafe824eb9ee629377 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Sat, 5 Jun 2021 16:03:15 +0200 Subject: [PATCH 04/51] Adapt to changes of start_blocking_portal() in anyio3.0 --- .../jupyter_lsp/jupyter_lsp/session.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/session.py b/python_packages/jupyter_lsp/jupyter_lsp/session.py index edf6c6ba3..105ed33c2 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/session.py @@ -5,8 +5,10 @@ import os import string import subprocess +import threading from copy import copy from datetime import datetime, timezone +from typing import cast from concurrent.futures import ThreadPoolExecutor from tornado.concurrent import run_on_executor @@ -64,7 +66,7 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) atexit.register(self.stop) self.executor = ThreadPoolExecutor(max_workers=1) - self.portal = anyio.start_blocking_portal() + self.start_blocking_portal() def __repr__(self): # pragma: no cover @@ -138,6 +140,21 @@ def write(self, message): def now(self): return datetime.now(timezone.utc) + # old definition of start_blocking_portal() prior to anyio3 + def start_blocking_portal(self): + async def run_portal(): + nonlocal portal + async with anyio.create_blocking_portal() as portal: + event.set() + await portal.sleep_until_stopped() + + portal: Optional[anyio.abc.BlockingPortal] + event = threading.Event() + thread = threading.Thread(target=anyio.run, kwargs={"func": run_portal}) + thread.start() + event.wait() + self.portal = cast(anyio.abc.BlockingPortal, portal) + async def init_process(self): """start the language server subprocess""" self.substitute_env(self.spec.get("env", {}), os.environ) From 9360802e613374fe5f5451a9e6cb9848f6209bee Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Sun, 20 Jun 2021 18:13:47 +0200 Subject: [PATCH 05/51] Kill lsp-servers which are not terminating willingly when asked to --- .../jupyter_lsp/jupyter_lsp/session.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/session.py b/python_packages/jupyter_lsp/jupyter_lsp/session.py index 105ed33c2..8fc220f7e 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/session.py @@ -110,7 +110,7 @@ def stop(self): self.portal.call(self.cancelscope.cancel) self.cancelscope = None if self.process: - self.process.terminate() + self.portal.call(self.stop_process, 5) self.process = None if self.reader: self.portal.call(self.reader.close) @@ -177,6 +177,21 @@ async def init_process(self): if mode == "tcp": self.tcp_con = await self.init_tcp_connection(host, port) + async def stop_process(self, timeout: int=5): + if self.process is None: + return + + # try to stop the process gracefully + self.process.terminate() + with anyio.move_on_after(timeout) as scope: + self.log.debug("Waiting for process to terminate") + await self.process.wait() + return + + self.log.debug("Process did not terminate within {} seconds. Bringing it down the hard way!".format(timeout)) + self.process.kill() + + def init_queues(self): """create the queues""" self.from_lsp = Queue() From 84afabdf61e095bd7c420b49c84af4355790334a Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Thu, 1 Jul 2021 19:20:24 +0200 Subject: [PATCH 06/51] Code style fixes --- .../jupyter_lsp/jupyter_lsp/connection.py | 42 ++++++++------ .../jupyter_lsp/jupyter_lsp/session.py | 58 +++++++++++-------- .../jupyter_lsp/jupyter_lsp/utils.py | 2 +- 3 files changed, 59 insertions(+), 43 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/connection.py b/python_packages/jupyter_lsp/jupyter_lsp/connection.py index cc7805de7..bc1976eac 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/connection.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/connection.py @@ -7,16 +7,14 @@ > > MIT License https://github.com/palantir/python-jsonrpc-server/blob/0.2.0/LICENSE > > Copyright 2018 Palantir Technologies, Inc. """ -# pylint: disable=broad-except -import anyio -from anyio.streams.buffered import BufferedByteReceiveStream -from anyio.streams.text import TextSendStream -import io import os from concurrent.futures import ThreadPoolExecutor from typing import List, Optional, Text -from tornado.concurrent import run_on_executor +# pylint: disable=broad-except +import anyio +from anyio.streams.buffered import BufferedByteReceiveStream +from anyio.streams.text import TextSendStream from tornado.gen import convert_yielded from tornado.httputil import HTTPHeaders from tornado.ioloop import IOLoop @@ -24,10 +22,11 @@ from traitlets import Float, Instance, default from traitlets.config import LoggingConfigurable -from .non_blocking import make_non_blocking class LspStreamBase(LoggingConfigurable): - """Non-blocking, queued base for communicating with Language Servers through anyio streams""" + """Non-blocking, queued base for communicating with Language Servers through anyio + streams + """ executor = None @@ -74,7 +73,7 @@ async def sleep(self): return self.next_wait = min(self.next_wait * 2, self.max_wait) try: - await asyncio.sleep(self.next_wait) + await anyio.sleep(self.next_wait) except Exception: # pragma: no cover pass @@ -180,25 +179,29 @@ async def read_one(self) -> Text: async def _readline(self) -> Text: """Read a line (or immediately return None)""" try: - # use same max_bytes as is default for receive for now. It seems there is no way of getting - # the bytes read until max_bytes is reached, so we cannot iterate the receive_until call - # with smaller max_bytes values - async with anyio.move_on_after(0.2) as moa: - line = await self.stream.receive_until(b'\r\n', 65536) + # use same max_bytes as is default for receive for now. It seems there is no + # way of getting the bytes read until max_bytes is reached, so we cannot + # iterate the receive_until call with smaller max_bytes values + async with anyio.move_on_after(0.2): + line = await self.stream.receive_until(b"\r\n", 65536) return line.decode("utf-8").strip() except anyio.IncompleteRead: - # resource has been closed before the requested bytes could be retrieved -> signal recource closed + # resource has been closed before the requested bytes could be retrieved + # -> signal recource closed raise anyio.ClosedResourceError except anyio.DelimiterNotFound: - self.log.error("Readline hit max_bytes before newline character was encountered") + self.log.error( + "Readline hit max_bytes before newline character was encountered" + ) return "" + class LspStreamWriter(LspStreamBase): """Language Server Writer""" def __init__(self, **kwargs): super().__init__(**kwargs) - self.stream = TextSendStream(self.stream, encoding='utf-8') + self.stream = TextSendStream(self.stream, encoding="utf-8") async def write(self) -> None: """Write to a Language Server until it closes""" @@ -208,7 +211,10 @@ async def write(self) -> None: nBytes = len(message.encode("utf-8")) response = "Content-Length: {}\r\n\r\n{}".format(nBytes, message) await convert_yielded(self._write_one(response)) - except (anyio.ClosedResourceError, anyio.BrokenResourceError): # pragma: no cover + except ( + anyio.ClosedResourceError, + anyio.BrokenResourceError, + ): # pragma: no cover # stream was closed -> terminate break except Exception: diff --git a/python_packages/jupyter_lsp/jupyter_lsp/session.py b/python_packages/jupyter_lsp/jupyter_lsp/session.py index 8fc220f7e..a82b88fbf 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/session.py @@ -1,16 +1,15 @@ """ A session for managing a language server process """ -import anyio import atexit import os import string import subprocess import threading -from copy import copy +from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timezone -from typing import cast +from typing import Optional, cast -from concurrent.futures import ThreadPoolExecutor +import anyio from tornado.concurrent import run_on_executor from tornado.ioloop import IOLoop from tornado.queues import Queue @@ -18,7 +17,7 @@ from traitlets import Bunch, Instance, Set, Unicode, UseEnum, observe from traitlets.config import LoggingConfigurable -from .connection import LspStreamWriter, LspStreamReader +from .connection import LspStreamReader, LspStreamWriter from .schema import LANGUAGE_SERVER_SPEC from .trait_types import Schema from .types import SessionStatus @@ -43,7 +42,9 @@ class LanguageServerSession(LoggingConfigurable): cancelscope = None writer = Instance(LspStreamWriter, help="the JSON-RPC writer", allow_none=True) reader = Instance(LspStreamReader, help="the JSON-RPC reader", allow_none=True) - tcp_con = Instance(anyio.abc.SocketStream, help="the tcp connection", allow_none=True) + tcp_con = Instance( + anyio.abc.SocketStream, help="the tcp connection", allow_none=True + ) from_lsp = Instance( Queue, help="a queue for string messages from the server", allow_none=True ) @@ -68,7 +69,6 @@ def __init__(self, *args, **kwargs): self.executor = ThreadPoolExecutor(max_workers=1) self.start_blocking_portal() - def __repr__(self): # pragma: no cover return ( "" @@ -169,29 +169,31 @@ async def init_process(self): argv = [arg.format(host=host, port=port) for arg in argv] self.process = await anyio.open_process( - argv, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE + argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE ) if mode == "tcp": self.tcp_con = await self.init_tcp_connection(host, port) - async def stop_process(self, timeout: int=5): + async def stop_process(self, timeout: int = 5): if self.process is None: return # try to stop the process gracefully self.process.terminate() - with anyio.move_on_after(timeout) as scope: + with anyio.move_on_after(timeout): self.log.debug("Waiting for process to terminate") await self.process.wait() return - self.log.debug("Process did not terminate within {} seconds. Bringing it down the hard way!".format(timeout)) + self.log.debug( + ( + "Process did not terminate within {} seconds. " + "Bringing it down the hard way!" + ).format(timeout) + ) self.process.kill() - def init_queues(self): """create the queues""" self.from_lsp = Queue() @@ -205,11 +207,13 @@ def get_tcp_server(self): if host in ["127.0.0.1", "localhost"]: port = get_unused_port() else: - raise ValueError("A port must be given explicitly for hosts other than localhost") + raise ValueError( + "A port must be given explicitly for hosts other than localhost" + ) return (host, port) async def init_tcp_connection(self, host, port, retries=12, sleep=5.0): - server = '{}:{}'.format(host, port) + server = "{}:{}".format(host, port) tries = 0 while tries < retries: @@ -218,10 +222,20 @@ async def init_tcp_connection(self, host, port, retries=12, sleep=5.0): return await anyio.connect_tcp(host, port) except OSError: if tries < retries: - self.log.warning('Connection to server {} refused! Attempt {}/{}. Retrying in {}s'.format(server, tries, retries, sleep)) + self.log.warning( + ( + "Connection to server {} refused! " + "Attempt {}/{}. " + "Retrying in {}s" + ).format(server, tries, retries, sleep) + ) await anyio.sleep(sleep) else: - self.log.warning('Connection to server {} refused! Attempt {}/{}.'.format(server, tries, retries)) + self.log.warning( + "Connection to server {} refused! Attempt {}/{}.".format( + server, tries, retries + ) + ) raise OSError("Unable to connect to server {}".format(server)) @@ -236,9 +250,7 @@ def init_reader(self): else: raise ValueError("Unknown mode: " + mode) - self.reader = LspStreamReader( - stream=stream, queue=self.from_lsp, parent=self - ) + self.reader = LspStreamReader(stream=stream, queue=self.from_lsp, parent=self) def init_writer(self): """create the stdin writer (to the language server)""" @@ -251,9 +263,7 @@ def init_writer(self): else: raise ValueError("Unknown mode: " + mode) - self.writer = LspStreamWriter( - stream=stream, queue=self.to_lsp, parent=self - ) + self.writer = LspStreamWriter(stream=stream, queue=self.to_lsp, parent=self) def substitute_env(self, env, base): for key, value in env.items(): diff --git a/python_packages/jupyter_lsp/jupyter_lsp/utils.py b/python_packages/jupyter_lsp/jupyter_lsp/utils.py index 665bf303e..439227b6d 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/utils.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/utils.py @@ -9,7 +9,7 @@ def get_unused_port(): Probably could introduce race conditions if inside a tight loop. """ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.bind(('localhost', 0)) + sock.bind(("localhost", 0)) sock.listen(1) port = sock.getsockname()[1] sock.close() From 5174669ae23871e84797bba8e84a710538b2703f Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Sat, 3 Jul 2021 19:15:36 +0200 Subject: [PATCH 07/51] Make it work for LSPs running in own process on localhost --- .../jupyter_lsp/jupyter_lsp/session.py | 37 ++++++++++++++++--- 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/session.py b/python_packages/jupyter_lsp/jupyter_lsp/session.py index a82b88fbf..bf673d6b7 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/session.py @@ -119,6 +119,7 @@ def stop(self): self.portal.call(self.writer.close) self.writer = None if self.tcp_con: + self.log.warning("Closing TCP connection") self.portal.call(self.tcp_con.aclose) self.tcp_con = None @@ -165,13 +166,22 @@ async def init_process(self): port = None mode = self.spec.get("mode") if mode == "tcp": - host, port = self.get_tcp_server() + host, port, ext = self.get_tcp_server() + # if server is already running in different process + if ext: + self.log.warning("Opening TCP connection to external process") + # just connect to it and be done + self.tcp_con = await self.init_tcp_connection(host, port) + return + # else substitute arguments for host and port into the environment argv = [arg.format(host=host, port=port) for arg in argv] + # and start the process self.process = await anyio.open_process( argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE ) + # finally connect to the now running process if in tcp mode if mode == "tcp": self.tcp_con = await self.init_tcp_connection(host, port) @@ -200,17 +210,32 @@ def init_queues(self): self.to_lsp = Queue() def get_tcp_server(self): - host = self.spec.get("host", "127.0.0.1") + """ Reads the TCP configuration parameters from the specification + + Returns a triple (host, port, ext), where ext is a boolean specifying whether + the sever is running externaly. + If neither a host nor a port is specified a randomly selected free port and + host="127.0.0.1" will be returned. + If a host but no port is specified a ValueError is raised. + In all other cases the specified port and host will be returned. + """ + host = self.spec.get("host") port = self.spec.get("port") + ext = True - if not port: - if host in ["127.0.0.1", "localhost"]: + if port is None: + if host is None: + host = "127.0.0.1" port = get_unused_port() + ext = False else: raise ValueError( - "A port must be given explicitly for hosts other than localhost" + "A port must be given explicitly if a host was specified" ) - return (host, port) + else: + if host is None: + host = "127.0.0.1" + return (host, port, ext) async def init_tcp_connection(self, host, port, retries=12, sleep=5.0): server = "{}:{}".format(host, port) From 880f0f686c4540106fb412358c404bae787b7770 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Sun, 4 Jul 2021 17:09:41 +0200 Subject: [PATCH 08/51] Split Session in separate classes for TCP and Stdio --- .../jupyter_lsp/jupyter_lsp/manager.py | 28 ++- .../jupyter_lsp/jupyter_lsp/session.py | 202 ++++++++++-------- 2 files changed, 136 insertions(+), 94 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/manager.py b/python_packages/jupyter_lsp/jupyter_lsp/manager.py index 7eaff33e9..a8f79cf2a 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/manager.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/manager.py @@ -22,7 +22,11 @@ EP_SPEC_V1, ) from .schema import LANGUAGE_SERVER_SPEC_MAP -from .session import LanguageServerSession +from .session import ( + LanguageServerSessionBase, + LanguageServerSessionStdio, + LanguageServerSessionTCP, +) from .trait_types import LoadableCallable, Schema from .types import ( KeyedLanguageServerSpecs, @@ -54,10 +58,10 @@ class LanguageServerManager(LanguageServerManagerAPI): ) # type: bool sessions = Dict_( - trait=Instance(LanguageServerSession), + trait=Instance(LanguageServerSessionBase), default_value={}, help="sessions keyed by language server name", - ) # type: Dict[Tuple[Text], LanguageServerSession] + ) # type: Dict[Tuple[Text], LanguageServerSessionBase] virtual_documents_dir = Unicode( help="""Path to virtual documents relative to the content manager root @@ -127,9 +131,21 @@ def init_sessions(self): """create, but do not initialize all sessions""" sessions = {} for language_server, spec in self.language_servers.items(): - sessions[language_server] = LanguageServerSession( - language_server=language_server, spec=spec, parent=self - ) + mode = spec.get("mode", "stdio") + if mode == "stdio": + sessions[language_server] = LanguageServerSessionStdio( + language_server=language_server, spec=spec, parent=self + ) + elif mode == "tcp": + sessions[language_server] = LanguageServerSessionTCP( + language_server=language_server, spec=spec, parent=self + ) + else: + raise ValueError( + "Unknown session mode {} for language server '{}'".format( + mode, language_server + ) + ) self.sessions = sessions def init_listeners(self): diff --git a/python_packages/jupyter_lsp/jupyter_lsp/session.py b/python_packages/jupyter_lsp/jupyter_lsp/session.py index bf673d6b7..29f3af577 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/session.py @@ -7,7 +7,7 @@ import threading from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timezone -from typing import Optional, cast +from typing import List, Optional, cast import anyio from tornado.concurrent import run_on_executor @@ -27,7 +27,7 @@ SKIP_JSON_SPEC = ["argv", "debug_argv", "env"] -class LanguageServerSession(LoggingConfigurable): +class LanguageServerSessionBase(LoggingConfigurable): """Manage a session for a connection to a language server""" language_server = Unicode(help="the language server implementation name") @@ -42,9 +42,6 @@ class LanguageServerSession(LoggingConfigurable): cancelscope = None writer = Instance(LspStreamWriter, help="the JSON-RPC writer", allow_none=True) reader = Instance(LspStreamReader, help="the JSON-RPC reader", allow_none=True) - tcp_con = Instance( - anyio.abc.SocketStream, help="the tcp connection", allow_none=True - ) from_lsp = Instance( Queue, help="a queue for string messages from the server", allow_none=True ) @@ -109,19 +106,15 @@ def stop(self): if self.cancelscope is not None: self.portal.call(self.cancelscope.cancel) self.cancelscope = None - if self.process: - self.portal.call(self.stop_process, 5) - self.process = None if self.reader: self.portal.call(self.reader.close) self.reader = None if self.writer: self.portal.call(self.writer.close) self.writer = None - if self.tcp_con: - self.log.warning("Closing TCP connection") - self.portal.call(self.tcp_con.aclose) - self.tcp_con = None + if self.process: + self.portal.call(self.stop_process, 5) + self.process = None self.status = SessionStatus.STOPPED @@ -158,34 +151,25 @@ async def run_portal(): async def init_process(self): """start the language server subprocess""" - self.substitute_env(self.spec.get("env", {}), os.environ) + # must be implemented by the base classes + pass - argv = self.spec["argv"] + async def start_process(self, argv: List[str]): + """start the language server subprocess giben in argv""" - host = None - port = None - mode = self.spec.get("mode") - if mode == "tcp": - host, port, ext = self.get_tcp_server() - # if server is already running in different process - if ext: - self.log.warning("Opening TCP connection to external process") - # just connect to it and be done - self.tcp_con = await self.init_tcp_connection(host, port) - return - # else substitute arguments for host and port into the environment - argv = [arg.format(host=host, port=port) for arg in argv] + self.substitute_env(self.spec.get("env", {}), os.environ) # and start the process self.process = await anyio.open_process( argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE ) - # finally connect to the now running process if in tcp mode - if mode == "tcp": - self.tcp_con = await self.init_tcp_connection(host, port) - async def stop_process(self, timeout: int = 5): + """stop the language server subprocess + + If the process does not terminate within timeout seconds it will be killed + forcefully. + """ if self.process is None: return @@ -209,8 +193,100 @@ def init_queues(self): self.from_lsp = Queue() self.to_lsp = Queue() + def init_reader(self): + """create the stream reader (from the language server)""" + # must be implemented by the base classes + pass + + def init_writer(self): + """create the stream writer (to the language server)""" + # must be implemented by the base classes + pass + + def substitute_env(self, env, base): + for key, value in env.items(): + os.environ.update({key: string.Template(value).safe_substitute(base)}) + + @run_on_executor + def listen(self): + self.portal.call(self._listen) + + async def _listen(self): + try: + async with anyio.create_task_group() as tg: + self.cancelscope = tg.cancel_scope + await tg.spawn(self._read_lsp) + await tg.spawn(self._write_lsp) + await tg.spawn(self._broadcast_from_lsp) + except Exception as e: + self.log.exception("Execption while listening {}", e) + + async def _read_lsp(self): + await self.reader.read() + + async def _write_lsp(self): + await self.writer.write() + + async def _broadcast_from_lsp(self): + """loop for reading messages from the queue of messages from the language + server + """ + async for message in self.from_lsp: + self.last_server_message_at = self.now() + await self.parent.on_server_message(message, self) + self.from_lsp.task_done() + + +class LanguageServerSessionStdio(LanguageServerSessionBase): + async def init_process(self): + await self.start_process(self.spec["argv"]) + + def init_reader(self): + self.reader = LspStreamReader( + stream=self.process.stdout, queue=self.from_lsp, parent=self + ) + + def init_writer(self): + self.writer = LspStreamWriter( + stream=self.process.stdin, queue=self.to_lsp, parent=self + ) + + +class LanguageServerSessionTCP(LanguageServerSessionBase): + + tcp_con = Instance( + anyio.abc.SocketStream, help="the tcp connection", allow_none=True + ) + + async def init_process(self): + """start the language server subprocess""" + argv = self.spec["argv"] + + host, port, ext = self.get_tcp_server() + # if server is already running in different process + if ext: + self.log.warning("Opening TCP connection to external process") + # just connect to it and be done + self.tcp_con = await self.init_tcp_connection(host, port) + return + + # else substitute arguments for host and port into the environment + argv = [arg.format(host=host, port=port) for arg in argv] + + # and start the process + await self.start_process(argv) + + # finally connect to the now running process if in tcp mode + self.tcp_con = await self.init_tcp_connection(host, port) + + async def stop_process(self, timeout: int = 5): + await self.tcp_con.aclose() + self.tcp_con = None + + await super().stop_process(timeout) + def get_tcp_server(self): - """ Reads the TCP configuration parameters from the specification + """Reads the TCP configuration parameters from the specification Returns a triple (host, port, ext), where ext is a boolean specifying whether the sever is running externaly. @@ -261,64 +337,14 @@ async def init_tcp_connection(self, host, port, retries=12, sleep=5.0): server, tries, retries ) ) - raise OSError("Unable to connect to server {}".format(server)) def init_reader(self): - """create the stdout reader (from the language server)""" - stream = None - mode = self.spec.get("mode", "stdio") - if mode == "tcp": - stream = self.tcp_con - elif mode == "stdio": - stream = self.process.stdout - else: - raise ValueError("Unknown mode: " + mode) - - self.reader = LspStreamReader(stream=stream, queue=self.from_lsp, parent=self) + self.reader = LspStreamReader( + stream=self.tcp_con, queue=self.from_lsp, parent=self + ) def init_writer(self): - """create the stdin writer (to the language server)""" - stream = None - mode = self.spec.get("mode", "stdio") - if mode == "tcp": - stream = self.tcp_con - elif mode == "stdio": - stream = self.process.stdin - else: - raise ValueError("Unknown mode: " + mode) - - self.writer = LspStreamWriter(stream=stream, queue=self.to_lsp, parent=self) - - def substitute_env(self, env, base): - for key, value in env.items(): - os.environ.update({key: string.Template(value).safe_substitute(base)}) - - @run_on_executor - def listen(self): - self.portal.call(self._listen) - - async def _listen(self): - try: - async with anyio.create_task_group() as tg: - self.cancelscope = tg.cancel_scope - await tg.spawn(self._read_lsp) - await tg.spawn(self._write_lsp) - await tg.spawn(self._broadcast_from_lsp) - except Exception as e: - self.log.exception("Execption while listening {}", e) - - async def _read_lsp(self): - await self.reader.read() - - async def _write_lsp(self): - await self.writer.write() - - async def _broadcast_from_lsp(self): - """loop for reading messages from the queue of messages from the language - server - """ - async for message in self.from_lsp: - self.last_server_message_at = self.now() - await self.parent.on_server_message(message, self) - self.from_lsp.task_done() + self.writer = LspStreamWriter( + stream=self.tcp_con, queue=self.to_lsp, parent=self + ) From e36f3ac6fd1719af4da840b422646f22f9a7cd22 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Mon, 5 Jul 2021 19:44:38 +0200 Subject: [PATCH 09/51] Move stream from LspStreamBase to LspStreamReader and LspStreamWriter --- .../jupyter_lsp/jupyter_lsp/connection.py | 35 +++++++++++++------ 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/connection.py b/python_packages/jupyter_lsp/jupyter_lsp/connection.py index bc1976eac..35b12f4d4 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/connection.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/connection.py @@ -30,9 +30,6 @@ class LspStreamBase(LoggingConfigurable): executor = None - stream = Instance( - anyio.abc.AsyncResource, help="the stream to read/write" - ) # type: anyio.abc.AsyncResource queue = Instance(Queue, help="queue to get/put") def __repr__(self): # pragma: no cover @@ -44,8 +41,8 @@ def __init__(self, **kwargs): self.executor = ThreadPoolExecutor(max_workers=1) async def close(self): - await self.stream.aclose() - self.log.debug("%s closed", self) + # must be implemented by the base classes + pass class LspStreamReader(LspStreamBase): @@ -59,9 +56,17 @@ class LspStreamReader(LspStreamBase): min_wait = Float(0.05, help="minimum time to wait on idle stream").tag(config=True) next_wait = Float(0.05, help="next time to wait on idle stream").tag(config=True) - def __init__(self, **kwargs): + stream = Instance( + BufferedByteReceiveStream, help="the stream to read from" + ) # type: BufferedByteReceiveStream + + def __init__(self, stream: anyio.abc.AsyncResource, **kwargs): super().__init__(**kwargs) - self.stream = BufferedByteReceiveStream(self.stream) + self.stream = BufferedByteReceiveStream(stream) + + async def close(self): + await self.stream.aclose() + self.log.debug("%s closed", self) @default("max_wait") def _default_max_wait(self): @@ -199,17 +204,25 @@ async def _readline(self) -> Text: class LspStreamWriter(LspStreamBase): """Language Server Writer""" - def __init__(self, **kwargs): + stream = Instance( + TextSendStream, help="the stream to write to" + ) # type: TextSendStream + + def __init__(self, stream: anyio.abc.AsyncResource, **kwargs): super().__init__(**kwargs) - self.stream = TextSendStream(self.stream, encoding="utf-8") + self.stream = TextSendStream(stream, encoding="utf-8") + + async def close(self): + await self.stream.aclose() + self.log.debug("%s closed", self) async def write(self) -> None: """Write to a Language Server until it closes""" while True: message = await self.queue.get() try: - nBytes = len(message.encode("utf-8")) - response = "Content-Length: {}\r\n\r\n{}".format(nBytes, message) + n_bytes = len(message.encode("utf-8")) + response = "Content-Length: {}\r\n\r\n{}".format(n_bytes, message) await convert_yielded(self._write_one(response)) except ( anyio.ClosedResourceError, From 7085747303840e1272a93d022940783d890588db Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Thu, 8 Jul 2021 18:33:56 +0200 Subject: [PATCH 10/51] Fix unit tests (switched to anyio) --- .../jupyter_lsp/jupyter_lsp/tests/conftest.py | 6 +++ .../jupyter_lsp/tests/test_bad_spec.py | 4 +- .../jupyter_lsp/tests/test_stdio.py | 39 +++++++++++-------- 3 files changed, 31 insertions(+), 18 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/tests/conftest.py b/python_packages/jupyter_lsp/jupyter_lsp/tests/conftest.py index eeb004cd9..aa52f36b4 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/tests/conftest.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/tests/conftest.py @@ -109,6 +109,12 @@ def jsonrpc_init_msg(): ) +# only run tests with asyncio +@fixture +def anyio_backend(): + return "asyncio" + + @fixture def app(): return MockServerApp() diff --git a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_bad_spec.py b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_bad_spec.py index 528be4034..d6be3d03e 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_bad_spec.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_bad_spec.py @@ -1,7 +1,7 @@ import pytest import traitlets -from jupyter_lsp.session import LanguageServerSession +from jupyter_lsp.session import LanguageServerSessionStdio @pytest.mark.parametrize( @@ -16,4 +16,4 @@ ) def test_bad_spec(spec): with pytest.raises(traitlets.TraitError): - LanguageServerSession(spec=spec) + LanguageServerSessionStdio(spec=spec) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_stdio.py b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_stdio.py index b7645f4f4..ecf3c2cc8 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_stdio.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_stdio.py @@ -1,16 +1,18 @@ -import asyncio +import anyio import subprocess import pytest from tornado.queues import Queue -from jupyter_lsp.connection import LspStdIoReader +from jupyter_lsp.connection import LspStreamReader WRITER_TEMPLATE = """ from time import sleep -print('Content-Length: {length}') -print() +# the LSP states that each header field must be terminated by \\r\\n +print('Content-Length: {length}', end='\\r\\n') +# and the header must be terminated again by \\r\\n +print(end='\\r\\n') for repeat in range({repeats}): sleep({interval}) @@ -27,7 +29,7 @@ class CommunicatorSpawner: def __init__(self, tmp_path): self.tmp_path = tmp_path - def spawn_writer( + async def spawn_writer( self, message: str, repeats: int = 1, interval=None, add_excess=False ): length = len(message) * repeats @@ -41,8 +43,8 @@ def spawn_writer( add_excess=add_excess, ) ) - return subprocess.Popen( - ["python", "-u", str(commands_file)], stdout=subprocess.PIPE, bufsize=0 + return await anyio.open_process( + ["python", "-u", str(commands_file)], stdout=subprocess.PIPE ) @@ -51,11 +53,14 @@ def communicator_spawner(tmp_path): return CommunicatorSpawner(tmp_path) -async def join_process(process: subprocess.Popen, headstart=1, timeout=1): - await asyncio.sleep(headstart) - result = process.wait(timeout=timeout) +async def join_process(process: anyio.abc.Process, headstart=1, timeout=1): + await anyio.sleep(headstart) + # wait for timeout second for the process to terminate before raising a TimeoutError + with anyio.fail_after(timeout): + result = await process.wait() + # close any streams attached to stdout if process.stdout: - process.stdout.close() + await process.stdout.aclose() return result @@ -63,23 +68,25 @@ async def join_process(process: subprocess.Popen, headstart=1, timeout=1): "message,repeats,interval,add_excess", [ ["short", 1, None, False], - ["ab" * 10_0000, 1, None, False], + ["ab" * 100_000, 1, None, False], ["ab", 2, 0.01, False], ["ab", 45, 0.01, False], ["message", 2, 0.01, True], ], ids=["short", "long", "intermittent", "intensive-intermittent", "with-excess"], ) -@pytest.mark.asyncio +@pytest.mark.anyio async def test_reader(message, repeats, interval, add_excess, communicator_spawner): queue = Queue() - process = communicator_spawner.spawn_writer( + process = await communicator_spawner.spawn_writer( message=message, repeats=repeats, interval=interval, add_excess=add_excess ) - reader = LspStdIoReader(stream=process.stdout, queue=queue) + reader = LspStreamReader(stream=process.stdout, queue=queue) - await asyncio.gather(join_process(process, headstart=3, timeout=1), reader.read()) + async with anyio.create_task_group() as tg: + tg.start_soon(join_process, process, 3, 1) + tg.start_soon(reader.read) result = queue.get_nowait() assert result == message * repeats From 22ae93f5c816a85d772e12d448afcde47d8c825b Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Thu, 8 Jul 2021 19:26:56 +0200 Subject: [PATCH 11/51] Remove code related to externally running servers for now --- .../jupyter_lsp/schema/schema.json | 37 +++------------- .../jupyter_lsp/jupyter_lsp/session.py | 43 +++---------------- 2 files changed, 12 insertions(+), 68 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/schema/schema.json b/python_packages/jupyter_lsp/jupyter_lsp/schema/schema.json index 78fe525e4..f608ab289 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/schema/schema.json +++ b/python_packages/jupyter_lsp/jupyter_lsp/schema/schema.json @@ -139,36 +139,13 @@ "description": "list of MIME types supported by the language server", "title": "MIME Types" }, - "mode": { - "description": "connection mode used, e.g. stdio (default), tcp", - "title": "Mode", - "type": "string", - "enum": ["stdio", "tcp"] - }, - "port": { - "description": "the port for tcp mode connections. a null value will select a random, unused port", - "title": "Port", - "oneOf": [ - { - "type": "integer" - }, - { - "type": "null" - } - ] - }, - "host": { - "description": "the host for tcp mode connections. a null value will assume '127.0.0.1'", - "title": "Host", - "oneOf": [ - { - "type": "string" - }, - { - "type": "null" - } - ] - }, + "mode": { + "description": "connection mode used, e.g. stdio (default), tcp", + "title": "Mode", + "type": "string", + "enum": ["stdio", "tcp"], + "default": "stdio" + }, "urls": { "additionalProperties": { "format": "uri", diff --git a/python_packages/jupyter_lsp/jupyter_lsp/session.py b/python_packages/jupyter_lsp/jupyter_lsp/session.py index 29f3af577..3a63ae80b 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/session.py @@ -262,21 +262,16 @@ async def init_process(self): """start the language server subprocess""" argv = self.spec["argv"] - host, port, ext = self.get_tcp_server() - # if server is already running in different process - if ext: - self.log.warning("Opening TCP connection to external process") - # just connect to it and be done - self.tcp_con = await self.init_tcp_connection(host, port) - return + host = "127.0.0.1" + port = get_unused_port() - # else substitute arguments for host and port into the environment + # substitute arguments for host and port into the environment argv = [arg.format(host=host, port=port) for arg in argv] - # and start the process + # start the process await self.start_process(argv) - # finally connect to the now running process if in tcp mode + # finally open the tcp connection to the now running process self.tcp_con = await self.init_tcp_connection(host, port) async def stop_process(self, timeout: int = 5): @@ -285,34 +280,6 @@ async def stop_process(self, timeout: int = 5): await super().stop_process(timeout) - def get_tcp_server(self): - """Reads the TCP configuration parameters from the specification - - Returns a triple (host, port, ext), where ext is a boolean specifying whether - the sever is running externaly. - If neither a host nor a port is specified a randomly selected free port and - host="127.0.0.1" will be returned. - If a host but no port is specified a ValueError is raised. - In all other cases the specified port and host will be returned. - """ - host = self.spec.get("host") - port = self.spec.get("port") - ext = True - - if port is None: - if host is None: - host = "127.0.0.1" - port = get_unused_port() - ext = False - else: - raise ValueError( - "A port must be given explicitly if a host was specified" - ) - else: - if host is None: - host = "127.0.0.1" - return (host, port, ext) - async def init_tcp_connection(self, host, port, retries=12, sleep=5.0): server = "{}:{}".format(host, port) From 62dc9923f2c82cbd5988535e9560aeb6d7622486 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Thu, 8 Jul 2021 19:31:32 +0200 Subject: [PATCH 12/51] Extend docs for extending language servers with different modes --- CONTRIBUTING.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 0aedf299d..4e0a8d1da 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -384,8 +384,10 @@ otherwise an empty dictionary (`{}`) should be returned. ##### Common Concerns - some language servers need to have their connection mode specified - - the `stdio` interface is the only one supported by `jupyter_lsp` - - PRs welcome to support other modes! + - `jupyter_lsp` currently supports the `stdio` and `tcp` interface + - the mode used by `jupyter_lsp` to connect to the language server can be specified by including `mode="stdio"` or `mode="tcp"` in the language sever `spec`-dictionary + - currently it is not possible to connect externally running language servers via tcp, but only to servers spawned by `jupyter_lsp` as given by the `argv` specs entry + - PRs welcome to support externally running language servers! - because of its VSCode heritage, many language servers use `nodejs` - `LanguageServerManager.nodejs` will provide the location of our best guess at where a user's `nodejs` might be found From e0ef174dbe474a700a88048d76e7270c7b963cbf Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Fri, 9 Jul 2021 18:25:09 +0200 Subject: [PATCH 13/51] Fix codestyle --- python_packages/jupyter_lsp/jupyter_lsp/tests/test_stdio.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_stdio.py b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_stdio.py index ecf3c2cc8..9f261bb85 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_stdio.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_stdio.py @@ -1,6 +1,6 @@ -import anyio import subprocess +import anyio import pytest from tornado.queues import Queue From a5be8fb3d9903ba24a6e9c6c27678840067accd8 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Fri, 9 Jul 2021 19:03:21 +0200 Subject: [PATCH 14/51] Make maximum bytes for receive configurable --- python_packages/jupyter_lsp/jupyter_lsp/connection.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/connection.py b/python_packages/jupyter_lsp/jupyter_lsp/connection.py index 35b12f4d4..8e2c59e58 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/connection.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/connection.py @@ -19,7 +19,7 @@ from tornado.httputil import HTTPHeaders from tornado.ioloop import IOLoop from tornado.queues import Queue -from traitlets import Float, Instance, default +from traitlets import Float, Instance, Int, default from traitlets.config import LoggingConfigurable @@ -55,6 +55,10 @@ class LspStreamReader(LspStreamBase): max_wait = Float(help="maximum time to wait on idle stream").tag(config=True) min_wait = Float(0.05, help="minimum time to wait on idle stream").tag(config=True) next_wait = Float(0.05, help="next time to wait on idle stream").tag(config=True) + receive_max_bytes = Int( + 65536, + help="the maximum size a header line send by the language server may have", + ).tag(config=True) stream = Instance( BufferedByteReceiveStream, help="the stream to read from" @@ -188,7 +192,7 @@ async def _readline(self) -> Text: # way of getting the bytes read until max_bytes is reached, so we cannot # iterate the receive_until call with smaller max_bytes values async with anyio.move_on_after(0.2): - line = await self.stream.receive_until(b"\r\n", 65536) + line = await self.stream.receive_until(b"\r\n", self.receive_max_bytes) return line.decode("utf-8").strip() except anyio.IncompleteRead: # resource has been closed before the requested bytes could be retrieved From aacc847409905863bd9284622ab2917ed21431b2 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Fri, 9 Jul 2021 19:04:33 +0200 Subject: [PATCH 15/51] Fix spelling in doc --- CONTRIBUTING.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index ae820625f..15ca28e44 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -387,8 +387,8 @@ otherwise an empty dictionary (`{}`) should be returned. - some language servers need to have their connection mode specified - `jupyter_lsp` currently supports the `stdio` and `tcp` interface - - the mode used by `jupyter_lsp` to connect to the language server can be specified by including `mode="stdio"` or `mode="tcp"` in the language sever `spec`-dictionary - - currently it is not possible to connect externally running language servers via tcp, but only to servers spawned by `jupyter_lsp` as given by the `argv` specs entry + - the mode used by `jupyter_lsp` to connect to the language server can be specified by including `mode="stdio"` or `mode="tcp"` in the language server `spec`-dictionary + - currently it is not possible to connect to externally running language servers via tcp, but only to servers spawned by `jupyter_lsp` as given by the `argv` specs entry - PRs welcome to support externally running language servers! - because of its VSCode heritage, many language servers use `nodejs` - `LanguageServerManager.nodejs` will provide the location of our best From 792ac733ece7721f5300d1a64ce85c851e17b5e3 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Sun, 11 Jul 2021 14:57:33 +0200 Subject: [PATCH 16/51] Enforce interfaces by making base classes for Session and Stream abstract --- .../jupyter_lsp/jupyter_lsp/connection.py | 10 ++++- .../jupyter_lsp/jupyter_lsp/session.py | 40 ++++++++++++------- 2 files changed, 34 insertions(+), 16 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/connection.py b/python_packages/jupyter_lsp/jupyter_lsp/connection.py index 8e2c59e58..19f27c5ab 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/connection.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/connection.py @@ -8,6 +8,7 @@ > > Copyright 2018 Palantir Technologies, Inc. """ import os +from abc import ABC, ABCMeta, abstractmethod from concurrent.futures import ThreadPoolExecutor from typing import List, Optional, Text @@ -21,9 +22,14 @@ from tornado.queues import Queue from traitlets import Float, Instance, Int, default from traitlets.config import LoggingConfigurable +from traitlets.traitlets import MetaHasTraits -class LspStreamBase(LoggingConfigurable): +class LspStreamMeta(MetaHasTraits, ABCMeta): + pass + + +class LspStreamBase(LoggingConfigurable, ABC, metaclass=LspStreamMeta): """Non-blocking, queued base for communicating with Language Servers through anyio streams """ @@ -40,8 +46,8 @@ def __init__(self, **kwargs): self.log.debug("%s initialized", self) self.executor = ThreadPoolExecutor(max_workers=1) + @abstractmethod async def close(self): - # must be implemented by the base classes pass diff --git a/python_packages/jupyter_lsp/jupyter_lsp/session.py b/python_packages/jupyter_lsp/jupyter_lsp/session.py index 6dbcda8cb..c62438717 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/session.py @@ -5,6 +5,7 @@ import string import subprocess import threading +from abc import ABC, ABCMeta, abstractmethod from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timezone from typing import List, Optional, cast @@ -16,6 +17,7 @@ from tornado.websocket import WebSocketHandler from traitlets import Bunch, Instance, Set, Unicode, UseEnum, observe from traitlets.config import LoggingConfigurable +from traitlets.traitlets import MetaHasTraits from .connection import LspStreamReader, LspStreamWriter from .schema import LANGUAGE_SERVER_SPEC @@ -25,7 +27,13 @@ from .utils import get_unused_port -class LanguageServerSessionBase(LoggingConfigurable): +class LanguageServerSessionMeta(MetaHasTraits, ABCMeta): + pass + + +class LanguageServerSessionBase( + LoggingConfigurable, ABC, metaclass=LanguageServerSessionMeta +): """Manage a session for a connection to a language server""" language_server = Unicode(help="the language server implementation name") @@ -147,11 +155,6 @@ async def run_portal(): event.wait() self.portal = cast(anyio.abc.BlockingPortal, portal) - async def init_process(self): - """start the language server subprocess""" - # must be implemented by the base classes - pass - async def start_process(self, argv: List[str]): """start the language server subprocess giben in argv""" @@ -191,20 +194,29 @@ def init_queues(self): self.from_lsp = Queue() self.to_lsp = Queue() + def substitute_env(self, env, base): + for key, value in env.items(): + os.environ.update({key: string.Template(value).safe_substitute(base)}) + + @abstractmethod + async def init_process(self): + """start the language server subprocess and store it in self.process""" + pass + + @abstractmethod def init_reader(self): - """create the stream reader (from the language server)""" - # must be implemented by the base classes + """create the stream reader (from the language server) and store it in + self.reader + """ pass + @abstractmethod def init_writer(self): - """create the stream writer (to the language server)""" - # must be implemented by the base classes + """create the stream writer (to the language server) and store it in + self.writer + """ pass - def substitute_env(self, env, base): - for key, value in env.items(): - os.environ.update({key: string.Template(value).safe_substitute(base)}) - @run_on_executor def listen(self): self.portal.call(self._listen) From f41d5fecdfdee8d6f43062ebb9fb7c5297378725 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Fri, 16 Jul 2021 17:59:30 +0200 Subject: [PATCH 17/51] Add unit test for reading over tcp --- .../tests/{test_stdio.py => test_reader.py} | 75 +++++++++++++++++-- 1 file changed, 68 insertions(+), 7 deletions(-) rename python_packages/jupyter_lsp/jupyter_lsp/tests/{test_stdio.py => test_reader.py} (52%) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_stdio.py b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_reader.py similarity index 52% rename from python_packages/jupyter_lsp/jupyter_lsp/tests/test_stdio.py rename to python_packages/jupyter_lsp/jupyter_lsp/tests/test_reader.py index 9f261bb85..79ce67f83 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_stdio.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_reader.py @@ -5,6 +5,7 @@ from tornado.queues import Queue from jupyter_lsp.connection import LspStreamReader +from jupyter_lsp.utils import get_unused_port WRITER_TEMPLATE = """ from time import sleep @@ -24,23 +25,63 @@ print() """ +TCP_WRITER_TEMPLATE = """ +from anyio import create_tcp_listener, create_task_group, run, sleep, Event + +async def serve_once(listener): + async def handle(client): + async with client: + # the LSP states that each header field must be terminated by \\r\\n + await client.send(b'Content-Length: {length}\\r\\n') + # and the header must be terminated again by \\r\\n + await client.send(b'\\r\\n') + + for repeat in range({repeats}): + await sleep({interval}) + await client.send(b'{message}') + + if {add_excess}: + await client.send(b"extra") + + await client.send(b'\\n') + + stop.set() + + async def cancel_on_event(scope): + await stop.wait() + scope.cancel() + + stop = Event() + async with create_task_group() as tg: + tg.start_soon(listener.serve, handle) + tg.start_soon(cancel_on_event, tg.cancel_scope) + +async def main(): + listener = await create_tcp_listener(local_port={port}) + await serve_once(listener) + +run(main) +""" + class CommunicatorSpawner: def __init__(self, tmp_path): self.tmp_path = tmp_path async def spawn_writer( - self, message: str, repeats: int = 1, interval=None, add_excess=False + self, message: str, repeats: int = 1, interval=None, add_excess=False, port=None ): + template = WRITER_TEMPLATE if port is None else TCP_WRITER_TEMPLATE length = len(message) * repeats commands_file = self.tmp_path / "writer.py" commands_file.write_text( - WRITER_TEMPLATE.format( + template.format( length=length, repeats=repeats, interval=interval or 0, message=message, add_excess=add_excess, + port=port ) ) return await anyio.open_process( @@ -71,22 +112,42 @@ async def join_process(process: anyio.abc.Process, headstart=1, timeout=1): ["ab" * 100_000, 1, None, False], ["ab", 2, 0.01, False], ["ab", 45, 0.01, False], - ["message", 2, 0.01, True], + ["message", 2, 0.01, True] ], - ids=["short", "long", "intermittent", "intensive-intermittent", "with-excess"], + ids=["short", + "long", + "intermittent", + "intensive-intermittent", + "with-excess", + ], ) +@pytest.mark.parametrize("mode", ["stdio", "tcp"], ids=["stdio", "tcp"]) @pytest.mark.anyio -async def test_reader(message, repeats, interval, add_excess, communicator_spawner): +async def test_reader(message, repeats, interval, add_excess, mode, + communicator_spawner): queue = Queue() + port = get_unused_port() if mode == "tcp" else None process = await communicator_spawner.spawn_writer( - message=message, repeats=repeats, interval=interval, add_excess=add_excess + message=message, repeats=repeats, interval=interval, add_excess=add_excess, + port=port ) - reader = LspStreamReader(stream=process.stdout, queue=queue) + stream = None + if port is None: + stream = process.stdout + else: + # give the server some time to start + await anyio.sleep(1) + stream = await anyio.connect_tcp("localhost", port) + + reader = LspStreamReader(stream=stream, queue=queue) async with anyio.create_task_group() as tg: tg.start_soon(join_process, process, 3, 1) tg.start_soon(reader.read) + if port is not None: + await stream.aclose() + result = queue.get_nowait() assert result == message * repeats From aa48b4f3274df921c44259f836020859313e548e Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Fri, 16 Jul 2021 18:32:44 +0200 Subject: [PATCH 18/51] Issue debug message if stream was closed prematurely --- python_packages/jupyter_lsp/jupyter_lsp/connection.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/connection.py b/python_packages/jupyter_lsp/jupyter_lsp/connection.py index 19f27c5ab..a2f01b859 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/connection.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/connection.py @@ -110,8 +110,9 @@ async def read(self) -> None: self.wake() IOLoop.current().add_callback(self.queue.put_nowait, message) - except (anyio.ClosedResourceError, anyio.EndOfStream): + except anyio.ClosedResourceError: # stream was closed -> terminate + self.log.debug("Stream closed while a read was still in progress") break except Exception as e: # pragma: no cover self.log.exception( @@ -239,8 +240,9 @@ async def write(self) -> None: anyio.BrokenResourceError, ): # pragma: no cover # stream was closed -> terminate + self.log.debug("Stream closed while a write was still in progress") break - except Exception: + except Exception: # pragma: no cover self.log.exception("%s couldn't write message: %s", self, response) finally: self.queue.task_done() From 0f97a200062627ff8aa35847ab5672ad2fca3510 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Fri, 16 Jul 2021 18:35:39 +0200 Subject: [PATCH 19/51] Codestyle fixes --- .../jupyter_lsp/tests/test_reader.py | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_reader.py b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_reader.py index 79ce67f83..583ec5be2 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_reader.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_reader.py @@ -81,7 +81,7 @@ async def spawn_writer( interval=interval or 0, message=message, add_excess=add_excess, - port=port + port=port, ) ) return await anyio.open_process( @@ -112,25 +112,30 @@ async def join_process(process: anyio.abc.Process, headstart=1, timeout=1): ["ab" * 100_000, 1, None, False], ["ab", 2, 0.01, False], ["ab", 45, 0.01, False], - ["message", 2, 0.01, True] + ["message", 2, 0.01, True], + ], + ids=[ + "short", + "long", + "intermittent", + "intensive-intermittent", + "with-excess", ], - ids=["short", - "long", - "intermittent", - "intensive-intermittent", - "with-excess", - ], ) @pytest.mark.parametrize("mode", ["stdio", "tcp"], ids=["stdio", "tcp"]) @pytest.mark.anyio -async def test_reader(message, repeats, interval, add_excess, mode, - communicator_spawner): +async def test_reader( + message, repeats, interval, add_excess, mode, communicator_spawner +): queue = Queue() port = get_unused_port() if mode == "tcp" else None process = await communicator_spawner.spawn_writer( - message=message, repeats=repeats, interval=interval, add_excess=add_excess, - port=port + message=message, + repeats=repeats, + interval=interval, + add_excess=add_excess, + port=port, ) stream = None if port is None: From f3a02b349365b887b92fa439b4418ecdfb090fcd Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Fri, 16 Jul 2021 19:40:29 +0200 Subject: [PATCH 20/51] Fix type of streams in Reader's and Writer's c-tors --- python_packages/jupyter_lsp/jupyter_lsp/connection.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/connection.py b/python_packages/jupyter_lsp/jupyter_lsp/connection.py index a2f01b859..85f4d930e 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/connection.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/connection.py @@ -70,7 +70,7 @@ class LspStreamReader(LspStreamBase): BufferedByteReceiveStream, help="the stream to read from" ) # type: BufferedByteReceiveStream - def __init__(self, stream: anyio.abc.AsyncResource, **kwargs): + def __init__(self, stream: anyio.abc.ByteReceiveStream, **kwargs): super().__init__(**kwargs) self.stream = BufferedByteReceiveStream(stream) @@ -219,7 +219,7 @@ class LspStreamWriter(LspStreamBase): TextSendStream, help="the stream to write to" ) # type: TextSendStream - def __init__(self, stream: anyio.abc.AsyncResource, **kwargs): + def __init__(self, stream: anyio.abc.ByteSendStream, **kwargs): super().__init__(**kwargs) self.stream = TextSendStream(stream, encoding="utf-8") From bb3287e116c144a86ad8d06f030f7462ba000e29 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Fri, 16 Jul 2021 19:42:18 +0200 Subject: [PATCH 21/51] Add instructions for specifying port in language servers argv --- CONTRIBUTING.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 15ca28e44..59cdc14ce 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -390,6 +390,7 @@ otherwise an empty dictionary (`{}`) should be returned. - the mode used by `jupyter_lsp` to connect to the language server can be specified by including `mode="stdio"` or `mode="tcp"` in the language server `spec`-dictionary - currently it is not possible to connect to externally running language servers via tcp, but only to servers spawned by `jupyter_lsp` as given by the `argv` specs entry - PRs welcome to support externally running language servers! + - use the placeholder `{port}` within the `argv` entry to allow `jupyter_lsp` to specify the port on which to launch the language server - because of its VSCode heritage, many language servers use `nodejs` - `LanguageServerManager.nodejs` will provide the location of our best guess at where a user's `nodejs` might be found From 0b431a03f615b501ec03a827b77a6419b4fcbef0 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Sat, 17 Jul 2021 11:44:13 +0200 Subject: [PATCH 22/51] Remove no longer required ThreadPoolExecutor from Stream classes --- python_packages/jupyter_lsp/jupyter_lsp/connection.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/connection.py b/python_packages/jupyter_lsp/jupyter_lsp/connection.py index 85f4d930e..7a61aebb8 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/connection.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/connection.py @@ -9,7 +9,6 @@ """ import os from abc import ABC, ABCMeta, abstractmethod -from concurrent.futures import ThreadPoolExecutor from typing import List, Optional, Text # pylint: disable=broad-except @@ -34,8 +33,6 @@ class LspStreamBase(LoggingConfigurable, ABC, metaclass=LspStreamMeta): streams """ - executor = None - queue = Instance(Queue, help="queue to get/put") def __repr__(self): # pragma: no cover @@ -44,7 +41,6 @@ def __repr__(self): # pragma: no cover def __init__(self, **kwargs): super().__init__(**kwargs) self.log.debug("%s initialized", self) - self.executor = ThreadPoolExecutor(max_workers=1) @abstractmethod async def close(self): From 129ad3d6d98fda3b08f1aca90e52c6f4e929d90d Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Sat, 17 Jul 2021 11:46:42 +0200 Subject: [PATCH 23/51] Increase sleep before connecting in test to ensure that the tcp server is ready --- python_packages/jupyter_lsp/jupyter_lsp/tests/test_reader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_reader.py b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_reader.py index 583ec5be2..ccfd5e15f 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_reader.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_reader.py @@ -142,7 +142,7 @@ async def test_reader( stream = process.stdout else: # give the server some time to start - await anyio.sleep(1) + await anyio.sleep(2) stream = await anyio.connect_tcp("localhost", port) reader = LspStreamReader(stream=stream, queue=queue) From 86cbad8bd9bbfc69b219c59c97d0c9f53157ed31 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Mon, 19 Jul 2021 16:53:03 +0200 Subject: [PATCH 24/51] Use newly introduced `env` parameter in `anyio.open_process` --- .../jupyter_lsp/jupyter_lsp/session.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/session.py b/python_packages/jupyter_lsp/jupyter_lsp/session.py index c62438717..fea037674 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/session.py @@ -7,6 +7,7 @@ import threading from abc import ABC, ABCMeta, abstractmethod from concurrent.futures import ThreadPoolExecutor +from copy import copy from datetime import datetime, timezone from typing import List, Optional, cast @@ -157,12 +158,11 @@ async def run_portal(): async def start_process(self, argv: List[str]): """start the language server subprocess giben in argv""" - - self.substitute_env(self.spec.get("env", {}), os.environ) - - # and start the process self.process = await anyio.open_process( - argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE + argv, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + env=self.substitute_env(self.spec.get("env", {}), os.environ), ) async def stop_process(self, timeout: int = 5): @@ -195,8 +195,12 @@ def init_queues(self): self.to_lsp = Queue() def substitute_env(self, env, base): + final_env = copy(os.environ) + for key, value in env.items(): - os.environ.update({key: string.Template(value).safe_substitute(base)}) + final_env.update({key: string.Template(value).safe_substitute(base)}) + + return final_env @abstractmethod async def init_process(self): From 8f1889341f074561543b43ef8dcbe7a2640555ea Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Thu, 22 Jul 2021 19:02:00 +0200 Subject: [PATCH 25/51] Mark abstract methods with 'no cover' --- python_packages/jupyter_lsp/jupyter_lsp/connection.py | 2 +- python_packages/jupyter_lsp/jupyter_lsp/session.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/connection.py b/python_packages/jupyter_lsp/jupyter_lsp/connection.py index 7a61aebb8..e1fef4ea1 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/connection.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/connection.py @@ -44,7 +44,7 @@ def __init__(self, **kwargs): @abstractmethod async def close(self): - pass + pass # pragma: no cover class LspStreamReader(LspStreamBase): diff --git a/python_packages/jupyter_lsp/jupyter_lsp/session.py b/python_packages/jupyter_lsp/jupyter_lsp/session.py index fea037674..05bf06963 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/session.py @@ -205,21 +205,21 @@ def substitute_env(self, env, base): @abstractmethod async def init_process(self): """start the language server subprocess and store it in self.process""" - pass + pass # pragma: no cover @abstractmethod def init_reader(self): """create the stream reader (from the language server) and store it in self.reader """ - pass + pass # pragma: no cover @abstractmethod def init_writer(self): """create the stream writer (to the language server) and store it in self.writer """ - pass + pass # pragma: no cover @run_on_executor def listen(self): From 83d28d129958cfe9ec64308bfeed22b8fa0ed3f3 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Thu, 22 Jul 2021 19:03:51 +0200 Subject: [PATCH 26/51] Add specs for pyls over tcp and include it into unit testing --- python_packages/jupyter_lsp/jupyter_lsp/specs/__init__.py | 3 ++- .../jupyter_lsp/jupyter_lsp/specs/python_lsp_server.py | 8 ++++++++ python_packages/jupyter_lsp/jupyter_lsp/tests/conftest.py | 1 + python_packages/jupyter_lsp/setup.cfg | 1 + 4 files changed, 12 insertions(+), 1 deletion(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/specs/__init__.py b/python_packages/jupyter_lsp/jupyter_lsp/specs/__init__.py index 9081d342a..c85e61a44 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/specs/__init__.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/specs/__init__.py @@ -9,7 +9,7 @@ from .julia_language_server import JuliaLanguageServer from .pyls import PalantirPythonLanguageServer from .pyright import PyrightLanguageServer -from .python_lsp_server import PythonLSPServer +from .python_lsp_server import PythonLSPServer, PythonLSPServerTCP from .r_languageserver import RLanguageServer from .sql_language_server import SQLLanguageServer from .texlab import Texlab @@ -29,6 +29,7 @@ md = UnifiedLanguageServer() py_palantir = PalantirPythonLanguageServer() py_lsp_server = PythonLSPServer() +py_lsp_server_tcp = PythonLSPServerTCP() pyright = PyrightLanguageServer() r = RLanguageServer() tex = Texlab() diff --git a/python_packages/jupyter_lsp/jupyter_lsp/specs/python_lsp_server.py b/python_packages/jupyter_lsp/jupyter_lsp/specs/python_lsp_server.py index 799222b2d..6dc266f67 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/specs/python_lsp_server.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/specs/python_lsp_server.py @@ -47,3 +47,11 @@ class PythonLSPServer(PythonModuleSpec): config_schema=load_config_schema(key), env=dict(PYTHONUNBUFFERED="1"), ) + + +class PythonLSPServerTCP(PythonLSPServer): + key = "pylsp-tcp" + args = ["--tcp", "--port", "{port}"] + spec = PythonLSPServer.spec.copy() + spec["display_name"] = "python-lsp-server (pylsp) over tcp" + spec["mode"] = "tcp" diff --git a/python_packages/jupyter_lsp/jupyter_lsp/tests/conftest.py b/python_packages/jupyter_lsp/jupyter_lsp/tests/conftest.py index aa52f36b4..b4a121e36 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/tests/conftest.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/tests/conftest.py @@ -20,6 +20,7 @@ "dockerfile-language-server-nodejs", "javascript-typescript-langserver", "pylsp", + "pylsp-tcp", "unified-language-server", "sql-language-server", "vscode-css-languageserver-bin", diff --git a/python_packages/jupyter_lsp/setup.cfg b/python_packages/jupyter_lsp/setup.cfg index 03851b196..a77c90a65 100644 --- a/python_packages/jupyter_lsp/setup.cfg +++ b/python_packages/jupyter_lsp/setup.cfg @@ -41,6 +41,7 @@ jupyter_lsp_spec_v1 = julia-language-server = jupyter_lsp.specs:julia python-language-server = jupyter_lsp.specs:py_palantir python-lsp-server = jupyter_lsp.specs:py_lsp_server + python-lsp-server-tcp = jupyter_lsp.specs:py_lsp_server_tcp pyright = jupyter_lsp.specs:pyright r-languageserver = jupyter_lsp.specs:r texlab = jupyter_lsp.specs:tex From 78c2f5c1c48bdabc0edeef8277ed4e4ec42b3cc5 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Sat, 24 Jul 2021 10:57:03 +0200 Subject: [PATCH 27/51] Add unit test checking that the LS process is brought down no matter what --- .../jupyter_lsp/jupyter_lsp/session.py | 9 ++++- .../jupyter_lsp/tests/test_session.py | 37 +++++++++++++++++++ 2 files changed, 44 insertions(+), 2 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/session.py b/python_packages/jupyter_lsp/jupyter_lsp/session.py index 05bf06963..52fc4bdee 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/session.py @@ -16,7 +16,7 @@ from tornado.ioloop import IOLoop from tornado.queues import Queue from tornado.websocket import WebSocketHandler -from traitlets import Bunch, Instance, Set, Unicode, UseEnum, observe +from traitlets import Bunch, Float, Instance, Set, Unicode, UseEnum, observe from traitlets.config import LoggingConfigurable from traitlets.traitlets import MetaHasTraits @@ -64,6 +64,11 @@ class LanguageServerSessionBase( last_handler_message_at = Instance(datetime, allow_none=True) last_server_message_at = Instance(datetime, allow_none=True) + stop_timeout = Float( + 5, + help="timeout after which a process will be terminated forcefully", + ).tag(config=True) + _skip_serialize = ["argv", "debug_argv"] def __init__(self, *args, **kwargs): @@ -120,7 +125,7 @@ def stop(self): self.portal.call(self.writer.close) self.writer = None if self.process: - self.portal.call(self.stop_process, 5) + self.portal.call(self.stop_process, self.stop_timeout) self.process = None self.status = SessionStatus.STOPPED diff --git a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py index 7248d3258..a7981c2ff 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py @@ -1,4 +1,5 @@ import asyncio +import os import pytest @@ -100,3 +101,39 @@ async def test_ping(handlers): assert ws_handler._ping_sent is True ws_handler.on_close() + + +def exists_process_with_pid(pid): + try: + os.kill(pid, 0) + except OSError: + return False + else: + return True + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "timeout", [0, 5], ids=["terminate immediately", "after 5 seconds"] +) +async def test_stop(handlers, timeout): + """Test whether process will stop gracefully or forcefully""" + a_server = "pylsp" + + handler, ws_handler = handlers + manager = handler.manager + + manager.initialize() + + ws_handler.open(a_server) + + session = manager.sessions[ws_handler.language_server] + session.stop_timeout = timeout + + process_pid = session.process.pid + assert exists_process_with_pid(process_pid) is True + + ws_handler.on_close() + + await asyncio.sleep(timeout + 1) + assert exists_process_with_pid(process_pid) is False From c2b951d071b89126311b151147f91787c452123a Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Sat, 24 Jul 2021 11:46:34 +0200 Subject: [PATCH 28/51] Test that unknown modes in spec are detected --- .../jupyter_lsp/jupyter_lsp/tests/test_bad_spec.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_bad_spec.py b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_bad_spec.py index d6be3d03e..09ab2c319 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_bad_spec.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_bad_spec.py @@ -1,6 +1,7 @@ import pytest import traitlets +from jupyter_lsp.schema import SPEC_VERSION from jupyter_lsp.session import LanguageServerSessionStdio @@ -12,6 +13,12 @@ {"languages": None}, {"languages": 1}, {"languages": [1, "two"]}, + { + "argv": ["command"], + "languages": ["some language"], + "version": SPEC_VERSION, + "mode": "unknown", + }, ], ) def test_bad_spec(spec): From e2cc7c530bca320a2774a991b0e956964a40f498 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Sat, 24 Jul 2021 11:47:47 +0200 Subject: [PATCH 29/51] Mark code parts `no cover` that cannot be tested easily --- python_packages/jupyter_lsp/jupyter_lsp/connection.py | 4 ++-- python_packages/jupyter_lsp/jupyter_lsp/manager.py | 2 +- python_packages/jupyter_lsp/jupyter_lsp/session.py | 10 ++++++---- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/connection.py b/python_packages/jupyter_lsp/jupyter_lsp/connection.py index e1fef4ea1..cdd68fc6e 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/connection.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/connection.py @@ -144,7 +144,7 @@ async def _read_content( part = await self.stream.receive_exactly(length - received_size) except anyio.IncompleteRead: # pragma: no cover pass - if part is None: + if part is None: # pragma: no cover max_empties -= 1 await self.sleep() continue @@ -201,7 +201,7 @@ async def _readline(self) -> Text: # resource has been closed before the requested bytes could be retrieved # -> signal recource closed raise anyio.ClosedResourceError - except anyio.DelimiterNotFound: + except anyio.DelimiterNotFound: # pragma: no cover self.log.error( "Readline hit max_bytes before newline character was encountered" ) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/manager.py b/python_packages/jupyter_lsp/jupyter_lsp/manager.py index 3ac8f2f60..a7eebf97b 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/manager.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/manager.py @@ -150,7 +150,7 @@ def init_sessions(self): sessions[language_server] = LanguageServerSessionTCP( language_server=language_server, spec=spec, parent=self ) - else: + else: # pragma: no cover raise ValueError( "Unknown session mode {} for language server '{}'".format( mode, language_server diff --git a/python_packages/jupyter_lsp/jupyter_lsp/session.py b/python_packages/jupyter_lsp/jupyter_lsp/session.py index 52fc4bdee..d6214f0a4 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/session.py @@ -176,7 +176,7 @@ async def stop_process(self, timeout: int = 5): If the process does not terminate within timeout seconds it will be killed forcefully. """ - if self.process is None: + if self.process is None: # pragma: no cover return # try to stop the process gracefully @@ -237,7 +237,7 @@ async def _listen(self): await tg.spawn(self._read_lsp) await tg.spawn(self._write_lsp) await tg.spawn(self._broadcast_from_lsp) - except Exception as e: + except Exception as e: # pragma: no cover self.log.exception("Execption while listening {}", e) async def _read_lsp(self): @@ -317,13 +317,15 @@ async def init_tcp_connection(self, host, port, retries=12, sleep=5.0): ).format(server, tries, retries, sleep) ) await anyio.sleep(sleep) - else: + else: # pragma: no cover self.log.warning( "Connection to server {} refused! Attempt {}/{}.".format( server, tries, retries ) ) - raise OSError("Unable to connect to server {}".format(server)) + raise OSError( + "Unable to connect to server {}".format(server) + ) # pragma: no cover def init_reader(self): self.reader = LspStreamReader( From d29e2ca40d14324e776ac52b1f0626b2aa840bee Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Sat, 24 Jul 2021 13:57:31 +0200 Subject: [PATCH 30/51] Remove no longer required code to make file non-blocking --- .../jupyter_lsp/jupyter_lsp/non_blocking.py | 45 ------------------- 1 file changed, 45 deletions(-) delete mode 100644 python_packages/jupyter_lsp/jupyter_lsp/non_blocking.py diff --git a/python_packages/jupyter_lsp/jupyter_lsp/non_blocking.py b/python_packages/jupyter_lsp/jupyter_lsp/non_blocking.py deleted file mode 100644 index ef6f9de71..000000000 --- a/python_packages/jupyter_lsp/jupyter_lsp/non_blocking.py +++ /dev/null @@ -1,45 +0,0 @@ -""" -Derived from - -> https://github.com/rudolfwalter/pygdbmi/blob/0.7.4.2/pygdbmi/gdbcontroller.py -> MIT License https://github.com/rudolfwalter/pygdbmi/blob/master/LICENSE -> Copyright (c) 2016 Chad Smith gmail.com> -""" -import os - -if os.name == "nt": # pragma: no cover - import msvcrt - from ctypes import POINTER, WinError, byref, windll, wintypes # type: ignore - from ctypes.wintypes import BOOL, DWORD, HANDLE # type: ignore -else: # pragma: no cover - import fcntl - - -def make_non_blocking(file_obj): # pragma: no cover - """ - make file object non-blocking - - Windows doesn't have the fcntl module, but someone on - stack overflow supplied this code as an answer, and it works - http://stackoverflow.com/a/34504971/2893090 - """ - - if os.name == "nt": - LPDWORD = POINTER(DWORD) - PIPE_NOWAIT = wintypes.DWORD(0x00000001) - - SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState - SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD] - SetNamedPipeHandleState.restype = BOOL - - h = msvcrt.get_osfhandle(file_obj.fileno()) - - res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None) - if res == 0: - raise ValueError(WinError()) - - else: - # Set the file status flag (F_SETFL) on the pipes to be non-blocking - # so we can attempt to read from a pipe with no new data without locking - # the program up - fcntl.fcntl(file_obj, fcntl.F_SETFL, os.O_NONBLOCK) From 77436799c7bcd2b62019c9721abb0c64b65affc9 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Sat, 24 Jul 2021 17:17:34 +0200 Subject: [PATCH 31/51] Move from `localhost` to `127.0.0.1` --- atest/ports.py | 2 +- python_packages/jupyter_lsp/jupyter_lsp/tests/test_reader.py | 2 +- python_packages/jupyter_lsp/jupyter_lsp/utils.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/atest/ports.py b/atest/ports.py index 439227b6d..dac9307ed 100644 --- a/atest/ports.py +++ b/atest/ports.py @@ -9,7 +9,7 @@ def get_unused_port(): Probably could introduce race conditions if inside a tight loop. """ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.bind(("localhost", 0)) + sock.bind(("127.0.0.1", 0)) sock.listen(1) port = sock.getsockname()[1] sock.close() diff --git a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_reader.py b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_reader.py index ccfd5e15f..78d4a1dbf 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_reader.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_reader.py @@ -143,7 +143,7 @@ async def test_reader( else: # give the server some time to start await anyio.sleep(2) - stream = await anyio.connect_tcp("localhost", port) + stream = await anyio.connect_tcp("127.0.0.1", port) reader = LspStreamReader(stream=stream, queue=queue) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/utils.py b/python_packages/jupyter_lsp/jupyter_lsp/utils.py index 439227b6d..dac9307ed 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/utils.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/utils.py @@ -9,7 +9,7 @@ def get_unused_port(): Probably could introduce race conditions if inside a tight loop. """ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.bind(("localhost", 0)) + sock.bind(("127.0.0.1", 0)) sock.listen(1) port = sock.getsockname()[1] sock.close() From c9125eb91988f98e7955aa8f4d56de8ff25e7467 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Sat, 31 Jul 2021 18:03:00 +0200 Subject: [PATCH 32/51] Rewrite session handling with anyio without need for blocking portal --- .../jupyter_lsp/jupyter_lsp/session.py | 133 ++++++++++-------- 1 file changed, 74 insertions(+), 59 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/session.py b/python_packages/jupyter_lsp/jupyter_lsp/session.py index d6214f0a4..eed2be11e 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/session.py @@ -4,15 +4,15 @@ import os import string import subprocess -import threading from abc import ABC, ABCMeta, abstractmethod -from concurrent.futures import ThreadPoolExecutor from copy import copy from datetime import datetime, timezone -from typing import List, Optional, cast +from threading import Event, Thread +from typing import List import anyio -from tornado.concurrent import run_on_executor +from anyio import CancelScope +from anyio.abc import Process, SocketStream from tornado.ioloop import IOLoop from tornado.queues import Queue from tornado.websocket import WebSocketHandler @@ -42,11 +42,21 @@ class LanguageServerSessionBase( # run-time specifics process = Instance( - anyio.abc.Process, help="the language server subprocess", allow_none=True + Process, help="the language server subprocess", allow_none=True + ) + cancelscope = Instance( + CancelScope, help="scope used for stopping the session", allow_none=True) + started = Instance( + Event, + args=(), + help="event signaling that the session has finished starting", + allow_none=False + ) + thread = Instance( + Thread, + help="worker thread for running an event loop", + allow_none=True ) - executor = None - portal = None - cancelscope = None writer = Instance(LspStreamWriter, help="the JSON-RPC writer", allow_none=True) reader = Instance(LspStreamReader, help="the JSON-RPC reader", allow_none=True) from_lsp = Instance( @@ -75,8 +85,6 @@ def __init__(self, *args, **kwargs): """set up the required traitlets and exit behavior for a session""" super().__init__(*args, **kwargs) atexit.register(self.stop) - self.executor = ThreadPoolExecutor(max_workers=1) - self.start_blocking_portal() def __repr__(self): # pragma: no cover return ( @@ -96,36 +104,72 @@ def to_json(self): spec=censored_spec(self.spec), ) - def initialize(self): - """(re)initialize a language server session""" - self.stop() + def start(self): + """run a language server session asynchronously inside a worker thread + + will return as soon as the session is ready for communication + """ + self.started.clear() + self.thread = Thread(target=anyio.run, kwargs={"func": self.run}) + self.thread.start() + self.started.wait() + + def stop(self): + """shut down the session""" + if self.cancelscope is not None: + self.cancelscope.cancel() + self.cancelscope = None + + # wait for the session to get cleaned up + if self.thread and self.thread.is_alive(): + self.thread.join() + + async def run(self): + """run this session in a cancel scope and clean everything up on cancellation + + the event `self.started` will be set when everything is set up and the session + will be ready for communication + """ + async with CancelScope() as scope: + self.cancelscope = scope + await self.initialize() + self.started.set() + await self.listen() + await self.cleanup() + + async def initialize(self): + """initialize a language server session""" self.status = SessionStatus.STARTING + self.init_queues() - self.portal.call(self.init_process) + await self.init_process() self.init_writer() self.init_reader() - # start listening on the executor in a different event loop - self.listen() - self.status = SessionStatus.STARTED - def stop(self): - """clean up all of the state of the session""" + async def listen(self): + """start the actual read/write tasks""" + try: + async with anyio.create_task_group() as tg: + await tg.spawn(self._read_lsp) + await tg.spawn(self._write_lsp) + await tg.spawn(self._broadcast_from_lsp) + except Exception as e: # pragma: no cover + self.log.exception("Execption while listening {}", e) + async def cleanup(self): + """clean up all of the state of the session""" self.status = SessionStatus.STOPPING - if self.cancelscope is not None: - self.portal.call(self.cancelscope.cancel) - self.cancelscope = None - if self.reader: - self.portal.call(self.reader.close) + if self.reader is not None: + await self.reader.close() self.reader = None - if self.writer: - self.portal.call(self.writer.close) + if self.writer is not None: + await self.writer.close() self.writer = None - if self.process: - self.portal.call(self.stop_process, self.stop_timeout) + if self.process is not None: + await self.stop_process(self.stop_timeout) self.process = None self.status = SessionStatus.STOPPED @@ -134,7 +178,7 @@ def stop(self): def _on_handlers(self, change: Bunch): """re-initialize if someone starts listening, or stop if nobody is""" if change["new"] and not self.process: - self.initialize() + self.start() elif not change["new"] and self.process: self.stop() @@ -146,21 +190,6 @@ def write(self, message): def now(self): return datetime.now(timezone.utc) - # old definition of start_blocking_portal() prior to anyio3 - def start_blocking_portal(self): - async def run_portal(): - nonlocal portal - async with anyio.create_blocking_portal() as portal: - event.set() - await portal.sleep_until_stopped() - - portal: Optional[anyio.abc.BlockingPortal] - event = threading.Event() - thread = threading.Thread(target=anyio.run, kwargs={"func": run_portal}) - thread.start() - event.wait() - self.portal = cast(anyio.abc.BlockingPortal, portal) - async def start_process(self, argv: List[str]): """start the language server subprocess giben in argv""" self.process = await anyio.open_process( @@ -226,20 +255,6 @@ def init_writer(self): """ pass # pragma: no cover - @run_on_executor - def listen(self): - self.portal.call(self._listen) - - async def _listen(self): - try: - async with anyio.create_task_group() as tg: - self.cancelscope = tg.cancel_scope - await tg.spawn(self._read_lsp) - await tg.spawn(self._write_lsp) - await tg.spawn(self._broadcast_from_lsp) - except Exception as e: # pragma: no cover - self.log.exception("Execption while listening {}", e) - async def _read_lsp(self): await self.reader.read() @@ -274,7 +289,7 @@ def init_writer(self): class LanguageServerSessionTCP(LanguageServerSessionBase): tcp_con = Instance( - anyio.abc.SocketStream, help="the tcp connection", allow_none=True + SocketStream, help="the tcp connection", allow_none=True ) async def init_process(self): From d7f4f3f47f0eb76e89f28da44266b161d9414fc6 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Sat, 31 Jul 2021 18:09:16 +0200 Subject: [PATCH 33/51] Code style fixes --- .../jupyter_lsp/jupyter_lsp/session.py | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/session.py b/python_packages/jupyter_lsp/jupyter_lsp/session.py index eed2be11e..fc523205f 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/session.py @@ -41,21 +41,18 @@ class LanguageServerSessionBase( spec = Schema(LANGUAGE_SERVER_SPEC) # run-time specifics - process = Instance( - Process, help="the language server subprocess", allow_none=True - ) + process = Instance(Process, help="the language server subprocess", allow_none=True) cancelscope = Instance( - CancelScope, help="scope used for stopping the session", allow_none=True) + CancelScope, help="scope used for stopping the session", allow_none=True + ) started = Instance( Event, args=(), help="event signaling that the session has finished starting", - allow_none=False + allow_none=False, ) thread = Instance( - Thread, - help="worker thread for running an event loop", - allow_none=True + Thread, help="worker thread for running an event loop", allow_none=True ) writer = Instance(LspStreamWriter, help="the JSON-RPC writer", allow_none=True) reader = Instance(LspStreamReader, help="the JSON-RPC reader", allow_none=True) @@ -107,7 +104,7 @@ def to_json(self): def start(self): """run a language server session asynchronously inside a worker thread - will return as soon as the session is ready for communication + will return as soon as the session is ready for communication """ self.started.clear() self.thread = Thread(target=anyio.run, kwargs={"func": self.run}) @@ -288,9 +285,7 @@ def init_writer(self): class LanguageServerSessionTCP(LanguageServerSessionBase): - tcp_con = Instance( - SocketStream, help="the tcp connection", allow_none=True - ) + tcp_con = Instance(SocketStream, help="the tcp connection", allow_none=True) async def init_process(self): """start the language server subprocess""" From a4a40c0d3119072b27c991aeab26e8a3b67d0481 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Tue, 3 Aug 2021 17:14:26 +0200 Subject: [PATCH 34/51] Add changelog entry --- CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c9c3cd78b..019c62119 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ ## Changelog +### `@krassowski/jupyterlab-lsp 3.9.0` (unreleased) + +- improvements: + - add support for language servers that can (only) communicate through TCP rather than stdio (there is no support yet for servers already running on another machine and/or port) [(#636)] + +[#636]: https://github.com/krassowski/jupyterlab-lsp/pull/636 + ### `@krassowski/jupyterlab-lsp 3.8.1` (2021-08-02) - bug fixes: From cf8e92b3f51cd7271211a42e326d2630f97dfce7 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Sat, 23 Oct 2021 13:21:39 +0200 Subject: [PATCH 35/51] Remove unnecessary try catch --- python_packages/jupyter_lsp/jupyter_lsp/connection.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/connection.py b/python_packages/jupyter_lsp/jupyter_lsp/connection.py index eea2ba415..ee3478da7 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/connection.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/connection.py @@ -83,10 +83,7 @@ async def sleep(self): if self.stream._closed: # pragma: no cover return self.next_wait = min(self.next_wait * 2, self.max_wait) - try: - await anyio.sleep(self.next_wait) - except Exception: # pragma: no cover - pass + await anyio.sleep(self.next_wait) def wake(self): """Reset the wait time""" From b90fe56d935a0310f60ce205edb0cf17c439ce68 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Sat, 23 Oct 2021 13:24:14 +0200 Subject: [PATCH 36/51] Try increasing timeout for stop test to make it pass on the windows runner --- python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py index 3f2cba36f..38c0949e5 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py @@ -135,5 +135,5 @@ async def test_stop(handlers, timeout): ws_handler.on_close() - await asyncio.sleep(timeout + 1) + await asyncio.sleep(timeout + 3) assert exists_process_with_pid(process_pid) is False From 2163484530ec742b959711339a8fe492f0239c95 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Wed, 3 Nov 2021 19:51:31 +0100 Subject: [PATCH 37/51] Handle language server process termination differently on Windows --- .../jupyter_lsp/jupyter_lsp/session.py | 30 ++++++++++++++----- .../jupyter_lsp/tests/test_session.py | 9 +++++- 2 files changed, 31 insertions(+), 8 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/session.py b/python_packages/jupyter_lsp/jupyter_lsp/session.py index fc523205f..edcc516b3 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/session.py @@ -4,6 +4,7 @@ import os import string import subprocess +import sys from abc import ABC, ABCMeta, abstractmethod from copy import copy from datetime import datetime, timezone @@ -212,13 +213,28 @@ async def stop_process(self, timeout: int = 5): await self.process.wait() return - self.log.debug( - ( - "Process did not terminate within {} seconds. " - "Bringing it down the hard way!" - ).format(timeout) - ) - self.process.kill() + if sys.platform.startswith('win32'): # pragma: no cover + # On Windows Process.kill() is an alias to Process.terminate so we cannot + # force the process to stop. if you know of a better way to handle this on + # Windows please consider contributing + self.log.warning( + ( + "The language server process (PID: {}) did not terminate within {} " + "seconds. Beware, it might continue running as a zombie process." + ).format(self.process.pid, timeout) + ) + else: # pragma: no cover + self.log.debug( + ( + "Process did not terminate within {} seconds. " + "Bringing it down the hard way!" + ).format(timeout) + ) + try: # pragma: no cover + self.process.kill() + except ProcessLookupError: + # process terminated on its own in the meantime + pass def init_queues(self): """create the queues""" diff --git a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py index 38c0949e5..abeb9890e 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py @@ -2,6 +2,7 @@ import os import pytest +from sys import platform from ..schema import SERVERS_RESPONSE @@ -135,5 +136,11 @@ async def test_stop(handlers, timeout): ws_handler.on_close() - await asyncio.sleep(timeout + 3) + if platform.startswith('win32'): + # currently we cannot forcefully terminate the process on windows, so we just + # give it a little more extra time to finish on its own + await asyncio.sleep(timeout + 10) + else: # linux and darwin + await asyncio.sleep(timeout + 2) + assert exists_process_with_pid(process_pid) is False From fc1125b64febb5fabe6f6cd7ccfcc7d8abff27b0 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Sat, 6 Nov 2021 19:51:01 +0100 Subject: [PATCH 38/51] Code style fixes --- python_packages/jupyter_lsp/jupyter_lsp/session.py | 2 +- python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/session.py b/python_packages/jupyter_lsp/jupyter_lsp/session.py index edcc516b3..c2a94e407 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/session.py @@ -213,7 +213,7 @@ async def stop_process(self, timeout: int = 5): await self.process.wait() return - if sys.platform.startswith('win32'): # pragma: no cover + if sys.platform.startswith("win32"): # pragma: no cover # On Windows Process.kill() is an alias to Process.terminate so we cannot # force the process to stop. if you know of a better way to handle this on # Windows please consider contributing diff --git a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py index abeb9890e..bdb9dfff5 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py @@ -1,8 +1,8 @@ import asyncio import os +from sys import platform import pytest -from sys import platform from ..schema import SERVERS_RESPONSE @@ -136,7 +136,7 @@ async def test_stop(handlers, timeout): ws_handler.on_close() - if platform.startswith('win32'): + if platform.startswith("win32"): # currently we cannot forcefully terminate the process on windows, so we just # give it a little more extra time to finish on its own await asyncio.sleep(timeout + 10) From e1660c7f3a8c8b86413c49b754600272c8410b53 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Sun, 7 Nov 2021 18:12:53 +0100 Subject: [PATCH 39/51] Fix coverage of test file itself --- python_packages/jupyter_lsp/jupyter_lsp/session.py | 2 +- .../jupyter_lsp/jupyter_lsp/tests/test_session.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/session.py b/python_packages/jupyter_lsp/jupyter_lsp/session.py index c2a94e407..8fff01a15 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/session.py @@ -214,7 +214,7 @@ async def stop_process(self, timeout: int = 5): return if sys.platform.startswith("win32"): # pragma: no cover - # On Windows Process.kill() is an alias to Process.terminate so we cannot + # On Windows Process.kill() is an alias to Process.terminate() so we cannot # force the process to stop. if you know of a better way to handle this on # Windows please consider contributing self.log.warning( diff --git a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py index bdb9dfff5..a4ab7fe8e 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py @@ -136,11 +136,12 @@ async def test_stop(handlers, timeout): ws_handler.on_close() - if platform.startswith("win32"): + if platform.startswith("win32"): # pragma: no cover # currently we cannot forcefully terminate the process on windows, so we just # give it a little more extra time to finish on its own await asyncio.sleep(timeout + 10) - else: # linux and darwin + else: # pragma: no cover + # linux and darwin await asyncio.sleep(timeout + 2) assert exists_process_with_pid(process_pid) is False From 98d110919cbf718851216decc5d1d4c8684df703 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Sun, 14 Nov 2021 18:27:42 +0100 Subject: [PATCH 40/51] Fix problem when using 0 seconds for stop timeout It seems as if this was caused by an update to one of the dependencies (maybe anyio?!). The CI pipeline did still pass for Linux, but raised some warning about the event loop already being closed on the worker thread. --- python_packages/jupyter_lsp/jupyter_lsp/session.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/session.py b/python_packages/jupyter_lsp/jupyter_lsp/session.py index 8fff01a15..0c2a25a19 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/session.py @@ -206,9 +206,12 @@ async def stop_process(self, timeout: int = 5): if self.process is None: # pragma: no cover return + if timeout < 0.0: # pragma: no cover + raise ValueError("Timeout must not be negative!") + # try to stop the process gracefully self.process.terminate() - with anyio.move_on_after(timeout): + with anyio.move_on_after(timeout + 0.1): # avoid timeout of 0s self.log.debug("Waiting for process to terminate") await self.process.wait() return From e9d5f5f90b6a540fd00a904c903202b08a3654d7 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Wed, 17 Nov 2021 11:59:32 +0100 Subject: [PATCH 41/51] Fix missing coverage if tcp connection is established on first try --- python_packages/jupyter_lsp/jupyter_lsp/session.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/session.py b/python_packages/jupyter_lsp/jupyter_lsp/session.py index 0c2a25a19..4e373e07c 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/session.py @@ -336,7 +336,7 @@ async def init_tcp_connection(self, host, port, retries=12, sleep=5.0): tries = tries + 1 try: return await anyio.connect_tcp(host, port) - except OSError: + except OSError: # pragma: no cover if tries < retries: self.log.warning( ( @@ -346,7 +346,7 @@ async def init_tcp_connection(self, host, port, retries=12, sleep=5.0): ).format(server, tries, retries, sleep) ) await anyio.sleep(sleep) - else: # pragma: no cover + else: self.log.warning( "Connection to server {} refused! Attempt {}/{}.".format( server, tries, retries From d41ce9ac7e1e1b6154c6075b387a37ac021455c3 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Wed, 17 Nov 2021 12:17:59 +0100 Subject: [PATCH 42/51] Removed probably unnecessary test for closed stream in sleep --- python_packages/jupyter_lsp/jupyter_lsp/connection.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/connection.py b/python_packages/jupyter_lsp/jupyter_lsp/connection.py index ee3478da7..301242a03 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/connection.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/connection.py @@ -80,8 +80,6 @@ def _default_max_wait(self): async def sleep(self): """Simple exponential backoff for sleeping""" - if self.stream._closed: # pragma: no cover - return self.next_wait = min(self.next_wait * 2, self.max_wait) await anyio.sleep(self.next_wait) From e83b7163abb068f070a6f41df78f655963ab9778 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Mon, 27 Jun 2022 17:31:00 +0200 Subject: [PATCH 43/51] Fix occasionally occurring race condition causing an exception --- python_packages/jupyter_lsp/jupyter_lsp/session.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/session.py b/python_packages/jupyter_lsp/jupyter_lsp/session.py index 4e373e07c..4c1c38aa4 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/session.py @@ -55,6 +55,8 @@ class LanguageServerSessionBase( thread = Instance( Thread, help="worker thread for running an event loop", allow_none=True ) + main_loop = Instance( + IOLoop, help="the event loop of the main thread", allow_none=True) writer = Instance(LspStreamWriter, help="the JSON-RPC writer", allow_none=True) reader = Instance(LspStreamReader, help="the JSON-RPC reader", allow_none=True) from_lsp = Instance( @@ -107,6 +109,7 @@ def start(self): will return as soon as the session is ready for communication """ + self.main_loop = IOLoop.current() self.started.clear() self.thread = Thread(target=anyio.run, kwargs={"func": self.run}) self.thread.start() @@ -121,6 +124,7 @@ def stop(self): # wait for the session to get cleaned up if self.thread and self.thread.is_alive(): self.thread.join() + self.main_loop = None async def run(self): """run this session in a cancel scope and clean everything up on cancellation @@ -189,7 +193,7 @@ def now(self): return datetime.now(timezone.utc) async def start_process(self, argv: List[str]): - """start the language server subprocess giben in argv""" + """start the language server subprocess given in argv""" self.process = await anyio.open_process( argv, stdin=subprocess.PIPE, @@ -283,7 +287,8 @@ async def _broadcast_from_lsp(self): """ async for message in self.from_lsp: self.last_server_message_at = self.now() - await self.parent.on_server_message(message, self) + # handle message in the main thread's event loop + self.main_loop.add_callback(self.parent.on_server_message, message, self) self.from_lsp.task_done() From f492327e0380602bc5faf9bf8209706fc22da576 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Mon, 27 Jun 2022 17:58:43 +0200 Subject: [PATCH 44/51] Reapply mypy fixes --- python_packages/jupyter_lsp/jupyter_lsp/connection.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/connection.py b/python_packages/jupyter_lsp/jupyter_lsp/connection.py index 301242a03..508bc9cdf 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/connection.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/connection.py @@ -62,7 +62,7 @@ class LspStreamReader(LspStreamBase): help="the maximum size a header line send by the language server may have", ).tag(config=True) - stream = Instance( + stream = Instance( # type:ignore[assignment] BufferedByteReceiveStream, help="the stream to read from" ) # type: BufferedByteReceiveStream @@ -206,7 +206,7 @@ async def _readline(self) -> Text: class LspStreamWriter(LspStreamBase): """Language Server Writer""" - stream = Instance( + stream = Instance( # type:ignore[assignment] TextSendStream, help="the stream to write to" ) # type: TextSendStream From 8444ec6d637d100b84926b57e29fd740740c25fe Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Sun, 3 Jul 2022 12:07:04 +0200 Subject: [PATCH 45/51] Remove old synchronous code from Reader and Writer --- .../jupyter_lsp/jupyter_lsp/connection.py | 106 +++--------------- .../jupyter_lsp/jupyter_lsp/session.py | 10 +- 2 files changed, 25 insertions(+), 91 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/connection.py b/python_packages/jupyter_lsp/jupyter_lsp/connection.py index 508bc9cdf..86f435e08 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/connection.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/connection.py @@ -7,19 +7,17 @@ > > MIT License https://github.com/palantir/python-jsonrpc-server/blob/0.2.0/LICENSE > > Copyright 2018 Palantir Technologies, Inc. """ -import os from abc import ABC, ABCMeta, abstractmethod -from typing import List, Optional, Text +from typing import Optional, Text # pylint: disable=broad-except import anyio from anyio.streams.buffered import BufferedByteReceiveStream from anyio.streams.text import TextSendStream -from tornado.gen import convert_yielded from tornado.httputil import HTTPHeaders from tornado.ioloop import IOLoop from tornado.queues import Queue -from traitlets import Float, Instance, Int, default +from traitlets import Instance, Int from traitlets.config import LoggingConfigurable from traitlets.traitlets import MetaHasTraits @@ -48,15 +46,8 @@ async def close(self): class LspStreamReader(LspStreamBase): - """Language Server Reader + """Language Server Reader""" - Because non-blocking (but still synchronous) IO is used, rudimentary - exponential backoff is used. - """ - - max_wait = Float(help="maximum time to wait on idle stream").tag(config=True) - min_wait = Float(0.05, help="minimum time to wait on idle stream").tag(config=True) - next_wait = Float(0.05, help="next time to wait on idle stream").tag(config=True) receive_max_bytes = Int( 65536, help="the maximum size a header line send by the language server may have", @@ -74,32 +65,12 @@ async def close(self): await self.stream.aclose() self.log.debug("%s closed", self) - @default("max_wait") - def _default_max_wait(self): - return 0.1 if os.name == "nt" else self.min_wait * 2 - - async def sleep(self): - """Simple exponential backoff for sleeping""" - self.next_wait = min(self.next_wait * 2, self.max_wait) - await anyio.sleep(self.next_wait) - - def wake(self): - """Reset the wait time""" - self.wait = self.min_wait - async def read(self) -> None: """Read from a Language Server until it is closed""" while True: message = None try: message = await self.read_one() - - if not message: - await self.sleep() - continue - else: - self.wake() - IOLoop.current().add_callback(self.queue.put_nowait, message) except anyio.ClosedResourceError: # stream was closed -> terminate @@ -109,89 +80,48 @@ async def read(self) -> None: self.log.exception( "%s couldn't enqueue message: %s (%s)", self, message, e ) - await self.sleep() - - async def _read_content( - self, length: int, max_parts=1000, max_empties=200 - ) -> Optional[bytes]: - """Read the full length of the message unless exceeding max_parts or - max_empties empty reads occur. - See https://github.com/jupyter-lsp/jupyterlab-lsp/issues/450 - - Crucial docs or read(): - "If the argument is positive, and the underlying raw - stream is not interactive, multiple raw reads may be issued - to satisfy the byte count (unless EOF is reached first)" + async def _read_content(self, length: int) -> Optional[bytes]: + """Read the full length of the message. Args: - length: the content length - - max_parts: prevent absurdly long messages (1000 parts is several MBs): - 1 part is usually sufficient but not enough for some long - messages 2 or 3 parts are often needed. """ - raw = None - raw_parts: List[bytes] = [] - received_size = 0 - while received_size < length and len(raw_parts) < max_parts and max_empties > 0: - part = None - try: - part = await self.stream.receive_exactly(length - received_size) - except anyio.IncompleteRead: # pragma: no cover - pass - if part is None: # pragma: no cover - max_empties -= 1 - await self.sleep() - continue - received_size += len(part) - raw_parts.append(part) - - if raw_parts: - raw = b"".join(raw_parts) - if len(raw) != length: # pragma: no cover - self.log.warning( - f"Readout and content-length mismatch: {len(raw)} vs {length};" - f"remaining empties: {max_empties}; remaining parts: {max_parts}" - ) - - return raw + try: + return await self.stream.receive_exactly(length) + except anyio.IncompleteRead: # pragma: no cover + # resource has been closed before the requested bytes could be retrieved + # -> signal recource closed + raise anyio.ClosedResourceError async def read_one(self) -> Text: """Read a single message""" message = "" headers = HTTPHeaders() - line = await convert_yielded(self._readline()) + line = await self._readline() if line: while line and line.strip(): headers.parse_line(line) - line = await convert_yielded(self._readline()) + line = await self._readline() content_length = int(headers.get("content-length", "0")) if content_length: raw = await self._read_content(length=content_length) - if raw is not None: - message = raw.decode("utf-8").strip() - else: # pragma: no cover - self.log.warning( - "%s failed to read message of length %s", - self, - content_length, - ) + message = raw.decode("utf-8").strip() return message async def _readline(self) -> Text: - """Read a line (or immediately return None)""" + """Read a line""" try: # use same max_bytes as is default for receive for now. It seems there is no # way of getting the bytes read until max_bytes is reached, so we cannot # iterate the receive_until call with smaller max_bytes values - async with anyio.move_on_after(0.2): - line = await self.stream.receive_until(b"\r\n", self.receive_max_bytes) - return line.decode("utf-8").strip() + line = await self.stream.receive_until(b"\r\n", self.receive_max_bytes) + return line.decode("utf-8").strip() except anyio.IncompleteRead: # resource has been closed before the requested bytes could be retrieved # -> signal recource closed @@ -225,7 +155,7 @@ async def write(self) -> None: try: n_bytes = len(message.encode("utf-8")) response = "Content-Length: {}\r\n\r\n{}".format(n_bytes, message) - await convert_yielded(self._write_one(response)) + await self._write_one(response) except ( anyio.ClosedResourceError, anyio.BrokenResourceError, diff --git a/python_packages/jupyter_lsp/jupyter_lsp/session.py b/python_packages/jupyter_lsp/jupyter_lsp/session.py index 4c1c38aa4..5538a1ce6 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/session.py @@ -57,6 +57,8 @@ class LanguageServerSessionBase( ) main_loop = Instance( IOLoop, help="the event loop of the main thread", allow_none=True) + thread_loop = Instance( + IOLoop, help="the event loop of the worker thread", allow_none=True) writer = Instance(LspStreamWriter, help="the JSON-RPC writer", allow_none=True) reader = Instance(LspStreamReader, help="the JSON-RPC reader", allow_none=True) from_lsp = Instance( @@ -118,8 +120,7 @@ def start(self): def stop(self): """shut down the session""" if self.cancelscope is not None: - self.cancelscope.cancel() - self.cancelscope = None + self.thread_loop.add_callback(self.cancelscope.cancel) # wait for the session to get cleaned up if self.thread and self.thread.is_alive(): @@ -132,12 +133,15 @@ async def run(self): the event `self.started` will be set when everything is set up and the session will be ready for communication """ + self.thread_loop = IOLoop.current() async with CancelScope() as scope: self.cancelscope = scope await self.initialize() self.started.set() await self.listen() await self.cleanup() + self.cancelscope = None + self.thread_loop = None async def initialize(self): """initialize a language server session""" @@ -187,7 +191,7 @@ def _on_handlers(self, change: Bunch): def write(self, message): """wrapper around the write queue to keep it mostly internal""" self.last_handler_message_at = self.now() - IOLoop.current().add_callback(self.to_lsp.put_nowait, message) + self.thread_loop.add_callback(self.to_lsp.put_nowait, message) def now(self): return datetime.now(timezone.utc) From 341f810ee6eaf725901ee729a045f43301afe1e4 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Sun, 3 Jul 2022 16:37:24 +0200 Subject: [PATCH 46/51] Remove extraneous cancel scope in Session --- .../jupyter_lsp/jupyter_lsp/session.py | 39 ++++++++----------- 1 file changed, 16 insertions(+), 23 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/session.py b/python_packages/jupyter_lsp/jupyter_lsp/session.py index 5538a1ce6..2282a970d 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/session.py @@ -128,20 +128,23 @@ def stop(self): self.main_loop = None async def run(self): - """run this session in a cancel scope and clean everything up on cancellation - - the event `self.started` will be set when everything is set up and the session - will be ready for communication - """ + """run this session in a task group and clean everything up on cancellation""" self.thread_loop = IOLoop.current() - async with CancelScope() as scope: - self.cancelscope = scope - await self.initialize() - self.started.set() - await self.listen() - await self.cleanup() - self.cancelscope = None - self.thread_loop = None + + try: + async with anyio.create_task_group() as tg: + self.cancelscope = tg.cancel_scope + await self.initialize() + self.started.set() + tg.start_soon(self._read_lsp) + tg.start_soon(self._write_lsp) + tg.start_soon(self._broadcast_from_lsp) + except Exception as e: # pragma: no cover + self.log.exception("Execption while listening {}", e) + finally: + await self.cleanup() + self.cancelscope = None + self.thread_loop = None async def initialize(self): """initialize a language server session""" @@ -154,16 +157,6 @@ async def initialize(self): self.status = SessionStatus.STARTED - async def listen(self): - """start the actual read/write tasks""" - try: - async with anyio.create_task_group() as tg: - await tg.spawn(self._read_lsp) - await tg.spawn(self._write_lsp) - await tg.spawn(self._broadcast_from_lsp) - except Exception as e: # pragma: no cover - self.log.exception("Execption while listening {}", e) - async def cleanup(self): """clean up all of the state of the session""" self.status = SessionStatus.STOPPING From 1994870cfdb619c91b2a78e82ae17ee52ffd2dee Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Mon, 4 Jul 2022 17:51:27 +0200 Subject: [PATCH 47/51] Switch from Tornado Queues to anyio MemoryObjectStreams --- .../jupyter_lsp/jupyter_lsp/connection.py | 11 +++---- .../jupyter_lsp/jupyter_lsp/session.py | 30 ++++++++++++++----- .../jupyter_lsp/tests/test_reader.py | 8 +++-- 3 files changed, 32 insertions(+), 17 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/connection.py b/python_packages/jupyter_lsp/jupyter_lsp/connection.py index 86f435e08..2882b285b 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/connection.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/connection.py @@ -14,9 +14,8 @@ import anyio from anyio.streams.buffered import BufferedByteReceiveStream from anyio.streams.text import TextSendStream +from anyio.streams.stapled import StapledObjectStream from tornado.httputil import HTTPHeaders -from tornado.ioloop import IOLoop -from tornado.queues import Queue from traitlets import Instance, Int from traitlets.config import LoggingConfigurable from traitlets.traitlets import MetaHasTraits @@ -31,7 +30,7 @@ class LspStreamBase(LoggingConfigurable, ABC, metaclass=LspStreamMeta): streams """ - queue = Instance(Queue, help="queue to get/put") + queue = Instance(StapledObjectStream, help="queue to get/put") def __repr__(self): # pragma: no cover return "<{}(parent={})>".format(self.__class__.__name__, self.parent) @@ -71,7 +70,7 @@ async def read(self) -> None: message = None try: message = await self.read_one() - IOLoop.current().add_callback(self.queue.put_nowait, message) + await self.queue.send(message) except anyio.ClosedResourceError: # stream was closed -> terminate self.log.debug("Stream closed while a read was still in progress") @@ -151,7 +150,7 @@ async def close(self): async def write(self) -> None: """Write to a Language Server until it closes""" while True: - message = await self.queue.get() + message = await self.queue.receive() try: n_bytes = len(message.encode("utf-8")) response = "Content-Length: {}\r\n\r\n{}".format(n_bytes, message) @@ -165,8 +164,6 @@ async def write(self) -> None: break except Exception: # pragma: no cover self.log.exception("%s couldn't write message: %s", self, response) - finally: - self.queue.task_done() async def _write_one(self, message) -> None: await self.stream.send(message) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/session.py b/python_packages/jupyter_lsp/jupyter_lsp/session.py index 2282a970d..0043bf15c 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/session.py @@ -1,6 +1,7 @@ """ A session for managing a language server process """ import atexit +import math import os import string import subprocess @@ -14,8 +15,8 @@ import anyio from anyio import CancelScope from anyio.abc import Process, SocketStream +from anyio.streams.stapled import StapledObjectStream from tornado.ioloop import IOLoop -from tornado.queues import Queue from tornado.websocket import WebSocketHandler from traitlets import Bunch, Float, Instance, Set, Unicode, UseEnum, observe from traitlets.config import LoggingConfigurable @@ -62,10 +63,14 @@ class LanguageServerSessionBase( writer = Instance(LspStreamWriter, help="the JSON-RPC writer", allow_none=True) reader = Instance(LspStreamReader, help="the JSON-RPC reader", allow_none=True) from_lsp = Instance( - Queue, help="a queue for string messages from the server", allow_none=True + StapledObjectStream, + help="a queue for string messages from the server", + allow_none=True ) to_lsp = Instance( - Queue, help="a queue for string message to the server", allow_none=True + StapledObjectStream, + help="a queue for string messages to the server", + allow_none=True ) handlers = Set( trait=Instance(WebSocketHandler), @@ -80,6 +85,10 @@ class LanguageServerSessionBase( 5, help="timeout after which a process will be terminated forcefully", ).tag(config=True) + queue_size = Float( + math.inf, + help="the maximum number of messages that can be buffered in the queue" + ).tag(config=True) _skip_serialize = ["argv", "debug_argv"] @@ -170,6 +179,12 @@ async def cleanup(self): if self.process is not None: await self.stop_process(self.stop_timeout) self.process = None + if self.from_lsp is not None: + await self.from_lsp.aclose() + self.from_lsp = None + if self.to_lsp is not None: + await self.to_lsp.aclose() + self.to_lsp = None self.status = SessionStatus.STOPPED @@ -184,7 +199,7 @@ def _on_handlers(self, change: Bunch): def write(self, message): """wrapper around the write queue to keep it mostly internal""" self.last_handler_message_at = self.now() - self.thread_loop.add_callback(self.to_lsp.put_nowait, message) + self.thread_loop.add_callback(self.to_lsp.send, message) def now(self): return datetime.now(timezone.utc) @@ -242,8 +257,10 @@ async def stop_process(self, timeout: int = 5): def init_queues(self): """create the queues""" - self.from_lsp = Queue() - self.to_lsp = Queue() + self.from_lsp = StapledObjectStream( + *anyio.create_memory_object_stream(max_buffer_size=self.queue_size)) + self.to_lsp = StapledObjectStream( + *anyio.create_memory_object_stream(max_buffer_size=self.queue_size)) def substitute_env(self, env, base): final_env = copy(os.environ) @@ -286,7 +303,6 @@ async def _broadcast_from_lsp(self): self.last_server_message_at = self.now() # handle message in the main thread's event loop self.main_loop.add_callback(self.parent.on_server_message, message, self) - self.from_lsp.task_done() class LanguageServerSessionStdio(LanguageServerSessionBase): diff --git a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_reader.py b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_reader.py index 78d4a1dbf..89c8d4b48 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_reader.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_reader.py @@ -1,8 +1,9 @@ import subprocess import anyio +from anyio.streams.stapled import StapledObjectStream +import math import pytest -from tornado.queues import Queue from jupyter_lsp.connection import LspStreamReader from jupyter_lsp.utils import get_unused_port @@ -127,7 +128,8 @@ async def join_process(process: anyio.abc.Process, headstart=1, timeout=1): async def test_reader( message, repeats, interval, add_excess, mode, communicator_spawner ): - queue = Queue() + queue = StapledObjectStream( + *anyio.create_memory_object_stream(max_buffer_size=math.inf)) port = get_unused_port() if mode == "tcp" else None process = await communicator_spawner.spawn_writer( @@ -154,5 +156,5 @@ async def test_reader( if port is not None: await stream.aclose() - result = queue.get_nowait() + result = await queue.receive() assert result == message * repeats From 3e02246078d92e25e57e712328572ddbb508858c Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Mon, 4 Jul 2022 18:26:41 +0200 Subject: [PATCH 48/51] Fix mypy error caused by Optional return value --- python_packages/jupyter_lsp/jupyter_lsp/connection.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/connection.py b/python_packages/jupyter_lsp/jupyter_lsp/connection.py index 2882b285b..d4e796dc2 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/connection.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/connection.py @@ -8,7 +8,7 @@ > > Copyright 2018 Palantir Technologies, Inc. """ from abc import ABC, ABCMeta, abstractmethod -from typing import Optional, Text +from typing import Text # pylint: disable=broad-except import anyio @@ -80,7 +80,7 @@ async def read(self) -> None: "%s couldn't enqueue message: %s (%s)", self, message, e ) - async def _read_content(self, length: int) -> Optional[bytes]: + async def _read_content(self, length: int) -> bytes: """Read the full length of the message. Args: From efe22b4e37157768adbf82a06481c760aeb0c2d6 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Fri, 8 Jul 2022 17:13:12 +0200 Subject: [PATCH 49/51] Add units (seconds) to stop_timeout --- python_packages/jupyter_lsp/jupyter_lsp/session.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/session.py b/python_packages/jupyter_lsp/jupyter_lsp/session.py index 0043bf15c..688d10a7a 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/session.py @@ -81,9 +81,9 @@ class LanguageServerSessionBase( last_handler_message_at = Instance(datetime, allow_none=True) last_server_message_at = Instance(datetime, allow_none=True) - stop_timeout = Float( + stop_timeout_s = Float( 5, - help="timeout after which a process will be terminated forcefully", + help="timeout in seconds after which a process will be terminated forcefully", ).tag(config=True) queue_size = Float( math.inf, @@ -177,7 +177,7 @@ async def cleanup(self): await self.writer.close() self.writer = None if self.process is not None: - await self.stop_process(self.stop_timeout) + await self.stop_process(self.stop_timeout_s) self.process = None if self.from_lsp is not None: await self.from_lsp.aclose() From bdbc4ace0cf61c30373b67ad49be3e0ee97d17e0 Mon Sep 17 00:00:00 2001 From: FlyingSamson Date: Fri, 8 Jul 2022 17:19:21 +0200 Subject: [PATCH 50/51] Encode unbounded queue with size -1 --- python_packages/jupyter_lsp/jupyter_lsp/session.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/session.py b/python_packages/jupyter_lsp/jupyter_lsp/session.py index 688d10a7a..902ca558c 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/session.py @@ -86,8 +86,9 @@ class LanguageServerSessionBase( help="timeout in seconds after which a process will be terminated forcefully", ).tag(config=True) queue_size = Float( - math.inf, - help="the maximum number of messages that can be buffered in the queue" + -1, + help="the maximum number of messages that can be buffered in the queue or -1 " + "for an unbounded queue" ).tag(config=True) _skip_serialize = ["argv", "debug_argv"] @@ -257,10 +258,11 @@ async def stop_process(self, timeout: int = 5): def init_queues(self): """create the queues""" + queue_size = math.inf if self.queue_size < 0 else self.queue_size self.from_lsp = StapledObjectStream( - *anyio.create_memory_object_stream(max_buffer_size=self.queue_size)) + *anyio.create_memory_object_stream(max_buffer_size=queue_size)) self.to_lsp = StapledObjectStream( - *anyio.create_memory_object_stream(max_buffer_size=self.queue_size)) + *anyio.create_memory_object_stream(max_buffer_size=queue_size)) def substitute_env(self, env, base): final_env = copy(os.environ) From a0f6937babe89c21ad24c96267e7a52deb201532 Mon Sep 17 00:00:00 2001 From: krassowski <5832902+krassowski@users.noreply.github.com> Date: Sat, 31 Dec 2022 12:40:46 +0000 Subject: [PATCH 51/51] Add missing `await` in `test_stop` --- python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py index ac25adee9..c73bcdea4 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py @@ -126,7 +126,7 @@ async def test_stop(handlers, timeout): manager.initialize() - ws_handler.open(a_server) + await ws_handler.open(a_server) session = manager.sessions[ws_handler.language_server] session.stop_timeout = timeout