Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tcp mode #636

Open
wants to merge 79 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
79 commits
Select commit Hold shift + click to select a range
09c8c39
Move sessions to anyio
FlyingSamson Feb 21, 2021
4b3ab1b
Add support for connection to lsp providers through tcp
FlyingSamson Mar 17, 2021
0ca4164
Merge pull request #1 from krassowski/master
FlyingSamson Apr 9, 2021
5998969
Merge branch 'master' into tcp_mode
FlyingSamson Apr 9, 2021
e480fc8
Fix error where jupyter would not start after page reload
FlyingSamson May 23, 2021
22f5dbd
Merge pull request #2 from krassowski/master
FlyingSamson May 24, 2021
6dbdbac
Merge branch 'master' into tcp_mode
FlyingSamson May 24, 2021
ac7be83
Adapt to changes of start_blocking_portal() in anyio3.0
FlyingSamson Jun 5, 2021
9360802
Kill lsp-servers which are not terminating willingly when asked to
FlyingSamson Jun 20, 2021
b76b05e
Merge branch 'krassowski:master' into master
FlyingSamson Jun 28, 2021
030d50f
Merge branch 'master' into tcp_mode
FlyingSamson Jun 28, 2021
84afabd
Code style fixes
FlyingSamson Jul 1, 2021
5174669
Make it work for LSPs running in own process on localhost
FlyingSamson Jul 3, 2021
880f0f6
Split Session in separate classes for TCP and Stdio
FlyingSamson Jul 4, 2021
e36f3ac
Move stream from LspStreamBase to LspStreamReader and LspStreamWriter
FlyingSamson Jul 5, 2021
7085747
Fix unit tests (switched to anyio)
FlyingSamson Jul 8, 2021
22ae93f
Remove code related to externally running servers for now
FlyingSamson Jul 8, 2021
62dc992
Extend docs for extending language servers with different modes
FlyingSamson Jul 8, 2021
ce18086
Merge branch 'krassowski:master' into master
FlyingSamson Jul 8, 2021
08ebe54
Merge branch 'master' into tcp_mode
FlyingSamson Jul 8, 2021
e0ef174
Fix codestyle
FlyingSamson Jul 9, 2021
a5be8fb
Make maximum bytes for receive configurable
FlyingSamson Jul 9, 2021
aacc847
Fix spelling in doc
FlyingSamson Jul 9, 2021
792ac73
Enforce interfaces by making base classes for Session and Stream abst…
FlyingSamson Jul 11, 2021
f41d5fe
Add unit test for reading over tcp
FlyingSamson Jul 16, 2021
aa48b4f
Issue debug message if stream was closed prematurely
FlyingSamson Jul 16, 2021
0f97a20
Codestyle fixes
FlyingSamson Jul 16, 2021
f3a02b3
Fix type of streams in Reader's and Writer's c-tors
FlyingSamson Jul 16, 2021
bb3287e
Add instructions for specifying port in language servers argv
FlyingSamson Jul 16, 2021
0b431a0
Remove no longer required ThreadPoolExecutor from Stream classes
FlyingSamson Jul 17, 2021
129ad3d
Increase sleep before connecting in test to ensure that the tcp serve…
FlyingSamson Jul 17, 2021
86cbad8
Use newly introduced `env` parameter in `anyio.open_process`
FlyingSamson Jul 19, 2021
8f18893
Mark abstract methods with 'no cover'
FlyingSamson Jul 22, 2021
83d28d1
Add specs for pyls over tcp and include it into unit testing
FlyingSamson Jul 22, 2021
78c2f5c
Add unit test checking that the LS process is brought down no matter …
FlyingSamson Jul 24, 2021
c2b951d
Test that unknown modes in spec are detected
FlyingSamson Jul 24, 2021
e2cc7c5
Mark code parts `no cover` that cannot be tested easily
FlyingSamson Jul 24, 2021
d29e2ca
Remove no longer required code to make file non-blocking
FlyingSamson Jul 24, 2021
7743679
Move from `localhost` to `127.0.0.1`
FlyingSamson Jul 24, 2021
c9125eb
Rewrite session handling with anyio without need for blocking portal
FlyingSamson Jul 31, 2021
d7f4f3f
Code style fixes
FlyingSamson Jul 31, 2021
6b3c955
Merge branch 'krassowski:master' into master
FlyingSamson Aug 1, 2021
c5c2156
Merge branch 'master' into tcp_mode
FlyingSamson Aug 1, 2021
376264e
Merge branch 'krassowski:master' into master
FlyingSamson Aug 3, 2021
710bd2b
Merge branch 'master' into tcp_mode
FlyingSamson Aug 3, 2021
a4a40c0
Add changelog entry
FlyingSamson Aug 3, 2021
6a1cc56
Merge branch 'jupyter-lsp:master' into master
FlyingSamson Oct 23, 2021
b948108
Merge branch 'master' into tcp_mode
FlyingSamson Oct 23, 2021
cf8e92b
Remove unnecessary try catch
FlyingSamson Oct 23, 2021
b90fe56
Try increasing timeout for stop test to make it pass on the windows r…
FlyingSamson Oct 23, 2021
9367ee9
Merge branch 'jupyter-lsp:master' into master
FlyingSamson Oct 26, 2021
f2f2f80
Merge branch 'master' into tcp_mode
FlyingSamson Oct 26, 2021
2163484
Handle language server process termination differently on Windows
FlyingSamson Nov 3, 2021
fc1125b
Code style fixes
FlyingSamson Nov 6, 2021
e1660c7
Fix coverage of test file itself
FlyingSamson Nov 7, 2021
e5b7b73
Merge branch 'jupyter-lsp:master' into master
FlyingSamson Nov 14, 2021
84e0c91
Merge branch 'master' into tcp_mode2
FlyingSamson Nov 14, 2021
98d1109
Fix problem when using 0 seconds for stop timeout
FlyingSamson Nov 14, 2021
e9d5f5f
Fix missing coverage if tcp connection is established on first try
FlyingSamson Nov 17, 2021
d41ce9a
Removed probably unnecessary test for closed stream in sleep
FlyingSamson Nov 17, 2021
9e61c2f
Merge branch 'jupyter-lsp:master' into master
FlyingSamson Dec 1, 2021
562148a
Merge branch 'master' into tcp_mode
FlyingSamson Dec 1, 2021
148f868
Merge branch 'jupyter-lsp:master' into master
FlyingSamson Apr 24, 2022
4f9b95b
Merge remote-tracking branch 'origin/master' into tcp_mode
FlyingSamson Apr 24, 2022
b1205fb
Merge branch 'jupyter-lsp:master' into master
FlyingSamson Jun 5, 2022
032ea35
Merge branch 'master' into tcp_mode
FlyingSamson Jun 5, 2022
e83b716
Fix occasionally occurring race condition causing an exception
FlyingSamson Jun 27, 2022
3417de4
Merge branch 'jupyter-lsp:master' into master
FlyingSamson Jun 27, 2022
6a9ff59
Merge branch 'master' into tcp_mode
FlyingSamson Jun 27, 2022
f492327
Reapply mypy fixes
FlyingSamson Jun 27, 2022
8444ec6
Remove old synchronous code from Reader and Writer
FlyingSamson Jul 3, 2022
341f810
Remove extraneous cancel scope in Session
FlyingSamson Jul 3, 2022
1994870
Switch from Tornado Queues to anyio MemoryObjectStreams
FlyingSamson Jul 4, 2022
3e02246
Fix mypy error caused by Optional return value
FlyingSamson Jul 4, 2022
efe22b4
Add units (seconds) to stop_timeout
FlyingSamson Jul 8, 2022
bdbc4ac
Encode unbounded queue with size -1
FlyingSamson Jul 8, 2022
c019eb2
Merge branch 'master' into tcp_mode
krassowski Dec 28, 2022
3451099
Merge branch 'master' into HEAD
krassowski Dec 28, 2022
a0f6937
Add missing `await` in `test_stop`
krassowski Dec 31, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
""" Language Server stdio-mode readers
""" Language Server readers and writers

