Skip to content

Commit

Permalink
Prevent close() from blocking when reading is paused.
Browse files Browse the repository at this point in the history
Closing the transport normally is achieved with transport.write_eof().

Closing it in abnormal situations relied on transport.close(). However,
that didn't lead to connection_lost() when reading is paused.

Replacing it with transport.abort() ensures that buffers are dropped
(which is what we want in abnormal situations) and connection_lost()
called quickly.

Fix #1555 (for real!)
  • Loading branch information
aaugustin committed Jan 19, 2025
1 parent e7a098e commit 613f3f0
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 17 deletions.
5 changes: 3 additions & 2 deletions docs/project/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ Bug fixes
:mod:`threading` implementation. If a message is already received, it is
returned. Previously, :exc:`TimeoutError` was raised incorrectly.

* Prevented :meth:`~sync.connection.Connection.close` from blocking when
receive buffers are saturated in the :mod:`threading` implementation.
* Prevented :meth:`~asyncio.connection.Connection.close` from blocking when
receive buffers are saturated in the :mod:`asyncio` and :mod:`threading`
implementations.

.. _14.1:

Expand Down
4 changes: 2 additions & 2 deletions src/websockets/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ async def __await_impl__(self) -> ClientConnection:
try:
await self.connection.handshake(*self.handshake_args)
except asyncio.CancelledError:
self.connection.close_transport()
self.connection.transport.abort()
raise
except Exception as exc:
# Always close the connection even though keep-alive is
Expand All @@ -454,7 +454,7 @@ async def __await_impl__(self) -> ClientConnection:
# protocol. In the current design of connect(), there is
# no easy way to reuse the network connection that works
# in every case nor to reinitialize the protocol.
self.connection.close_transport()
self.connection.transport.abort()

uri_or_exc = self.process_redirect(exc)
# Response is a valid redirect; follow it.
Expand Down
10 changes: 1 addition & 9 deletions src/websockets/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,7 @@ async def send_context(
# If an error occurred, close the transport to terminate the connection and
# raise an exception.
if raise_close_exc:
self.close_transport()
self.transport.abort()
# Wait for the protocol state to be CLOSED before accessing close_exc.
await asyncio.shield(self.connection_lost_waiter)
raise self.protocol.close_exc from original_exc
Expand Down Expand Up @@ -969,14 +969,6 @@ def set_recv_exc(self, exc: BaseException | None) -> None:
if self.recv_exc is None:
self.recv_exc = exc

def close_transport(self) -> None:
"""
Close transport and message assembler.
"""
self.transport.close()
self.recv_messages.close()

# asyncio.Protocol methods

# Connection callbacks
Expand Down
6 changes: 3 additions & 3 deletions src/websockets/asyncio/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,16 +356,16 @@ async def conn_handler(self, connection: ServerConnection) -> None:
self.server_header,
)
except asyncio.CancelledError:
connection.close_transport()
connection.transport.abort()
raise
except Exception:
connection.logger.error("opening handshake failed", exc_info=True)
connection.close_transport()
connection.transport.abort()
return

if connection.protocol.state is not OPEN:
# process_request or process_response rejected the handshake.
connection.close_transport()
connection.transport.abort()
return

try:
Expand Down
2 changes: 1 addition & 1 deletion tests/asyncio/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ async def test_connection_closed_during_handshake(self):
"""Client reads EOF before receiving handshake response from server."""

def close_connection(self, request):
self.close_transport()
self.transport.close()

async with serve(*args, process_request=close_connection) as server:
with self.assertRaises(InvalidMessage) as raised:
Expand Down

0 comments on commit 613f3f0

Please # to comment.