From 3351e42185e58c56a6ec7b42584a06b57df1f055 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Mon, 12 Dec 2022 22:34:35 +0000 Subject: [PATCH 01/17] Add regression tests and fixes for issue #1128 --- redis/asyncio/client.py | 13 ++++++++++-- redis/client.py | 14 +++++++++++-- tests/test_asyncio/test_commands.py | 32 +++++++++++++++++++++++++++++ tests/test_commands.py | 32 +++++++++++++++++++++++++++++ 4 files changed, 87 insertions(+), 4 deletions(-) diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index e56fd022fc..40a2c09f3d 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -477,8 +477,17 @@ async def _send_command_parse_response(self, conn, command_name, *args, **option """ Send a command and parse the response """ - await conn.send_command(*args) - return await self.parse_response(conn, command_name, **options) + try: + await conn.send_command(*args) + return await self.parse_response(conn, command_name, **options) + except BaseException: + # An exception while reading or writing leaves the connection in + # an unknown state. There may be un-written or un-read data + # so we cannot re-use it for a subsequent command/response transaction. + # This may be a TimeoutError or any other error not handled by the + # Connection object itself. + await conn.disconnect(nowait=True) + raise async def _disconnect_raise(self, conn: Connection, error: Exception): """ diff --git a/redis/client.py b/redis/client.py index 1a9b96b83d..f927e0278a 100755 --- a/redis/client.py +++ b/redis/client.py @@ -1231,8 +1231,18 @@ def _send_command_parse_response(self, conn, command_name, *args, **options): """ Send a command and parse the response """ - conn.send_command(*args) - return self.parse_response(conn, command_name, **options) + try: + conn.send_command(*args) + return self.parse_response(conn, command_name, **options) + except BaseException: + # An exception while reading or writing leaves the connection in + # an unknown state. There may be un-written or un-read data + # so we cannot re-use it for a subsequent command/response transaction. + # This can be any error not handled by the Connection itself, such + # as BaseExceptions may have been used to interrupt IO, when using + # gevent. + conn.disconnect() + raise def _disconnect_raise(self, conn, error): """ diff --git a/tests/test_asyncio/test_commands.py b/tests/test_asyncio/test_commands.py index 7c6fd45ab9..0e32212baa 100644 --- a/tests/test_asyncio/test_commands.py +++ b/tests/test_asyncio/test_commands.py @@ -1,10 +1,12 @@ """ Tests async overrides of commands from their mixins """ +import asyncio import binascii import datetime import re from string import ascii_letters +import async_timeout import pytest import pytest_asyncio @@ -2999,6 +3001,36 @@ async def test_module_list(self, r: redis.Redis): for x in await r.module_list(): assert isinstance(x, dict) + @pytest.mark.onlynoncluster + async def test_interrupted_command(self, r: redis.Redis): + """ + Regression test for issue #1128: An Un-handled BaseException + will leave the socket with un-read response to a previous + command. + """ + ready = asyncio.Event() + + async def helper(): + with pytest.raises(asyncio.CancelledError): + # blocking pop + ready.set() + await r.brpop(["nonexist"]) + # if all is well, we can continue. The following should not hang. + await r.set("status", "down") + return "done" + + task = asyncio.create_task(helper()) + await ready.wait() + await asyncio.sleep(0.01) + # the task is now sleeping, lets send it an exception + task.cancel() + try: + async with async_timeout.timeout(0.1): + assert await task == "done" + except asyncio.TimeoutError: + task.cancel() + await task + @pytest.mark.onlynoncluster class TestBinarySave: diff --git a/tests/test_commands.py b/tests/test_commands.py index 94249e9419..adfb483190 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1,9 +1,13 @@ import binascii import datetime import re +import threading import time from string import ascii_letters +from asyncio import CancelledError from unittest import mock +from unittest.mock import patch +import socket import pytest @@ -4726,6 +4730,34 @@ def test_psync(self, r): res = r2.psync(r2.client_id(), 1) assert b"FULLRESYNC" in res + @pytest.mark.onlynoncluster + def test_interrupted_command(self, r: redis.Redis): + """ + Regression test for issue #1128: An Un-handled BaseException + will leave the socket with un-read response to a previous + command. + """ + print("start") + + def helper(): + try: + # blocking pop + with patch.object( + socket.socket, "recv_into", side_effect=CancelledError + ) as mock_recv: + r.brpop(["nonexist"]) + except CancelledError: + print("canc") + pass # we got some BaseException. + # if all is well, we can continue. + r.set("status", "down") # should not hang + return "done" + + thread = threading.Thread(target=helper) + thread.start() + thread.join(0.1) + assert not thread.is_alive() + @pytest.mark.onlynoncluster class TestBinarySave: From 023c6453072cc38565e4acaab25bfad5573e8962 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Tue, 13 Dec 2022 16:13:21 +0000 Subject: [PATCH 02/17] Apply suggestions from code review Co-authored-by: Ilya Konstantinov --- tests/test_commands.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_commands.py b/tests/test_commands.py index adfb483190..0d1bd5d244 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -4751,7 +4751,6 @@ def helper(): pass # we got some BaseException. # if all is well, we can continue. r.set("status", "down") # should not hang - return "done" thread = threading.Thread(target=helper) thread.start() From f873523b3db00a93e8abd355f10e7e3675e40476 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Tue, 13 Dec 2022 16:58:30 +0000 Subject: [PATCH 03/17] Simplify test --- tests/test_asyncio/test_commands.py | 8 ++------ tests/test_commands.py | 8 ++------ 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/tests/test_asyncio/test_commands.py b/tests/test_asyncio/test_commands.py index 0e32212baa..079cecae03 100644 --- a/tests/test_asyncio/test_commands.py +++ b/tests/test_asyncio/test_commands.py @@ -3017,18 +3017,14 @@ async def helper(): await r.brpop(["nonexist"]) # if all is well, we can continue. The following should not hang. await r.set("status", "down") - return "done" task = asyncio.create_task(helper()) await ready.wait() await asyncio.sleep(0.01) # the task is now sleeping, lets send it an exception task.cancel() - try: - async with async_timeout.timeout(0.1): - assert await task == "done" - except asyncio.TimeoutError: - task.cancel() + # If all is well, the task should finish right away, otherwise fail with Timeout + async with async_timeout.timeout(0.1): await task diff --git a/tests/test_commands.py b/tests/test_commands.py index 0d1bd5d244..189c18989e 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -4737,18 +4737,14 @@ def test_interrupted_command(self, r: redis.Redis): will leave the socket with un-read response to a previous command. """ - print("start") - + def helper(): - try: + with pytest.raises(BaseException): # blocking pop with patch.object( socket.socket, "recv_into", side_effect=CancelledError ) as mock_recv: r.brpop(["nonexist"]) - except CancelledError: - print("canc") - pass # we got some BaseException. # if all is well, we can continue. r.set("status", "down") # should not hang From b0ff44b20507ea3a96a4673c3682b78946953810 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Thu, 15 Dec 2022 14:35:55 +0000 Subject: [PATCH 04/17] Revert change to Redis() object --- redis/asyncio/client.py | 13 ++----------- redis/client.py | 14 ++------------ 2 files changed, 4 insertions(+), 23 deletions(-) diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index 40a2c09f3d..e56fd022fc 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -477,17 +477,8 @@ async def _send_command_parse_response(self, conn, command_name, *args, **option """ Send a command and parse the response """ - try: - await conn.send_command(*args) - return await self.parse_response(conn, command_name, **options) - except BaseException: - # An exception while reading or writing leaves the connection in - # an unknown state. There may be un-written or un-read data - # so we cannot re-use it for a subsequent command/response transaction. - # This may be a TimeoutError or any other error not handled by the - # Connection object itself. - await conn.disconnect(nowait=True) - raise + await conn.send_command(*args) + return await self.parse_response(conn, command_name, **options) async def _disconnect_raise(self, conn: Connection, error: Exception): """ diff --git a/redis/client.py b/redis/client.py index f927e0278a..1a9b96b83d 100755 --- a/redis/client.py +++ b/redis/client.py @@ -1231,18 +1231,8 @@ def _send_command_parse_response(self, conn, command_name, *args, **options): """ Send a command and parse the response """ - try: - conn.send_command(*args) - return self.parse_response(conn, command_name, **options) - except BaseException: - # An exception while reading or writing leaves the connection in - # an unknown state. There may be un-written or un-read data - # so we cannot re-use it for a subsequent command/response transaction. - # This can be any error not handled by the Connection itself, such - # as BaseExceptions may have been used to interrupt IO, when using - # gevent. - conn.disconnect() - raise + conn.send_command(*args) + return self.parse_response(conn, command_name, **options) def _disconnect_raise(self, conn, error): """ From 6c268a17ab09e1543e62e38e4e8020385aa79b6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Thu, 15 Dec 2022 14:54:59 +0000 Subject: [PATCH 05/17] Make disconnect_on_error optional for Connection.read_response() --- redis/asyncio/client.py | 4 +++- redis/asyncio/connection.py | 21 ++++++++++++--------- redis/client.py | 2 +- redis/connection.py | 16 +++++++++++----- 4 files changed, 27 insertions(+), 16 deletions(-) diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index e56fd022fc..edf71ef53a 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -781,7 +781,9 @@ async def parse_response(self, block: bool = True, timeout: float = 0): await conn.connect() read_timeout = None if block else timeout - response = await self._execute(conn, conn.read_response, timeout=read_timeout) + response = await self._execute( + conn, conn.read_response, timeout=read_timeout, disconnect_on_error=False + ) if conn.health_check_interval and response == self.health_check_response: # ignore the health check message as user might not expect it diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index 2c75d4fcf1..8b75da4320 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -820,6 +820,7 @@ async def read_response( self, disable_decoding: bool = False, timeout: Optional[float] = None, + disconnect_on_error: bool = True, ): """Read the response from a previously sent command""" read_timeout = timeout if timeout is not None else self.socket_timeout @@ -835,22 +836,24 @@ async def read_response( ) except asyncio.TimeoutError: if timeout is not None: - # user requested timeout, return None + # user requested timeout, return None. Operation can be retried return None # it was a self.socket_timeout error. - await self.disconnect(nowait=True) + if disconnect_on_error: + await self.disconnect(nowait=True) raise TimeoutError(f"Timeout reading from {self.host}:{self.port}") except OSError as e: - await self.disconnect(nowait=True) + if disconnect_on_error: + await self.disconnect(nowait=True) raise ConnectionError( f"Error while reading from {self.host}:{self.port} : {e.args}" ) - except asyncio.CancelledError: - # need this check for 3.7, where CancelledError - # is subclass of Exception, not BaseException - raise - except Exception: - await self.disconnect(nowait=True) + except BaseException: + # Also by default close in case of BaseException. A lot of code + # relies on this behaviour when doing Command/Response pairs. + # See #1128. + if disconnect_on_error: + await self.disconnect(nowait=True) raise if self.health_check_interval: diff --git a/redis/client.py b/redis/client.py index 1a9b96b83d..711a7d3085 100755 --- a/redis/client.py +++ b/redis/client.py @@ -1526,7 +1526,7 @@ def try_read(): return None else: conn.connect() - return conn.read_response() + return conn.read_response(disconnect_on_error=False) response = self._execute(conn, try_read) diff --git a/redis/connection.py b/redis/connection.py index 126ea5db32..6b938e1160 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -839,7 +839,7 @@ def can_read(self, timeout=0): self.disconnect() raise ConnectionError(f"Error while reading from {host_error}: {e.args}") - def read_response(self, disable_decoding=False): + def read_response(self, disable_decoding=False, disconnect_on_error: bool = True): """Read the response from a previously sent command""" host_error = self._host_error() @@ -847,15 +847,21 @@ def read_response(self, disable_decoding=False): try: response = self._parser.read_response(disable_decoding=disable_decoding) except socket.timeout: - self.disconnect() + if disconnect_on_error: + self.disconnect() raise TimeoutError(f"Timeout reading from {host_error}") except OSError as e: - self.disconnect() + if disconnect_on_error: + self.disconnect() raise ConnectionError( f"Error while reading from {host_error}" f" : {e.args}" ) - except Exception: - self.disconnect() + except BaseException: + # Also by default close in case of BaseException. A lot of code + # relies on this behaviour when doing Command/Response pairs. + # See #1128. + if disconnect_on_error: + self.disconnect() raise if self.health_check_interval: From 03171f52cf1d6cd7f16358adc482191d5ac77b45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Thu, 15 Dec 2022 13:01:47 +0000 Subject: [PATCH 06/17] Close a Connection whenever an exception is raised for send_command() The api does not allow for a "resume", e.g. on a timeout, because an unknown number of bytes has been sent and an internal send state is not maintained. Therefore, there is no point in keeping the connection open. --- redis/asyncio/connection.py | 6 +++++- redis/connection.py | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index 8b75da4320..64c6994510 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -796,7 +796,11 @@ async def send_packed_command( raise ConnectionError( f"Error {err_no} while writing to socket. {errmsg}." ) from e - except Exception: + except BaseException: + # The send_packed_command api does not support re-trying a partially + # sent message, so there is no point in keeping the connection open. + # An unknown number of bytes has been sent and the connection is therefore + # unusable. await self.disconnect(nowait=True) raise diff --git a/redis/connection.py b/redis/connection.py index 6b938e1160..8209c8c269 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -815,7 +815,11 @@ def send_packed_command(self, command, check_health=True): errno = e.args[0] errmsg = e.args[1] raise ConnectionError(f"Error {errno} while writing to socket. {errmsg}.") - except Exception: + except BaseException: + # The send_packed_command api does not support re-trying a partially + # sent message, so there is no point in keeping the connection open. + # An unknown number of bytes has been sent and the connection is therefore + # unusable. self.disconnect() raise From 4b6caaa47b696823363be89a7ba284c449db52d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Thu, 15 Dec 2022 15:13:11 +0000 Subject: [PATCH 07/17] Add CHANGES --- CHANGES | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES b/CHANGES index fca8d3168e..f6712d5a8b 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,4 @@ + * Revert #2104, add `disconnect_on_error` option to `read_response()` (#2506) * Make PythonParser resumable in case of error (#2510) * Add `timeout=None` in `SentinelConnectionManager.read_response` * Documentation fix: password protected socket connection (#2374) From 76db492129b631ae5f7a7241b45ac781559aab1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Thu, 15 Dec 2022 17:06:31 +0000 Subject: [PATCH 08/17] New methods are keyword-only. Fix linting. --- redis/asyncio/connection.py | 1 + redis/connection.py | 4 +++- tests/test_commands.py | 4 ++-- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index 64c6994510..127702ef23 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -823,6 +823,7 @@ async def can_read_destructive(self): async def read_response( self, disable_decoding: bool = False, + *, timeout: Optional[float] = None, disconnect_on_error: bool = True, ): diff --git a/redis/connection.py b/redis/connection.py index 8209c8c269..a3c4233a18 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -843,7 +843,9 @@ def can_read(self, timeout=0): self.disconnect() raise ConnectionError(f"Error while reading from {host_error}: {e.args}") - def read_response(self, disable_decoding=False, disconnect_on_error: bool = True): + def read_response( + self, disable_decoding=False, *, disconnect_on_error: bool = True + ): """Read the response from a previously sent command""" host_error = self._host_error() diff --git a/tests/test_commands.py b/tests/test_commands.py index 189c18989e..20de4e784e 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -4737,13 +4737,13 @@ def test_interrupted_command(self, r: redis.Redis): will leave the socket with un-read response to a previous command. """ - + def helper(): with pytest.raises(BaseException): # blocking pop with patch.object( socket.socket, "recv_into", side_effect=CancelledError - ) as mock_recv: + ): r.brpop(["nonexist"]) # if all is well, we can continue. r.set("status", "down") # should not hang From 04c5a979ba1173ebced871e6758e2fcf20e85729 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Thu, 15 Dec 2022 18:35:01 +0000 Subject: [PATCH 09/17] isort --- tests/test_asyncio/test_commands.py | 2 +- tests/test_commands.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_asyncio/test_commands.py b/tests/test_asyncio/test_commands.py index 079cecae03..75b833d601 100644 --- a/tests/test_asyncio/test_commands.py +++ b/tests/test_asyncio/test_commands.py @@ -6,8 +6,8 @@ import datetime import re from string import ascii_letters -import async_timeout +import async_timeout import pytest import pytest_asyncio diff --git a/tests/test_commands.py b/tests/test_commands.py index 20de4e784e..0f8c7e3811 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1,13 +1,13 @@ import binascii import datetime import re +import socket import threading import time -from string import ascii_letters from asyncio import CancelledError +from string import ascii_letters from unittest import mock from unittest.mock import patch -import socket import pytest From 6838db87dce15780126ce4ee9564246d68cfcb3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Thu, 15 Dec 2022 18:42:03 +0000 Subject: [PATCH 10/17] Clarify comment. --- redis/asyncio/connection.py | 8 ++++---- redis/connection.py | 10 +++++----- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index 127702ef23..d3c88ecc71 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -797,10 +797,10 @@ async def send_packed_command( f"Error {err_no} while writing to socket. {errmsg}." ) from e except BaseException: - # The send_packed_command api does not support re-trying a partially - # sent message, so there is no point in keeping the connection open. - # An unknown number of bytes has been sent and the connection is therefore - # unusable. + # BaseExceptions can be raised when a send*() is not finished, e.g. due + # to a timeout. Ideally, a caller could then re-try, to continue + # sending partially-sent data. However, the send_packed_command() API + # does not it so there is no point in keeping the connection open. await self.disconnect(nowait=True) raise diff --git a/redis/connection.py b/redis/connection.py index a3c4233a18..9e9b0e5f49 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -816,11 +816,11 @@ def send_packed_command(self, command, check_health=True): errmsg = e.args[1] raise ConnectionError(f"Error {errno} while writing to socket. {errmsg}.") except BaseException: - # The send_packed_command api does not support re-trying a partially - # sent message, so there is no point in keeping the connection open. - # An unknown number of bytes has been sent and the connection is therefore - # unusable. - self.disconnect() + # BaseExceptions can be raised when a send() is not finished, e.g. due + # to a timeout. Ideally, a caller could then re-try, to continue + # sending partially-sent data. However, the send_packed_command() API + # does not it so there is no point in keeping the connection open. + self.disconnect() raise def send_command(self, *args, **kwargs): From b26fda3ea084d65f3413549e451124fe692cc7a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Thu, 15 Dec 2022 19:27:09 +0000 Subject: [PATCH 11/17] fix typo and missing word. --- redis/asyncio/connection.py | 4 ++-- redis/connection.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index d3c88ecc71..05f86659f0 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -797,10 +797,10 @@ async def send_packed_command( f"Error {err_no} while writing to socket. {errmsg}." ) from e except BaseException: - # BaseExceptions can be raised when a send*() is not finished, e.g. due + # BaseExceptions can be raised when a send() is not finished, e.g. due # to a timeout. Ideally, a caller could then re-try, to continue # sending partially-sent data. However, the send_packed_command() API - # does not it so there is no point in keeping the connection open. + # does not support it so there is no point in keeping the connection open. await self.disconnect(nowait=True) raise diff --git a/redis/connection.py b/redis/connection.py index 9e9b0e5f49..7484e3cf6c 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -819,7 +819,7 @@ def send_packed_command(self, command, check_health=True): # BaseExceptions can be raised when a send() is not finished, e.g. due # to a timeout. Ideally, a caller could then re-try, to continue # sending partially-sent data. However, the send_packed_command() API - # does not it so there is no point in keeping the connection open. + # does not support it so there is no point in keeping the connection open. self.disconnect() raise From 6345a69652849965ce9c3526349806543a8408cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Thu, 15 Dec 2022 19:52:38 +0000 Subject: [PATCH 12/17] more fixes to comment, and indentation --- redis/asyncio/connection.py | 6 +++--- redis/connection.py | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index 05f86659f0..fb57edb692 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -797,9 +797,9 @@ async def send_packed_command( f"Error {err_no} while writing to socket. {errmsg}." ) from e except BaseException: - # BaseExceptions can be raised when a send() is not finished, e.g. due - # to a timeout. Ideally, a caller could then re-try, to continue - # sending partially-sent data. However, the send_packed_command() API + # BaseExceptions can be raised when a socket send operation is not + # finished, e.g. due to a timeout. Ideally, a caller could then re-try + # to send un-sent data. However, the send_packed_command() API # does not support it so there is no point in keeping the connection open. await self.disconnect(nowait=True) raise diff --git a/redis/connection.py b/redis/connection.py index 7484e3cf6c..4ad5664126 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -816,11 +816,11 @@ def send_packed_command(self, command, check_health=True): errmsg = e.args[1] raise ConnectionError(f"Error {errno} while writing to socket. {errmsg}.") except BaseException: - # BaseExceptions can be raised when a send() is not finished, e.g. due - # to a timeout. Ideally, a caller could then re-try, to continue - # sending partially-sent data. However, the send_packed_command() API + # BaseExceptions can be raised when a socket send operation is not + # finished, e.g. due to a timeout. Ideally, a caller could then re-try + # to send un-sent data. However, the send_packed_command() API # does not support it so there is no point in keeping the connection open. - self.disconnect() + self.disconnect() raise def send_command(self, *args, **kwargs): From 524c454f33389295925ff935e563e6edc91e6610 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Thu, 5 Jan 2023 13:47:40 +0000 Subject: [PATCH 13/17] Fix tests for resumable read_response to use "disconnect_on_error" --- tests/test_asyncio/test_connection.py | 2 +- tests/test_connection.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_asyncio/test_connection.py b/tests/test_asyncio/test_connection.py index bf59dbe6b0..83e48ade4e 100644 --- a/tests/test_asyncio/test_connection.py +++ b/tests/test_asyncio/test_connection.py @@ -137,7 +137,7 @@ async def test_connection_parse_response_resume(r: redis.Redis): conn._parser._stream = MockStream(message, interrupt_every=2) for i in range(100): try: - response = await conn.read_response() + response = await conn.read_response(disconnect_on_error=False) break except MockStream.TestError: pass diff --git a/tests/test_connection.py b/tests/test_connection.py index e0b53cdf37..e49db84407 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -154,7 +154,7 @@ def test_connection_parse_response_resume(r: redis.Redis, parser_class): conn._parser._sock = mock_socket for i in range(100): try: - response = conn.read_response() + response = conn.read_response(disconnect_on_error=False) break except MockSocket.TestError: pass From 1c32296f2d733e00d1750372a6a73f1e7f9d6ca9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Thu, 5 Jan 2023 14:54:23 +0000 Subject: [PATCH 14/17] Fix unittest to not rely on read buffer behaviour --- tests/test_commands.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/tests/test_commands.py b/tests/test_commands.py index 0f8c7e3811..b08a554917 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1,7 +1,6 @@ import binascii import datetime import re -import socket import threading import time from asyncio import CancelledError @@ -4738,20 +4737,29 @@ def test_interrupted_command(self, r: redis.Redis): command. """ + ok = False + def helper(): - with pytest.raises(BaseException): + with pytest.raises(CancelledError): # blocking pop with patch.object( - socket.socket, "recv_into", side_effect=CancelledError + r.connection._parser, "read_response", side_effect=CancelledError ): r.brpop(["nonexist"]) # if all is well, we can continue. r.set("status", "down") # should not hang + nonlocal ok + ok = True thread = threading.Thread(target=helper) thread.start() thread.join(0.1) - assert not thread.is_alive() + try: + assert not thread.is_alive() + assert ok + finally: + # disconnect here so that fixture cleanup can proceed + r.connection.disconnect() @pytest.mark.onlynoncluster From 8df2c00e45c8b665c0588a4354665dd8d6a6a74c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Wed, 8 Feb 2023 07:38:37 +0000 Subject: [PATCH 15/17] Fix CHANGES --- CHANGES | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES b/CHANGES index ecfd9aa27b..58974db0cb 100644 --- a/CHANGES +++ b/CHANGES @@ -1,8 +1,8 @@ + * Revert #2104, add `disconnect_on_error` option to `read_response()` (#2506) * Add test and fix async HiredisParser when reading during a disconnect() (#2349) * Use hiredis-py pack_command if available. * Support `.unlink()` in ClusterPipeline * Simplify synchronous SocketBuffer state management - * Revert #2104, add `disconnect_on_error` option to `read_response()` (#2506) * Fix string cleanse in Redis Graph * Make PythonParser resumable in case of error (#2510) * Add `timeout=None` in `SentinelConnectionManager.read_response` From 794b28597690f104b50ff111e1886df221661dfb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Fri, 17 Mar 2023 14:36:26 +0000 Subject: [PATCH 16/17] Fixing unit test on python 3.11 --- tests/test_asyncio/test_commands.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/tests/test_asyncio/test_commands.py b/tests/test_asyncio/test_commands.py index 75b833d601..82c051e373 100644 --- a/tests/test_asyncio/test_commands.py +++ b/tests/test_asyncio/test_commands.py @@ -5,9 +5,9 @@ import binascii import datetime import re +import sys from string import ascii_letters -import async_timeout import pytest import pytest_asyncio @@ -20,6 +20,11 @@ skip_unless_arch_bits, ) +if sys.version_info.major >= 3 and sys.version_info.minor >= 11: + from asyncio import timeout as async_timeout +else: + from async_timeout import timeout as async_timeout + REDIS_6_VERSION = "5.9.0" @@ -3015,6 +3020,11 @@ async def helper(): # blocking pop ready.set() await r.brpop(["nonexist"]) + # If the following is not done, further Timout operations will fail, + # because the timeout won't catch its Cancelled Error if the task + # has a pending cancel. Python documentation probably should reflect this. + if sys.version_info.major >= 3 and sys.version_info.minor >= 11: + asyncio.current_task().uncancel() # if all is well, we can continue. The following should not hang. await r.set("status", "down") @@ -3024,7 +3034,7 @@ async def helper(): # the task is now sleeping, lets send it an exception task.cancel() # If all is well, the task should finish right away, otherwise fail with Timeout - async with async_timeout.timeout(0.1): + async with async_timeout(0.1): await task From e34f4560a4311cf5c7dda7e9a6038d4557bcbb50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Tue, 4 Apr 2023 08:34:48 +0000 Subject: [PATCH 17/17] Attempted deadlock fix --- redis/asyncio/client.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index cb0dcb1731..b7aca5793d 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -512,8 +512,6 @@ async def _try_send_command_parse_response(self, conn, *args, **options): await conn.disconnect(nowait=True) raise finally: - if self.single_connection_client: - self._single_conn_lock.release() if not self.connection: await self.connection_pool.release(conn) @@ -525,12 +523,20 @@ async def execute_command(self, *args, **options): command_name = args[0] conn = self.connection or await pool.get_connection(command_name, **options) + # locking / unlocking must be handled in same task, otherwise we will deadlock + # if parent task is cancelled. + # Parent task is preferable because it always gets cancelled, child task won´t + # get cancelled if parent is cancelled and the lock may be held indefinitely. if self.single_connection_client: await self._single_conn_lock.acquire() - return await asyncio.shield( - self._try_send_command_parse_response(conn, *args, **options) - ) + try: + return await asyncio.shield( + self._try_send_command_parse_response(conn, *args, **options) + ) + finally: + if self.single_connection_client: + self._single_conn_lock.release() async def parse_response( self, connection: Connection, command_name: Union[str, bytes], **options