Skip to content

Commit 91ff024

Browse files
committedMay 3, 2023
Add regression tests and fixes for issue redis#1128
1 parent e52fd67 commit 91ff024

File tree

6 files changed

+113
-18
lines changed

6 files changed

+113
-18
lines changed
 

‎redis/asyncio/client.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -816,7 +816,9 @@ async def parse_response(self, block: bool = True, timeout: float = 0):
816816
await conn.connect()
817817

818818
read_timeout = None if block else timeout
819-
response = await self._execute(conn, conn.read_response, timeout=read_timeout)
819+
response = await self._execute(
820+
conn, conn.read_response, timeout=read_timeout, disconnect_on_error=False
821+
)
820822

821823
if conn.health_check_interval and response == self.health_check_response:
822824
# ignore the health check message as user might not expect it

‎redis/asyncio/connection.py

+18-10
Original file line numberDiff line numberDiff line change
@@ -804,7 +804,11 @@ async def send_packed_command(
804804
raise ConnectionError(
805805
f"Error {err_no} while writing to socket. {errmsg}."
806806
) from e
807-
except Exception:
807+
except BaseException:
808+
# BaseExceptions can be raised when a socket send operation is not
809+
# finished, e.g. due to a timeout. Ideally, a caller could then re-try
810+
# to send un-sent data. However, the send_packed_command() API
811+
# does not support it so there is no point in keeping the connection open.
808812
await self.disconnect(nowait=True)
809813
raise
810814

@@ -827,7 +831,9 @@ async def can_read_destructive(self):
827831
async def read_response(
828832
self,
829833
disable_decoding: bool = False,
834+
*,
830835
timeout: Optional[float] = None,
836+
disconnect_on_error: bool = True,
831837
):
832838
"""Read the response from a previously sent command"""
833839
read_timeout = timeout if timeout is not None else self.socket_timeout
@@ -843,22 +849,24 @@ async def read_response(
843849
)
844850
except asyncio.TimeoutError:
845851
if timeout is not None:
846-
# user requested timeout, return None
852+
# user requested timeout, return None. Operation can be retried
847853
return None
848854
# it was a self.socket_timeout error.
849-
await self.disconnect(nowait=True)
855+
if disconnect_on_error:
856+
await self.disconnect(nowait=True)
850857
raise TimeoutError(f"Timeout reading from {self.host}:{self.port}")
851858
except OSError as e:
852-
await self.disconnect(nowait=True)
859+
if disconnect_on_error:
860+
await self.disconnect(nowait=True)
853861
raise ConnectionError(
854862
f"Error while reading from {self.host}:{self.port} : {e.args}"
855863
)
856-
except asyncio.CancelledError:
857-
# need this check for 3.7, where CancelledError
858-
# is subclass of Exception, not BaseException
859-
raise
860-
except Exception:
861-
await self.disconnect(nowait=True)
864+
except BaseException:
865+
# Also by default close in case of BaseException. A lot of code
866+
# relies on this behaviour when doing Command/Response pairs.
867+
# See #1128.
868+
if disconnect_on_error:
869+
await self.disconnect(nowait=True)
862870
raise
863871

864872
if self.health_check_interval:

‎redis/client.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1529,7 +1529,7 @@ def try_read():
15291529
return None
15301530
else:
15311531
conn.connect()
1532-
return conn.read_response()
1532+
return conn.read_response(disconnect_on_error=False)
15331533

15341534
response = self._execute(conn, try_read)
15351535

‎redis/connection.py

+18-6
Original file line numberDiff line numberDiff line change
@@ -834,7 +834,11 @@ def send_packed_command(self, command, check_health=True):
834834
errno = e.args[0]
835835
errmsg = e.args[1]
836836
raise ConnectionError(f"Error {errno} while writing to socket. {errmsg}.")
837-
except Exception:
837+
except BaseException:
838+
# BaseExceptions can be raised when a socket send operation is not
839+
# finished, e.g. due to a timeout. Ideally, a caller could then re-try
840+
# to send un-sent data. However, the send_packed_command() API
841+
# does not support it so there is no point in keeping the connection open.
838842
self.disconnect()
839843
raise
840844

@@ -859,23 +863,31 @@ def can_read(self, timeout=0):
859863
self.disconnect()
860864
raise ConnectionError(f"Error while reading from {host_error}: {e.args}")
861865

862-
def read_response(self, disable_decoding=False):
866+
def read_response(
867+
self, disable_decoding=False, *, disconnect_on_error: bool = True
868+
):
863869
"""Read the response from a previously sent command"""
864870

865871
host_error = self._host_error()
866872

867873
try:
868874
response = self._parser.read_response(disable_decoding=disable_decoding)
869875
except socket.timeout:
870-
self.disconnect()
876+
if disconnect_on_error:
877+
self.disconnect()
871878
raise TimeoutError(f"Timeout reading from {host_error}")
872879
except OSError as e:
873-
self.disconnect()
880+
if disconnect_on_error:
881+
self.disconnect()
874882
raise ConnectionError(
875883
f"Error while reading from {host_error}" f" : {e.args}"
876884
)
877-
except Exception:
878-
self.disconnect()
885+
except BaseException:
886+
# Also by default close in case of BaseException. A lot of code
887+
# relies on this behaviour when doing Command/Response pairs.
888+
# See #1128.
889+
if disconnect_on_error:
890+
self.disconnect()
879891
raise
880892

881893
if self.health_check_interval:

‎tests/test_asyncio/test_commands.py

+38
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
"""
22
Tests async overrides of commands from their mixins
33
"""
4+
import asyncio
45
import binascii
56
import datetime
67
import re
8+
import sys
79
from string import ascii_letters
810

911
import pytest
@@ -18,6 +20,11 @@
1820
skip_unless_arch_bits,
1921
)
2022