Parts of this code are derived from:

Expand All @@ -8,7 +8,9 @@
> > Copyright 2018 Palantir Technologies, Inc.
"""
# pylint: disable=broad-except
import asyncio
import anyio
from anyio.streams.buffered import BufferedByteReceiveStream
from anyio.streams.text import TextSendStream
import io
import os
from concurrent.futures import ThreadPoolExecutor
Expand All @@ -24,15 +26,14 @@

from .non_blocking import make_non_blocking


class LspStdIoBase(LoggingConfigurable):
"""Non-blocking, queued base for communicating with stdio Language Servers"""
class LspStreamBase(LoggingConfigurable):
"""Non-blocking, queued base for communicating with Language Servers through anyio streams"""

executor = None

stream = Instance(
io.RawIOBase, help="the stream to read/write"
) # type: io.RawIOBase
anyio.abc.AsyncResource, help="the stream to read/write"
) # type: anyio.abc.AsyncResource
queue = Instance(Queue, help="queue to get/put")

def __repr__(self): # pragma: no cover
Expand All @@ -43,13 +44,13 @@ def __init__(self, **kwargs):
self.log.debug("%s initialized", self)
self.executor = ThreadPoolExecutor(max_workers=1)

def close(self):
self.stream.close()
async def close(self):
await self.stream.aclose()
self.log.debug("%s closed", self)


class LspStdIoReader(LspStdIoBase):
"""Language Server stdio Reader
class LspStreamReader(LspStreamBase):
"""Language Server Reader

Because non-blocking (but still synchronous) IO is used, rudimentary
exponential backoff is used.
Expand All @@ -59,13 +60,17 @@ class LspStdIoReader(LspStdIoBase):
min_wait = Float(0.05, help="minimum time to wait on idle stream").tag(config=True)
next_wait = Float(0.05, help="next time to wait on idle stream").tag(config=True)

def __init__(self, **kwargs):
super().__init__(**kwargs)
self.stream = BufferedByteReceiveStream(self.stream)

@default("max_wait")
def _default_max_wait(self):
return 0.1 if os.name == "nt" else self.min_wait * 2

async def sleep(self):
"""Simple exponential backoff for sleeping"""
if self.stream.closed: # pragma: no cover
if self.stream._closed: # pragma: no cover
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this side effect really necessary? I think we only ever call this method more or less immediately after a read operation was performed, at which time the stream was alive then. That is, hardly any time will have gone by between then and the time we entered this function which will as first action recheck that the stream is alive, which isn't even a concern in the remainder of that function.

Just asking because it seems rather complicated to produce future-proof code that will check wether the stream is alive. The used _closed property is undocumented and might go away in future versions of anyio. Another option would be a receive_exactly(0) within a try IncompleteRead except ..., however the border case of 0 bytes to read also seems risky, as receive_exaclty might or might not decide in future version whether 0 bytes can safely read by a closed steam (namely returning empty bytes-object) or not (namely throwing IncompleteRead

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure why it is here. I imagine it could be useful if it was down where the sleep is called. It would be fine to remove it and wrap the entire sleep() call which is inside _read_content() in a try-except instead, just to be safe.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But which exception would this catch? I mean sleep then only consists of computing the minimum of two floats and a call to anyio.sleep() which as far as I can tell should not throw, especially not as a consequence of the stream being closed.

In _read_content, after the call to sleep is done it will try to receive another part of the message, but that part already is guarded by a try ... catch block capturing the case that during the sleep the stream was closed.

return
self.next_wait = min(self.next_wait * 2, self.max_wait)
try:
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need that try ... except ... part here? As far as I can see, neither the previous asyncio.sleep nor the new anyio.sleep throw.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, it does not seem useful here.

Expand All @@ -79,9 +84,7 @@ def wake(self):

async def read(self) -> None:
"""Read from a Language Server until it is closed"""
make_non_blocking(self.stream)

while not self.stream.closed:
while True:
message = None
try:
message = await self.read_one()
Expand All @@ -93,6 +96,9 @@ async def read(self) -> None:
self.wake()

IOLoop.current().add_callback(self.queue.put_nowait, message)
except (anyio.ClosedResourceError, anyio.EndOfStream):
# stream was closed -> terminate
break
FlyingSamson marked this conversation as resolved.
Show resolved Hide resolved
except Exception as e: # pragma: no cover
self.log.exception(
"%s couldn't enqueue message: %s (%s)", self, message, e
Expand Down Expand Up @@ -124,8 +130,8 @@ async def _read_content(
while received_size < length and len(raw_parts) < max_parts and max_empties > 0:
part = None
try:
part = self.stream.read(length - received_size)
except OSError: # pragma: no cover
part = await self.stream.receive_exactly(length - received_size)
except anyio.IncompleteRead: # pragma: no cover
pass
if part is None:
max_empties -= 1
Expand Down Expand Up @@ -171,32 +177,44 @@ async def read_one(self) -> Text:

return message

@run_on_executor
def _readline(self) -> Text:
async def _readline(self) -> Text:
"""Read a line (or immediately return None)"""
try:
return self.stream.readline().decode("utf-8").strip()
except OSError: # pragma: no cover
# use same max_bytes as is default for receive for now. It seems there is no way of getting
# the bytes read until max_bytes is reached, so we cannot iterate the receive_until call
# with smaller max_bytes values
async with anyio.move_on_after(0.2) as moa:
line = await self.stream.receive_until(b'\r\n', 65536)
FlyingSamson marked this conversation as resolved.
Show resolved Hide resolved
return line.decode("utf-8").strip()
except anyio.IncompleteRead:
# resource has been closed before the requested bytes could be retrieved -> signal recource closed
raise anyio.ClosedResourceError
except anyio.DelimiterNotFound:
self.log.error("Readline hit max_bytes before newline character was encountered")
return ""

class LspStreamWriter(LspStreamBase):
"""Language Server Writer"""

class LspStdIoWriter(LspStdIoBase):
"""Language Server stdio Writer"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.stream = TextSendStream(self.stream, encoding='utf-8')

