diff --git a/kafka/client_async.py b/kafka/client_async.py index b6adb775b..ba5c96034 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -845,6 +845,7 @@ def check_version(self, node_id=None, timeout=2, strict=False): UnrecognizedBrokerVersion: please file bug if seen! AssertionError (if strict=True): please file bug if seen! """ + self._lock.acquire() end = time.time() + timeout while time.time() < end: @@ -852,6 +853,7 @@ def check_version(self, node_id=None, timeout=2, strict=False): # which can block for an increasing backoff period try_node = node_id or self.least_loaded_node() if try_node is None: + self._lock.release() raise Errors.NoBrokersAvailable() self._maybe_connect(try_node) conn = self._conns[try_node] @@ -866,16 +868,19 @@ def check_version(self, node_id=None, timeout=2, strict=False): # cache the api versions map if it's available (starting # in 0.10 cluster version) self._api_versions = conn.get_api_versions() + self._lock.release() return version except Errors.NodeNotReadyError: # Only raise to user if this is a node-specific request if node_id is not None: + self._lock.release() raise finally: self._refresh_on_disconnects = True # Timeout else: + self._lock.release() raise Errors.NoBrokersAvailable() def wakeup(self):