Skip to content

Commit bbf553c

Browse files
committed
Make sentinel thread-safe during master failover
In the current version, the sentinel code tries to close all connections immediately after discovering there is a new master. This is a problem in multi-threaded environment, because neither `ConnectionPool.disconnect` nor `Connection.disconnect` are thread-safe. If you call `SentinelConnectionPool.disconnect` after master failover, that will close all connections that are potentially used from other threads, causing all kinds of errors. This change avoids that behavior by adding acquire/release checks, so connections that don't belong the current master are never returned to the pool and they are closed instead.
1 parent 9858a09 commit bbf553c

File tree

1 file changed

+42
-21
lines changed

1 file changed

+42
-21
lines changed

redis/sentinel.py

+42-21
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ class SlaveNotFoundError(ConnectionError):
2020
class SentinelManagedConnection(Connection):
2121
def __init__(self, **kwargs):
2222
self.connection_pool = kwargs.pop('connection_pool')
23+
self.to_be_disconnected = False
2324
super(SentinelManagedConnection, self).__init__(**kwargs)
2425

2526
def __repr__(self):
@@ -56,12 +57,10 @@ def read_response(self):
5657
return super(SentinelManagedConnection, self).read_response()
5758
except ReadOnlyError:
5859
if self.connection_pool.is_master:
59-
# When talking to a master, a ReadOnlyError when likely
60+
# When talking to a master, a ReadOnlyError likely
6061
# indicates that the previous master that we're still connected
6162
# to has been demoted to a slave and there's a new master.
62-
# calling disconnect will force the connection to re-query
63-
# sentinel during the next connect() attempt.
64-
self.disconnect()
63+
self.to_be_disconnected = True
6564
raise ConnectionError('The previous master is now a slave')
6665
raise
6766

@@ -97,18 +96,16 @@ def reset(self):
9796
self.slave_rr_counter = None
9897

9998
def get_master_address(self):
99+
"""Get the address of the current master"""
100100
master_address = self.sentinel_manager.discover_master(
101101
self.service_name)
102102
if self.is_master:
103-
if self.master_address is None:
103+
if master_address != self.master_address:
104104
self.master_address = master_address
105-
elif master_address != self.master_address:
106-
# Master address changed, disconnect all clients in this pool
107-
self.disconnect()
108105
return master_address
109106

110107
def rotate_slaves(self):
111-
"Round-robin slave balancer"
108+
"""Round-robin slave balancer"""
112109
slaves = self.sentinel_manager.discover_slaves(self.service_name)
113110
if slaves:
114111
if self.slave_rr_counter is None:
@@ -123,18 +120,42 @@ def rotate_slaves(self):
123120
yield self.get_master_address()
124121
except MasterNotFoundError:
125122
pass
126-
raise SlaveNotFoundError('No slave found for %r' % (self.service_name))
127-
128-
def _checkpid(self):
129-
if self.pid != os.getpid():
130-
self.disconnect()
131-
self.reset()
132-
self.__init__(self.service_name, self.sentinel_manager,
133-
is_master=self.is_master,
134-
check_connection=self.check_connection,
135-
connection_class=self.connection_class,
136-
max_connections=self.max_connections,
137-
**self.connection_kwargs)
123+
raise SlaveNotFoundError('No slave found for %r' % (self.service_name,))
124+
125+
def _check_connection(self, connection):
126+
if connection.to_be_disconnected:
127+
connection.disconnect()
128+
self.get_master_address()
129+
return False
130+
if self.is_master:
131+
if self.master_address != (connection.host, connection.port):
132+
connection.disconnect()
133+
return False
134+
return True
135+
136+
def get_connection(self, command_name, *keys, **options):
137+
"""Get a connection from the pool"""
138+
self._checkpid()
139+
while True:
140+
try:
141+
connection = self._available_connections.pop()
142+
except IndexError:
143+
connection = self.make_connection()
144+
else:
145+
if not self._check_connection(connection):
146+
continue
147+
self._in_use_connections.add(connection)
148+
return connection
149+
150+
def release(self, connection):
151+
"""Releases the connection back to the pool"""
152+
self._checkpid()
153+
if connection.pid != self.pid:
154+
return
155+
self._in_use_connections.remove(connection)
156+
if not self._check_connection(connection):
157+
return
158+
self._available_connections.append(connection)
138159

139160

140161
class Sentinel(object):

0 commit comments

Comments
 (0)