23+
if sys.version_info.major >= 3 and sys.version_info.minor >= 11:
24+
from asyncio import timeout as async_timeout
25+
else:
26+
from async_timeout import timeout as async_timeout
27+
2128
REDIS_6_VERSION = "5.9.0"
2229

2330

@@ -3008,6 +3015,37 @@ async def test_module_list(self, r: redis.Redis):
30083015
for x in await r.module_list():
30093016
assert isinstance(x, dict)
30103017

3018+
@pytest.mark.onlynoncluster
3019+
async def test_interrupted_command(self, r: redis.Redis):
3020+
"""
3021+
Regression test for issue #1128: An Un-handled BaseException
3022+
will leave the socket with un-read response to a previous
3023+
command.
3024+
"""
3025+
ready = asyncio.Event()
3026+
3027+
async def helper():
3028+
with pytest.raises(asyncio.CancelledError):
3029+
# blocking pop
3030+
ready.set()
3031+
await r.brpop(["nonexist"])
3032+
# If the following is not done, further Timout operations will fail,
3033+
# because the timeout won't catch its Cancelled Error if the task
3034+
# has a pending cancel. Python documentation probably should reflect this.
3035+
if sys.version_info.major >= 3 and sys.version_info.minor >= 11:
3036+
asyncio.current_task().uncancel()
3037+
# if all is well, we can continue. The following should not hang.
3038+
await r.set("status", "down")
3039+
3040+
task = asyncio.create_task(helper())
3041+
await ready.wait()
3042+
await asyncio.sleep(0.01)
3043+
# the task is now sleeping, lets send it an exception
3044+
task.cancel()
3045+
# If all is well, the task should finish right away, otherwise fail with Timeout
3046+
async with async_timeout(0.1):
3047+
await task
3048+
30113049

30123050
@pytest.mark.onlynoncluster
30133051
class TestBinarySave:

‎tests/test_commands.py

+35
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
import binascii
22
import datetime
33
import re
4+
import threading
45
import time
6+
from asyncio import CancelledError
57
from string import ascii_letters
68
from unittest import mock
9+
from unittest.mock import patch
710

811
import pytest
912

@@ -4741,6 +4744,38 @@ def test_psync(self, r):
47414744
res = r2.psync(r2.client_id(), 1)
47424745
assert b"FULLRESYNC" in res
47434746

4747+
@pytest.mark.onlynoncluster
4748+
def test_interrupted_command(self, r: redis.Redis):
4749+
"""
4750+
Regression test for issue #1128: An Un-handled BaseException
4751+
will leave the socket with un-read response to a previous
4752+
command.
4753+
"""
4754+
4755+
ok = False
4756+
4757+
def helper():
4758+
with pytest.raises(CancelledError):
4759+
# blocking pop
4760+
with patch.object(
4761+
r.connection._parser, "read_response", side_effect=CancelledError
4762+
):
4763+
r.brpop(["nonexist"])
4764+
# if all is well, we can continue.
4765+
r.set("status", "down") # should not hang
4766+
nonlocal ok
4767+
ok = True
4768+
4769+
thread = threading.Thread(target=helper)
4770+
thread.start()
4771+
thread.join(0.1)
4772+
try:
4773+
assert not thread.is_alive()
4774+
assert ok
4775+
finally:
4776+
# disconnect here so that fixture cleanup can proceed
4777+
r.connection.disconnect()
4778+
47444779

47454780
@pytest.mark.onlynoncluster
47464781
class TestBinarySave:

0 commit comments

Comments
 (0)