Skip to content

Commit

Permalink
lock client.check_version (#1771)
Browse files Browse the repository at this point in the history
  • Loading branch information
dpkp authored Apr 1, 2019
1 parent b1effa2 commit 3664ae8
Showing 1 changed file with 5 additions and 0 deletions.
5 changes: 5 additions & 0 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -845,13 +845,15 @@ def check_version(self, node_id=None, timeout=2, strict=False):
UnrecognizedBrokerVersion: please file bug if seen!
AssertionError (if strict=True): please file bug if seen!
"""
self._lock.acquire()
end = time.time() + timeout
while time.time() < end:

# It is possible that least_loaded_node falls back to bootstrap,
# which can block for an increasing backoff period
try_node = node_id or self.least_loaded_node()
if try_node is None:
self._lock.release()
raise Errors.NoBrokersAvailable()
self._maybe_connect(try_node)
conn = self._conns[try_node]
Expand All @@ -866,16 +868,19 @@ def check_version(self, node_id=None, timeout=2, strict=False):
# cache the api versions map if it's available (starting
# in 0.10 cluster version)
self._api_versions = conn.get_api_versions()
self._lock.release()
return version
except Errors.NodeNotReadyError:
# Only raise to user if this is a node-specific request
if node_id is not None:
self._lock.release()
raise
finally:
self._refresh_on_disconnects = True

# Timeout
else:
self._lock.release()
raise Errors.NoBrokersAvailable()

def wakeup(self):
Expand Down

0 comments on commit 3664ae8

Please # to comment.