Skip to content

Commit 284af1b

Browse files
Merge branch 'master' into kristjan/interrupted-send-receive
2 parents 0b599bd + e1017fd commit 284af1b

File tree

8 files changed

+256
-38
lines changed

8 files changed

+256
-38
lines changed

.github/workflows/integration.yaml

+2
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ jobs:
5151
timeout-minutes: 30
5252
strategy:
5353
max-parallel: 15
54+
fail-fast: false
5455
matrix:
5556
python-version: ['3.7', '3.8', '3.9', '3.10', '3.11', 'pypy-3.7', 'pypy-3.8', 'pypy-3.9']
5657
test-type: ['standalone', 'cluster']
@@ -108,6 +109,7 @@ jobs:
108109
name: Install package from commit hash
109110
runs-on: ubuntu-latest
110111
strategy:
112+
fail-fast: false
111113
matrix:
112114
python-version: ['3.7', '3.8', '3.9', '3.10', '3.11', 'pypy-3.7', 'pypy-3.8', 'pypy-3.9']
113115
steps:

redis/asyncio/client.py

+70-24
Original file line numberDiff line numberDiff line change
@@ -500,28 +500,37 @@ async def _disconnect_raise(self, conn: Connection, error: Exception):
500500
):
501501
raise error
502502

503-
# COMMAND EXECUTION AND PROTOCOL PARSING
504-
async def execute_command(self, *args, **options):
505-
"""Execute a command and return a parsed response"""
506-
await self.initialize()
507-
pool = self.connection_pool
508-
command_name = args[0]
509-
conn = self.connection or await pool.get_connection(command_name, **options)
510-
511-
if self.single_connection_client:
512-
await self._single_conn_lock.acquire()
503+
async def _try_send_command_parse_response(self, conn, *args, **options):
513504
try:
514505
return await conn.retry.call_with_retry(
515506
lambda: self._send_command_parse_response(
516-
conn, command_name, *args, **options
507+
conn, args[0], *args, **options
517508
),
518509
lambda error: self._disconnect_raise(conn, error),
519510
)
511+
except asyncio.CancelledError:
512+
await conn.disconnect(nowait=True)
513+
raise
520514
finally:
521515
if self.single_connection_client:
522516
self._single_conn_lock.release()
523517
if not self.connection:
524-
await pool.release(conn)
518+
await self.connection_pool.release(conn)
519+
520+
# COMMAND EXECUTION AND PROTOCOL PARSING
521+
async def execute_command(self, *args, **options):
522+
"""Execute a command and return a parsed response"""
523+
await self.initialize()
524+
pool = self.connection_pool
525+
command_name = args[0]
526+
conn = self.connection or await pool.get_connection(command_name, **options)
527+
528+
if self.single_connection_client:
529+
await self._single_conn_lock.acquire()
530+
531+
return await asyncio.shield(
532+
self._try_send_command_parse_response(conn, *args, **options)
533+
)
525534