async def write(self) -> None:
"""Write to a Language Server until it closes"""
while not self.stream.closed:
while True:
message = await self.queue.get()
try:
body = message.encode("utf-8")
response = "Content-Length: {}\r\n\r\n{}".format(len(body), message)
await convert_yielded(self._write_one(response.encode("utf-8")))
except Exception: # pragma: no cover
nBytes = len(message.encode("utf-8"))
FlyingSamson marked this conversation as resolved.
Show resolved Hide resolved
response = "Content-Length: {}\r\n\r\n{}".format(nBytes, message)
await convert_yielded(self._write_one(response))
except (anyio.ClosedResourceError, anyio.BrokenResourceError): # pragma: no cover
# stream was closed -> terminate
break
except Exception:
self.log.exception("%s couldn't write message: %s", self, response)
finally:
self.queue.task_done()

@run_on_executor
def _write_one(self, message) -> None:
self.stream.write(message)
self.stream.flush()
async def _write_one(self, message) -> None:
await self.stream.send(message)
2 changes: 1 addition & 1 deletion python_packages/jupyter_lsp/jupyter_lsp/manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
""" A configurable frontend for stdio-based Language Servers
""" A configurable frontend for stream-based Language Servers
"""
import os
import traceback
Expand Down
30 changes: 30 additions & 0 deletions python_packages/jupyter_lsp/jupyter_lsp/schema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,36 @@
"description": "list of MIME types supported by the language server",
"title": "MIME Types"
},
"mode": {
"description": "connection mode used, e.g. stdio (default), tcp",
"title": "Mode",
"type": "string",
"enum": ["stdio", "tcp"]
FlyingSamson marked this conversation as resolved.
Show resolved Hide resolved
},
"port": {
"description": "the port for tcp mode connections. a null value will select a random, unused port",
"title": "Port",
"oneOf": [
{
"type": "integer"
},
{
"type": "null"
}
]
},
"host": {
"description": "the host for tcp mode connections. a null value will assume '127.0.0.1'",
"title": "Host",
"oneOf": [
{
"type": "string"
},
{
"type": "null"
}
]
},
"urls": {
"additionalProperties": {
"format": "uri",
Expand Down