526535
async def parse_response(
527536
self, connection: Connection, command_name: Union[str, bytes], **options
@@ -702,6 +711,11 @@ async def reset(self):
702711
self.pending_unsubscribe_patterns = set()
703712

704713
def close(self) -> Awaitable[NoReturn]:
714+
# In case a connection property does not yet exist
715+
# (due to a crash earlier in the Redis() constructor), return
716+
# immediately as there is nothing to clean-up.
717+
if not hasattr(self, "connection"):
718+
return
705719
return self.reset()
706720

707721
async def on_connect(self, connection: Connection):
@@ -760,10 +774,18 @@ async def _disconnect_raise_connect(self, conn, error):
760774
is not a TimeoutError. Otherwise, try to reconnect
761775
"""
762776
await conn.disconnect()
777+
763778
if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
764779
raise error
765780
await conn.connect()
766781

782+
async def _try_execute(self, conn, command, *arg, **kwargs):
783+
try:
784+
return await command(*arg, **kwargs)
785+
except asyncio.CancelledError:
786+
await conn.disconnect()
787+
raise
788+
767789
async def _execute(self, conn, command, *args, **kwargs):
768790
"""
769791
Connect manually upon disconnection. If the Redis server is down,
@@ -772,9 +794,11 @@ async def _execute(self, conn, command, *args, **kwargs):
772794
called by the # connection to resubscribe us to any channels and
773795
patterns we were previously listening to
774796
"""
775-
return await conn.retry.call_with_retry(
776-
lambda: command(*args, **kwargs),
777-
lambda error: self._disconnect_raise_connect(conn, error),
797+
return await asyncio.shield(
798+
conn.retry.call_with_retry(
799+
lambda: self._try_execute(conn, command, *args, **kwargs),
800+
lambda error: self._disconnect_raise_connect(conn, error),
801+
)
778802
)
779803

780804
async def parse_response(self, block: bool = True, timeout: float = 0):
@@ -1178,6 +1202,18 @@ async def _disconnect_reset_raise(self, conn, error):
11781202
await self.reset()
11791203
raise
11801204

1205+
async def _try_send_command_parse_response(self, conn, *args, **options):
1206+
try:
1207+
return await conn.retry.call_with_retry(
1208+
lambda: self._send_command_parse_response(
1209+
conn, args[0], *args, **options
1210+
),
1211+
lambda error: self._disconnect_reset_raise(conn, error),
1212+
)
1213+
except asyncio.CancelledError:
1214+
await conn.disconnect()
1215+
raise
1216+
11811217
async def immediate_execute_command(self, *args, **options):
11821218
"""
11831219
Execute a command immediately, but don't auto-retry on a
@@ -1193,12 +1229,8 @@ async def immediate_execute_command(self, *args, **options):
11931229
command_name, self.shard_hint
11941230
)
11951231
self.connection = conn
1196-
1197-
return await conn.retry.call_with_retry(
1198-
lambda: self._send_command_parse_response(
1199-
conn, command_name, *args, **options
1200-
),
1201-
lambda error: self._disconnect_reset_raise(conn, error),
1232+
return await asyncio.shield(
1233+
self._try_send_command_parse_response(conn, *args, **options)
12021234
)
12031235

12041236
def pipeline_execute_command(self, *args, **options):
@@ -1366,6 +1398,19 @@ async def _disconnect_raise_reset(self, conn: Connection, error: Exception):
13661398
await self.reset()
13671399
raise
13681400

1401+
async def _try_execute(self, conn, execute, stack, raise_on_error):
1402+
try:
1403+
return await conn.retry.call_with_retry(
1404+
lambda: execute(conn, stack, raise_on_error),
1405+
lambda error: self._disconnect_raise_reset(conn, error),
1406+
)
1407+
except asyncio.CancelledError:
1408+
# not supposed to be possible, yet here we are
1409+
await conn.disconnect(nowait=True)
1410+
raise
1411+
finally:
1412+
await self.reset()
1413+
13691414
async def execute(self, raise_on_error: bool = True):
13701415
"""Execute all the commands in the current pipeline"""
13711416
stack = self.command_stack
@@ -1387,10 +1432,11 @@ async def execute(self, raise_on_error: bool = True):
13871432
conn = cast(Connection, conn)
13881433

13891434
try:
1390-
return await conn.retry.call_with_retry(
1391-
lambda: execute(conn, stack, raise_on_error),
1392-
lambda error: self._disconnect_raise_reset(conn, error),
1435+
return await asyncio.shield(
1436+
self._try_execute(conn, execute, stack, raise_on_error)
13931437
)
1438+
except RuntimeError:
1439+
await self.reset()
13941440
finally:
13951441
await self.reset()
13961442

redis/asyncio/cluster.py

+24-9
Original file line numberDiff line numberDiff line change
@@ -1002,12 +1002,33 @@ async def execute_command(self, *args: Any, **kwargs: Any) -> Any:
10021002
await connection.send_packed_command(connection.pack_command(*args), False)
10031003

10041004
# Read response
1005+
return await asyncio.shield(
1006+
self._parse_and_release(connection, args[0], **kwargs)
1007+
)
1008+
1009+
async def _parse_and_release(self, connection, *args, **kwargs):
10051010
try:
1006-
return await self.parse_response(connection, args[0], **kwargs)
1011+
return await self.parse_response(connection, *args, **kwargs)
1012+
except asyncio.CancelledError:
1013+
# should not be possible
1014+
await connection.disconnect(nowait=True)
1015+
raise
10071016
finally:
1008-
# Release connection
10091017
self._free.append(connection)
10101018

1019+
async def _try_parse_response(self, cmd, connection, ret):
1020+
try:
1021+
cmd.result = await asyncio.shield(
1022+
self.parse_response(connection, cmd.args[0], **cmd.kwargs)
1023+
)
1024+
except asyncio.CancelledError:
1025+
await connection.disconnect(nowait=True)
1026+
raise
1027+
except Exception as e:
1028+
cmd.result = e
1029+
ret = True
1030+
return ret
1031+
10111032
async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool:
10121033
# Acquire connection
10131034
connection = self.acquire_connection()
@@ -1020,13 +1041,7 @@ async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool:
10201041
# Read responses
10211042
ret = False
10221043
for cmd in commands:
1023-
try:
1024-
cmd.result = await self.parse_response(
1025-
connection, cmd.args[0], **cmd.kwargs
1026-
)
1027-
except Exception as e:
1028-
cmd.result = e
1029-
ret = True
1044+
ret = await asyncio.shield(self._try_parse_response(cmd, connection, ret))
10301045

10311046
# Release connection
10321047
self._free.append(connection)

redis/asyncio/connection.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
)
2626
from urllib.parse import ParseResult, parse_qs, unquote, urlparse
2727

28-
if sys.version_info.major >= 3 and sys.version_info.minor >= 11:
28+
# the functionality is available in 3.11.x but has a major issue before
29+
# 3.11.3. See https://github.com/redis/redis-py/issues/2633
30+
if sys.version_info >= (3, 11, 3):
2931
from asyncio import timeout as async_timeout
3032
else:
3133
from async_timeout import timeout as async_timeout

redis/connection.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -1167,8 +1167,9 @@ def _connect(self):
11671167
class UnixDomainSocketConnection(AbstractConnection):
11681168
"Manages UDS communication to and from a Redis server"
11691169

1170-
def __init__(self, path="", **kwargs):
1170+
def __init__(self, path="", socket_timeout=None, **kwargs):
11711171
self.path = path
1172+
self.socket_timeout = socket_timeout
11721173
super().__init__(**kwargs)
11731174

11741175
def repr_pieces(self):

setup.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
long_description_content_type="text/markdown",
99
keywords=["Redis", "key-value store", "database"],
1010
license="MIT",
11-
version="4.5.2",
11+
version="4.5.4",
1212
packages=find_packages(
1313
include=[
1414
"redis",
@@ -34,7 +34,7 @@
3434
install_requires=[
3535
'importlib-metadata >= 1.0; python_version < "3.8"',
3636
'typing-extensions; python_version<"3.8"',
37-
'async-timeout>=4.0.2; python_version<"3.11"',
37+
'async-timeout>=4.0.2; python_version<="3.11.2"',
3838
],
3939
classifiers=[
4040
"Development Status :: 5 - Production/Stable",

0 commit comments

Comments
 (0)