From f09b8e938edc951866520b50b62735021f689fc4 Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Wed, 19 Feb 2020 10:25:09 +0800 Subject: [PATCH] cluster manager: use etcd3 instead of etcd2 (#463) --- cluster_manage/check_lib.py | 1 + cluster_manage/define.py | 12 + cluster_manage/etcd/__init__.py | 317 ------- cluster_manage/etcd/auth.py | 255 ------ cluster_manage/etcd/client.py | 993 ---------------------- cluster_manage/etcd/lock.py | 178 ---- cluster_manage/flash_cluster_manager.py | 70 +- cluster_manage/pd_client.py | 46 +- cluster_manage/placement_rule.py | 32 +- release-centos7/Dockerfile-builder | 1 + release-centos7/Dockerfile-builder-ci | 2 +- release-centos7/build/build-tiflash-ci.sh | 2 +- 12 files changed, 80 insertions(+), 1829 deletions(-) create mode 100644 cluster_manage/define.py delete mode 100644 cluster_manage/etcd/__init__.py delete mode 100644 cluster_manage/etcd/auth.py delete mode 100644 cluster_manage/etcd/client.py delete mode 100644 cluster_manage/etcd/lock.py diff --git a/cluster_manage/check_lib.py b/cluster_manage/check_lib.py index 3b90c363031..6424ca5d9d3 100644 --- a/cluster_manage/check_lib.py +++ b/cluster_manage/check_lib.py @@ -7,6 +7,7 @@ import toml import pybind11 import setuptools + import etcd3 except Exception as e: print(e) exit(-1) diff --git a/cluster_manage/define.py b/cluster_manage/define.py new file mode 100644 index 00000000000..1ff886c17eb --- /dev/null +++ b/cluster_manage/define.py @@ -0,0 +1,12 @@ +#!/usr/bin/python3 + +TIFLASH = 'tiflash' +TIFLASH_LABEL = {'key': 'engine', 'value': TIFLASH} +REGION_COUNT = 'region_count' +TIFLASH_REGION_COUNT = 'flash_region_count' +LOCATION_LABELS = 'location_labels' +REPLICA_COUNT = 'replica_count' +LEARNER = 'learner' +AVAILABLE = 'available' +TIFLASH_CLUSTER_MUTEX_KEY = '{}/cluster/leader'.format(TIFLASH) +LABEL_CONSTRAINTS = 'label_constraints' diff --git a/cluster_manage/etcd/__init__.py b/cluster_manage/etcd/__init__.py deleted file mode 100644 index 33b1d679491..00000000000 --- a/cluster_manage/etcd/__init__.py +++ /dev/null @@ -1,317 +0,0 @@ -import logging -from .client import Client -from .lock import Lock - -_log = logging.getLogger(__name__) - -# Prevent "no handler" warnings to stderr in projects that do not configure -# logging. -try: - from logging import NullHandler -except ImportError: - # Python <2.7, just define it. - class NullHandler(logging.Handler): - def emit(self, record): - pass -_log.addHandler(NullHandler()) - - -class EtcdResult(object): - _node_props = { - 'key': None, - 'value': None, - 'expiration': None, - 'ttl': None, - 'modifiedIndex': None, - 'createdIndex': None, - 'newKey': False, - 'dir': False, - } - - def __init__(self, action=None, node=None, prevNode=None, **kwdargs): - """ - Creates an EtcdResult object. - - Args: - action (str): The action that resulted in key creation - - node (dict): The dictionary containing all node information. - - prevNode (dict): The dictionary containing previous node information. - - """ - self.action = action - for (key, default) in self._node_props.items(): - if key in node: - setattr(self, key, node[key]) - else: - setattr(self, key, default) - - self._children = [] - if self.dir and 'nodes' in node: - # We keep the data in raw format, converting them only when needed - self._children = node['nodes'] - - if prevNode: - self._prev_node = EtcdResult(None, node=prevNode) - # See issue 38: when returning a write() op etcd has a bogus result. - if self._prev_node.dir and not self.dir: - self.dir = True - - def parse_headers(self, response): - headers = response.getheaders() - self.etcd_index = int(headers.get('x-etcd-index', 1)) - self.raft_index = int(headers.get('x-raft-index', 1)) - - def get_subtree(self, leaves_only=False): - """ - Get all the subtree resulting from a recursive=true call to etcd. - - Args: - leaves_only (bool): if true, only value nodes are returned - - - """ - if not self._children: - #if the current result is a leaf, return itself - yield self - return - else: - # node is not a leaf - if not leaves_only: - yield self - for n in self._children: - node = EtcdResult(None, n) - for child in node.get_subtree(leaves_only=leaves_only): - yield child - return - - @property - def leaves(self): - return self.get_subtree(leaves_only=True) - - @property - def children(self): - """ Deprecated, use EtcdResult.leaves instead """ - return self.leaves - - def __eq__(self, other): - if not (type(self) is type(other)): - return False - for k in self._node_props.keys(): - try: - a = getattr(self, k) - b = getattr(other, k) - if a != b: - return False - except: - return False - return True - - def __ne__(self, other): - return not self.__eq__(other) - - def __repr__(self): - return "%s(%r)" % (self.__class__, self.__dict__) - - -class EtcdException(Exception): - - """ - Generic Etcd Exception. - """ - def __init__(self, message=None, payload=None): - super(EtcdException, self).__init__(message) - self.payload = payload - - -class EtcdValueError(EtcdException, ValueError): - """ - Base class for Etcd value-related errors. - """ - pass - - -class EtcdCompareFailed(EtcdValueError): - """ - Compare-and-swap failure - """ - pass - - -class EtcdClusterIdChanged(EtcdException): - """ - The etcd cluster ID changed. This may indicate the cluster was replaced - with a backup. Raised to prevent waiting on an etcd_index that was only - valid on the old cluster. - """ - pass - - -class EtcdKeyError(EtcdException): - """ - Etcd Generic KeyError Exception - """ - pass - - -class EtcdKeyNotFound(EtcdKeyError): - """ - Etcd key not found exception (100) - """ - pass - - -class EtcdNotFile(EtcdKeyError): - """ - Etcd not a file exception (102) - """ - pass - - -class EtcdNotDir(EtcdKeyError): - """ - Etcd not a directory exception (104) - """ - pass - - -class EtcdAlreadyExist(EtcdKeyError): - """ - Etcd already exist exception (105) - """ - pass - - -class EtcdEventIndexCleared(EtcdException): - """ - Etcd event index is outdated and cleared exception (401) - """ - pass - - -class EtcdConnectionFailed(EtcdException): - """ - Connection to etcd failed. - """ - def __init__(self, message=None, payload=None, cause=None): - super(EtcdConnectionFailed, self).__init__(message=message, - payload=payload) - self.cause = cause - - -class EtcdInsufficientPermissions(EtcdException): - """ - Request failed because of insufficient permissions. - """ - pass - - -class EtcdWatchTimedOut(EtcdConnectionFailed): - """ - A watch timed out without returning a result. - """ - pass - - -class EtcdWatcherCleared(EtcdException): - """ - Watcher is cleared due to etcd recovery. - """ - pass - - -class EtcdLeaderElectionInProgress(EtcdException): - """ - Request failed due to in-progress leader election. - """ - pass - - -class EtcdRootReadOnly(EtcdKeyError): - """ - Operation is not valid on the root, which is read only. - """ - pass - - -class EtcdDirNotEmpty(EtcdValueError): - """ - Directory not empty. - """ - pass - - -class EtcdLockExpired(EtcdException): - """ - Our lock apparently expired while we were trying to acquire it. - """ - pass - - -class EtcdError(object): - # See https://github.com/coreos/etcd/blob/master/Documentation/v2/errorcode.md - error_exceptions = { - 100: EtcdKeyNotFound, - 101: EtcdCompareFailed, - 102: EtcdNotFile, - # 103: Non-public: no more peers. - 104: EtcdNotDir, - 105: EtcdAlreadyExist, - # 106: Non-public: key is preserved. - 107: EtcdRootReadOnly, - 108: EtcdDirNotEmpty, - # 109: Non-public: existing peer addr. - 110: EtcdInsufficientPermissions, - - 200: EtcdValueError, # Not part of v2 - 201: EtcdValueError, - 202: EtcdValueError, - 203: EtcdValueError, - 204: EtcdValueError, - 205: EtcdValueError, - 206: EtcdValueError, - 207: EtcdValueError, - 208: EtcdValueError, - 209: EtcdValueError, - 210: EtcdValueError, - - # 300: Non-public: Raft internal error. - 301: EtcdLeaderElectionInProgress, - - 400: EtcdWatcherCleared, - 401: EtcdEventIndexCleared, - } - - @classmethod - def handle(cls, payload): - """ - Decodes the error and throws the appropriate error message - - :param payload: The decoded JSON error payload as a dict. - """ - error_code = payload.get("errorCode") - message = payload.get("message") - cause = payload.get("cause") - msg = '{} : {}'.format(message, cause) - status = payload.get("status") - # Some general status handling, as - # not all endpoints return coherent error messages - if status == 404: - error_code = 100 - elif status == 401: - error_code = 110 - exc = cls.error_exceptions.get(error_code, EtcdException) - if issubclass(exc, EtcdException): - raise exc(msg, payload) - else: - raise exc(msg) - - -# Attempt to enable urllib3's SNI support, if possible -# Blatantly copied from requests. -try: - from urllib3.contrib import pyopenssl - pyopenssl.inject_into_urllib3() -except ImportError: - pass diff --git a/cluster_manage/etcd/auth.py b/cluster_manage/etcd/auth.py deleted file mode 100644 index 796772d73fd..00000000000 --- a/cluster_manage/etcd/auth.py +++ /dev/null @@ -1,255 +0,0 @@ -import json - -import logging -import etcd - -_log = logging.getLogger(__name__) - - -class EtcdAuthBase(object): - entity = 'example' - - def __init__(self, client, name): - self.client = client - self.name = name - self.uri = "{}/auth/{}s/{}".format(self.client.version_prefix, - self.entity, self.name) - - @property - def names(self): - key = "{}s".format(self.entity) - uri = "{}/auth/{}".format(self.client.version_prefix, key) - response = self.client.api_execute(uri, self.client._MGET) - return json.loads(response.data.decode('utf-8'))[key] - - def read(self): - try: - response = self.client.api_execute(self.uri, self.client._MGET) - except etcd.EtcdInsufficientPermissions as e: - _log.error("Any action on the authorization requires the root role") - raise - except etcd.EtcdKeyNotFound: - _log.info("%s '%s' not found", self.entity, self.name) - raise - except Exception as e: - _log.error("Failed to fetch %s in %s%s: %r", - self.entity, self.client._base_uri, - self.client.version_prefix, e) - raise etcd.EtcdException( - "Could not fetch {} '{}'".format(self.entity, self.name)) - - self._from_net(response.data) - - def write(self): - try: - r = self.__class__(self.client, self.name) - r.read() - except etcd.EtcdKeyNotFound: - r = None - try: - for payload in self._to_net(r): - response = self.client.api_execute_json(self.uri, - self.client._MPUT, - params=payload) - # This will fail if the response is an error - self._from_net(response.data) - except etcd.EtcdInsufficientPermissions as e: - _log.error("Any action on the authorization requires the root role") - raise - except Exception as e: - _log.error("Failed to write %s '%s'", self.entity, self.name) - # TODO: fine-grained exception handling - raise etcd.EtcdException( - "Could not write {} '{}': {}".format(self.entity, - self.name, e)) - - def delete(self): - try: - _ = self.client.api_execute(self.uri, self.client._MDELETE) - except etcd.EtcdInsufficientPermissions as e: - _log.error("Any action on the authorization requires the root role") - raise - except etcd.EtcdKeyNotFound: - _log.info("%s '%s' not found", self.entity, self.name) - raise - except Exception as e: - _log.error("Failed to delete %s in %s%s: %r", - self.entity, self._base_uri, self.version_prefix, e) - raise etcd.EtcdException( - "Could not delete {} '{}'".format(self.entity, self.name)) - - def _from_net(self, data): - raise NotImplementedError() - - def _to_net(self, old=None): - raise NotImplementedError() - - @classmethod - def new(cls, client, data): - c = cls(client, data[cls.entity]) - c._from_net(data) - return c - - -class EtcdUser(EtcdAuthBase): - """Class to manage in a orm-like way etcd users""" - entity = 'user' - - def __init__(self, client, name): - super(EtcdUser, self).__init__(client, name) - self._roles = set() - self._password = None - - def _from_net(self, data): - d = json.loads(data.decode('utf-8')) - self.roles = d.get('roles', []) - self.name = d.get('user') - - def _to_net(self, prevobj=None): - if prevobj is None: - retval = [{"user": self.name, "password": self._password, - "roles": list(self.roles)}] - else: - retval = [] - if self._password: - retval.append({"user": self.name, "password": self._password}) - to_grant = list(self.roles - prevobj.roles) - to_revoke = list(prevobj.roles - self.roles) - if to_grant: - retval.append({"user": self.name, "grant": to_grant}) - if to_revoke: - retval.append({"user": self.name, "revoke": to_revoke}) - # Let's blank the password now - # Even if the user can't be written we don't want it to leak anymore. - self._password = None - return retval - - @property - def roles(self): - return self._roles - - @roles.setter - def roles(self, val): - self._roles = set(val) - - @property - def password(self): - """Empty property for password.""" - return None - - @password.setter - def password(self, new_password): - """Change user's password.""" - self._password = new_password - - def __str__(self): - return json.dumps(self._to_net()[0]) - - - -class EtcdRole(EtcdAuthBase): - entity = 'role' - - def __init__(self, client, name): - super(EtcdRole, self).__init__(client, name) - self._read_paths = set() - self._write_paths = set() - - def _from_net(self, data): - d = json.loads(data.decode('utf-8')) - self.name = d.get('role') - - try: - kv = d["permissions"]["kv"] - except: - self._read_paths = set() - self._write_paths = set() - return - - self._read_paths = set(kv.get('read', [])) - self._write_paths = set(kv.get('write', [])) - - def _to_net(self, prevobj=None): - retval = [] - if prevobj is None: - retval.append({ - "role": self.name, - "permissions": - { - "kv": - { - "read": list(self._read_paths), - "write": list(self._write_paths) - } - } - }) - else: - to_grant = { - 'read': list(self._read_paths - prevobj._read_paths), - 'write': list(self._write_paths - prevobj._write_paths) - } - to_revoke = { - 'read': list(prevobj._read_paths - self._read_paths), - 'write': list(prevobj._write_paths - self._write_paths) - } - if [path for sublist in to_revoke.values() for path in sublist]: - retval.append({'role': self.name, 'revoke': {'kv': to_revoke}}) - if [path for sublist in to_grant.values() for path in sublist]: - retval.append({'role': self.name, 'grant': {'kv': to_grant}}) - return retval - - def grant(self, path, permission): - if permission.upper().find('R') >= 0: - self._read_paths.add(path) - if permission.upper().find('W') >= 0: - self._write_paths.add(path) - - def revoke(self, path, permission): - if permission.upper().find('R') >= 0 and \ - path in self._read_paths: - self._read_paths.remove(path) - if permission.upper().find('W') >= 0 and \ - path in self._write_paths: - self._write_paths.remove(path) - - @property - def acls(self): - perms = {} - try: - for path in self._read_paths: - perms[path] = 'R' - for path in self._write_paths: - if path in perms: - perms[path] += 'W' - else: - perms[path] = 'W' - except: - pass - return perms - - @acls.setter - def acls(self, acls): - self._read_paths = set() - self._write_paths = set() - for path, permission in acls.items(): - self.grant(path, permission) - - def __str__(self): - return json.dumps({"role": self.name, 'acls': self.acls}) - - -class Auth(object): - def __init__(self, client): - self.client = client - self.uri = "{}/auth/enable".format(self.client.version_prefix) - - @property - def active(self): - resp = self.client.api_execute(self.uri, self.client._MGET) - return json.loads(resp.data.decode('utf-8'))['enabled'] - - @active.setter - def active(self, value): - if value != self.active: - method = value and self.client._MPUT or self.client._MDELETE - self.client.api_execute(self.uri, method) diff --git a/cluster_manage/etcd/client.py b/cluster_manage/etcd/client.py deleted file mode 100644 index 74ad8fc1b5b..00000000000 --- a/cluster_manage/etcd/client.py +++ /dev/null @@ -1,993 +0,0 @@ -""" -.. module:: python-etcd - :synopsis: A python etcd client. - -.. moduleauthor:: Jose Plana - - -""" -import logging -try: - # Python 3 - from http.client import HTTPException -except ImportError: - # Python 2 - from httplib import HTTPException -import socket -import urllib3 -from urllib3.exceptions import HTTPError -from urllib3.exceptions import ReadTimeoutError -import json -import ssl -import dns.resolver -from functools import wraps -import etcd - -try: - from urlparse import urlparse -except ImportError: - from urllib.parse import urlparse - - -_log = logging.getLogger(__name__) - - -class Client(object): - - """ - Client for etcd, the distributed log service using raft. - """ - - _MGET = 'GET' - _MPUT = 'PUT' - _MPOST = 'POST' - _MDELETE = 'DELETE' - _comparison_conditions = set(('prevValue', 'prevIndex', 'prevExist', 'refresh')) - _read_options = set(('recursive', 'wait', 'waitIndex', 'sorted', 'quorum')) - _del_conditions = set(('prevValue', 'prevIndex')) - - http = None - - def __init__( - self, - host='127.0.0.1', - port=4001, - srv_domain=None, - version_prefix='/v2', - read_timeout=60, - allow_redirect=True, - protocol='http', - cert=None, - ca_cert=None, - username=None, - password=None, - allow_reconnect=False, - use_proxies=False, - expected_cluster_id=None, - per_host_pool_size=10, - lock_prefix="/_locks" - ): - """ - Initialize the client. - - Args: - host (mixed): - If a string, IP to connect to. - If a tuple ((host, port), (host, port), ...) - - port (int): Port used to connect to etcd. - - srv_domain (str): Domain to search the SRV record for cluster autodiscovery. - - version_prefix (str): Url or version prefix in etcd url (default=/v2). - - read_timeout (int): max seconds to wait for a read. - - allow_redirect (bool): allow the client to connect to other nodes. - - protocol (str): Protocol used to connect to etcd. - - cert (mixed): If a string, the whole ssl client certificate; - if a tuple, the cert and key file names. - - ca_cert (str): The ca certificate. If pressent it will enable - validation. - - username (str): username for etcd authentication. - - password (str): password for etcd authentication. - - allow_reconnect (bool): allow the client to reconnect to another - etcd server in the cluster in the case the - default one does not respond. - - use_proxies (bool): we are using a list of proxies to which we connect, - and don't want to connect to the original etcd cluster. - - expected_cluster_id (str): If a string, recorded as the expected - UUID of the cluster (rather than - learning it from the first request), - reads will raise EtcdClusterIdChanged - if they receive a response with a - different cluster ID. - per_host_pool_size (int): specifies maximum number of connections to pool - by host. By default this will use up to 10 - connections. - lock_prefix (str): Set the key prefix at etcd when client to lock object. - By default this will be use /_locks. - """ - - # If a DNS record is provided, use it to get the hosts list - if srv_domain is not None: - try: - host = self._discover(srv_domain) - except Exception as e: - _log.error("Could not discover the etcd hosts from %s: %s", - srv_domain, e) - - self._protocol = protocol - - def uri(protocol, host, port): - return '%s://%s:%d' % (protocol, host, port) - - if not isinstance(host, tuple): - self._machines_cache = [] - self._base_uri = uri(self._protocol, host, port) - else: - if not allow_reconnect: - _log.error("List of hosts incompatible with allow_reconnect.") - raise etcd.EtcdException("A list of hosts to connect to was given, but reconnection not allowed?") - self._machines_cache = [uri(self._protocol, *conn) for conn in host] - self._base_uri = self._machines_cache.pop(0) - - self.expected_cluster_id = expected_cluster_id - self.version_prefix = version_prefix - - self._read_timeout = read_timeout - self._allow_redirect = allow_redirect - self._use_proxies = use_proxies - self._allow_reconnect = allow_reconnect - self._lock_prefix = lock_prefix - - # SSL Client certificate support - - kw = { - 'maxsize': per_host_pool_size - } - - if self._read_timeout > 0: - kw['timeout'] = self._read_timeout - - if cert: - if isinstance(cert, tuple): - # Key and cert are separate - kw['cert_file'] = cert[0] - kw['key_file'] = cert[1] - else: - # combined certificate - kw['cert_file'] = cert - - if ca_cert: - kw['ca_certs'] = ca_cert - kw['cert_reqs'] = ssl.CERT_REQUIRED - - self.username = None - self.password = None - if username and password: - self.username = username - self.password = password - elif username: - _log.warning('Username provided without password, both are required for authentication') - elif password: - _log.warning('Password provided without username, both are required for authentication') - - self.http = urllib3.PoolManager(num_pools=10, **kw) - - _log.debug("New etcd client created for %s", self.base_uri) - - if self._allow_reconnect: - # we need the set of servers in the cluster in order to try - # reconnecting upon error. The cluster members will be - # added to the hosts list you provided. If you are using - # proxies, set all - # - # Beware though: if you input '127.0.0.1' as your host and - # etcd advertises 'localhost', both will be in the - # resulting list. - - # If we're connecting to the original cluster, we can - # extend the list given to the client with what we get - # from self.machines - if not self._use_proxies: - self._machines_cache = list(set(self._machines_cache) | - set(self.machines)) - if self._base_uri in self._machines_cache: - self._machines_cache.remove(self._base_uri) - _log.debug("Machines cache initialised to %s", - self._machines_cache) - - # Versions set to None. They will be set upon first usage. - self._version = self._cluster_version = None - - def _set_version_info(self): - """ - Sets the version information provided by the server. - """ - # Set the version - version_info = json.loads(self.http.request( - self._MGET, - self._base_uri + '/version', - headers=self._get_headers(), - timeout=self.read_timeout, - redirect=self.allow_redirect).data.decode('utf-8')) - self._version = version_info['etcdserver'] - self._cluster_version = version_info['etcdcluster'] - - def _discover(self, domain): - srv_name = "_etcd._tcp.{}".format(domain) - answers = dns.resolver.query(srv_name, 'SRV') - hosts = [] - for answer in answers: - hosts.append( - (answer.target.to_text(omit_final_dot=True), answer.port)) - _log.debug("Found %s", hosts) - if not len(hosts): - raise ValueError("The SRV record is present but no host were found") - return tuple(hosts) - - def __del__(self): - """Clean up open connections""" - if self.http is not None: - try: - self.http.clear() - except ReferenceError: - # this may hit an already-cleared weakref - pass - - @property - def base_uri(self): - """URI used by the client to connect to etcd.""" - return self._base_uri - - @property - def host(self): - """Node to connect etcd.""" - return urlparse(self._base_uri).netloc.split(':')[0] - - @property - def port(self): - """Port to connect etcd.""" - return int(urlparse(self._base_uri).netloc.split(':')[1]) - - @property - def protocol(self): - """Protocol used to connect etcd.""" - return self._protocol - - @property - def read_timeout(self): - """Max seconds to wait for a read.""" - return self._read_timeout - - @property - def allow_redirect(self): - """Allow the client to connect to other nodes.""" - return self._allow_redirect - - @property - def lock_prefix(self): - """Get the key prefix at etcd when client to lock object.""" - return self._lock_prefix - - @property - def machines(self): - """ - Members of the cluster. - - Returns: - list. str with all the nodes in the cluster. - - >>> print client.machines - ['http://127.0.0.1:4001', 'http://127.0.0.1:4002'] - """ - # We can't use api_execute here, or it causes a logical loop - try: - uri = self._base_uri + self.version_prefix + '/machines' - response = self.http.request( - self._MGET, - uri, - headers=self._get_headers(), - timeout=self.read_timeout, - redirect=self.allow_redirect) - - machines = [ - node.strip() for node in - self._handle_server_response(response).data.decode('utf-8').split(',') - ] - _log.debug("Retrieved list of machines: %s", machines) - return machines - except (HTTPError, HTTPException, socket.error) as e: - # We can't get the list of machines, if one server is in the - # machines cache, try on it - _log.error("Failed to get list of machines from %s%s: %r", - self._base_uri, self.version_prefix, e) - if self._machines_cache: - self._base_uri = self._machines_cache.pop(0) - _log.info("Retrying on %s", self._base_uri) - # Call myself - return self.machines - else: - raise etcd.EtcdException("Could not get the list of servers, " - "maybe you provided the wrong " - "host(s) to connect to?") - - @property - def members(self): - """ - A more structured view of peers in the cluster. - - Note that while we have an internal DS called _members, accessing the public property will call etcd. - """ - # Empty the members list - self._members = {} - try: - data = self.api_execute(self.version_prefix + '/members', - self._MGET).data.decode('utf-8') - res = json.loads(data) - for member in res['members']: - self._members[member['id']] = member - return self._members - except: - raise etcd.EtcdException("Could not get the members list, maybe the cluster has gone away?") - - @property - def leader(self): - """ - Returns: - dict. the leader of the cluster. - - >>> print client.leader - {"id":"ce2a822cea30bfca","name":"default","peerURLs":["http://localhost:2380","http://localhost:7001"],"clientURLs":["http://127.0.0.1:4001"]} - """ - try: - - leader = json.loads( - self.api_execute(self.version_prefix + '/stats/self', - self._MGET).data.decode('utf-8')) - return self.members[leader['leaderInfo']['leader']] - except Exception as e: - raise etcd.EtcdException("Cannot get leader data: %s" % e) - - @property - def stats(self): - """ - Returns: - dict. the stats of the local server - """ - return self._stats() - - @property - def leader_stats(self): - """ - Returns: - dict. the stats of the leader - """ - return self._stats('leader') - - @property - def store_stats(self): - """ - Returns: - dict. the stats of the kv store - """ - return self._stats('store') - - def _stats(self, what='self'): - """ Internal method to access the stats endpoints""" - data = self.api_execute(self.version_prefix - + '/stats/' + what, self._MGET).data.decode('utf-8') - try: - return json.loads(data) - except (TypeError,ValueError): - raise etcd.EtcdException("Cannot parse json data in the response") - - @property - def version(self): - """ - Version of etcd. - """ - if not self._version: - self._set_version_info() - return self._version - - @property - def cluster_version(self): - """ - Version of the etcd cluster. - """ - if not self._cluster_version: - self._set_version_info() - - return self._cluster_version - - @property - def key_endpoint(self): - """ - REST key endpoint. - """ - return self.version_prefix + '/keys' - - def __contains__(self, key): - """ - Check if a key is available in the cluster. - - >>> print 'key' in client - True - """ - try: - self.get(key) - return True - except etcd.EtcdKeyNotFound: - return False - - def _sanitize_key(self, key): - if not key.startswith('/'): - key = "/{}".format(key) - return key - - def write(self, key, value, ttl=None, dir=False, append=False, **kwdargs): - """ - Writes the value for a key, possibly doing atomic Compare-and-Swap - - Args: - key (str): Key. - - value (object): value to set - - ttl (int): Time in seconds of expiration (optional). - - dir (bool): Set to true if we are writing a directory; default is false. - - append (bool): If true, it will post to append the new value to the dir, creating a sequential key. Defaults to false. - - Other parameters modifying the write method are accepted: - - - prevValue (str): compare key to this value, and swap only if corresponding (optional). - - prevIndex (int): modify key only if actual modifiedIndex matches the provided one (optional). - - prevExist (bool): If false, only create key; if true, only update key. - - refresh (bool): since 2.3.0, If true, only update the ttl, prev key must existed(prevExist=True). - - Returns: - client.EtcdResult - - >>> print client.write('/key', 'newValue', ttl=60, prevExist=False).value - 'newValue' - - """ - _log.debug("Writing %s to key %s ttl=%s dir=%s append=%s", - value, key, ttl, dir, append) - key = self._sanitize_key(key) - params = {} - if value is not None: - params['value'] = value - - if ttl is not None: - params['ttl'] = ttl - - if dir: - if value: - raise etcd.EtcdException( - 'Cannot create a directory with a value') - params['dir'] = "true" - - for (k, v) in kwdargs.items(): - if k in self._comparison_conditions: - if type(v) == bool: - params[k] = v and "true" or "false" - else: - params[k] = v - - method = append and self._MPOST or self._MPUT - if '_endpoint' in kwdargs: - path = kwdargs['_endpoint'] + key - else: - path = self.key_endpoint + key - - response = self.api_execute(path, method, params=params) - return self._result_from_response(response) - - def refresh(self, key, ttl, **kwdargs): - """ - (Since 2.3.0) Refresh the ttl of a key without notifying watchers. - - Keys in etcd can be refreshed without notifying watchers, - this can be achieved by setting the refresh to true when updating a TTL - - You cannot update the value of a key when refreshing it - - @see: https://github.com/coreos/etcd/blob/release-2.3/Documentation/api.md#refreshing-key-ttl - - Args: - key (str): Key. - - ttl (int): Time in seconds of expiration (optional). - - Other parameters modifying the write method are accepted as `EtcdClient.write`. - """ - # overwrite kwdargs' prevExist - kwdargs['prevExist'] = True - return self.write(key=key, value=None, ttl=ttl, refresh=True, **kwdargs) - - def update(self, obj): - """ - Updates the value for a key atomically. Typical usage would be: - - c = etcd.Client() - o = c.read("/somekey") - o.value += 1 - c.update(o) - - Args: - obj (etcd.EtcdResult): The object that needs updating. - - """ - _log.debug("Updating %s to %s.", obj.key, obj.value) - kwdargs = { - 'dir': obj.dir, - 'ttl': obj.ttl, - 'prevExist': True - } - - if not obj.dir: - # prevIndex on a dir causes a 'not a file' error. d'oh! - kwdargs['prevIndex'] = obj.modifiedIndex - return self.write(obj.key, obj.value, **kwdargs) - - def read(self, key, **kwdargs): - """ - Returns the value of the key 'key'. - - Args: - key (str): Key. - - Recognized kwd args - - recursive (bool): If you should fetch recursively a dir - - wait (bool): If we should wait and return next time the key is changed - - waitIndex (int): The index to fetch results from. - - sorted (bool): Sort the output keys (alphanumerically) - - timeout (int): max seconds to wait for a read. - - Returns: - client.EtcdResult (or an array of client.EtcdResult if a - subtree is queried) - - Raises: - KeyValue: If the key doesn't exists. - - urllib3.exceptions.TimeoutError: If timeout is reached. - - >>> print client.get('/key').value - 'value' - - """ - _log.debug("Issuing read for key %s with args %s", key, kwdargs) - key = self._sanitize_key(key) - - params = {} - for (k, v) in kwdargs.items(): - if k in self._read_options: - if type(v) == bool: - params[k] = v and "true" or "false" - elif v is not None: - params[k] = v - - timeout = kwdargs.get('timeout', None) - - response = self.api_execute( - self.key_endpoint + key, self._MGET, params=params, - timeout=timeout) - return self._result_from_response(response) - - def delete(self, key, recursive=None, dir=None, **kwdargs): - """ - Removed a key from etcd. - - Args: - - key (str): Key. - - recursive (bool): if we want to recursively delete a directory, set - it to true - - dir (bool): if we want to delete a directory, set it to true - - prevValue (str): compare key to this value, and swap only if - corresponding (optional). - - prevIndex (int): modify key only if actual modifiedIndex matches the - provided one (optional). - - Returns: - client.EtcdResult - - Raises: - KeyValue: If the key doesn't exists. - - >>> print client.delete('/key').key - '/key' - - """ - _log.debug("Deleting %s recursive=%s dir=%s extra args=%s", - key, recursive, dir, kwdargs) - key = self._sanitize_key(key) - - kwds = {} - if recursive is not None: - kwds['recursive'] = recursive and "true" or "false" - if dir is not None: - kwds['dir'] = dir and "true" or "false" - - for k in self._del_conditions: - if k in kwdargs: - kwds[k] = kwdargs[k] - _log.debug("Calculated params = %s", kwds) - - response = self.api_execute( - self.key_endpoint + key, self._MDELETE, params=kwds) - return self._result_from_response(response) - - def pop(self, key, recursive=None, dir=None, **kwdargs): - """ - Remove specified key from etcd and return the corresponding value. - - Args: - - key (str): Key. - - recursive (bool): if we want to recursively delete a directory, set - it to true - - dir (bool): if we want to delete a directory, set it to true - - prevValue (str): compare key to this value, and swap only if - corresponding (optional). - - prevIndex (int): modify key only if actual modifiedIndex matches the - provided one (optional). - - Returns: - client.EtcdResult - - Raises: - KeyValue: If the key doesn't exists. - - >>> print client.pop('/key').value - 'value' - - """ - return self.delete(key=key, recursive=recursive, dir=dir, **kwdargs)._prev_node - - # Higher-level methods on top of the basic primitives - def test_and_set(self, key, value, prev_value, ttl=None): - """ - Atomic test & set operation. - It will check if the value of 'key' is 'prev_value', - if the the check is correct will change the value for 'key' to 'value' - if the the check is false an exception will be raised. - - Args: - key (str): Key. - value (object): value to set - prev_value (object): previous value. - ttl (int): Time in seconds of expiration (optional). - - Returns: - client.EtcdResult - - Raises: - ValueError: When the 'prev_value' is not the current value. - - >>> print client.test_and_set('/key', 'new', 'old', ttl=60).value - 'new' - - """ - return self.write(key, value, prevValue=prev_value, ttl=ttl) - - def set(self, key, value, ttl=None): - """ - Compatibility: sets the value of the key 'key' to the value 'value' - - Args: - key (str): Key. - value (object): value to set - ttl (int): Time in seconds of expiration (optional). - - Returns: - client.EtcdResult - - Raises: - etcd.EtcdException: when something weird goes wrong. - - """ - return self.write(key, value, ttl=ttl) - - def get(self, key): - """ - Returns the value of the key 'key'. - - Args: - key (str): Key. - - Returns: - client.EtcdResult - - Raises: - KeyError: If the key doesn't exists. - - >>> print client.get('/key').value - 'value' - - """ - return self.read(key) - - def watch(self, key, index=None, timeout=None, recursive=None): - """ - Blocks until a new event has been received, starting at index 'index' - - Args: - key (str): Key. - - index (int): Index to start from. - - timeout (int): max seconds to wait for a read. - - Returns: - client.EtcdResult - - Raises: - KeyValue: If the key doesn't exist. - - etcd.EtcdWatchTimedOut: If timeout is reached. - - >>> print client.watch('/key').value - 'value' - - """ - _log.debug("About to wait on key %s, index %s", key, index) - if index: - return self.read(key, wait=True, waitIndex=index, timeout=timeout, - recursive=recursive) - else: - return self.read(key, wait=True, timeout=timeout, - recursive=recursive) - - def eternal_watch(self, key, index=None, recursive=None): - """ - Generator that will yield changes from a key. - Note that this method will block forever until an event is generated. - - Args: - key (str): Key to subcribe to. - index (int): Index from where the changes will be received. - - Yields: - client.EtcdResult - - >>> for event in client.eternal_watch('/subcription_key'): - ... print event.value - ... - value1 - value2 - - """ - local_index = index - while True: - response = self.watch(key, index=local_index, timeout=0, recursive=recursive) - local_index = response.modifiedIndex + 1 - yield response - - def get_lock(self, *args, **kwargs): - raise NotImplementedError('Lock primitives were removed from etcd 2.0') - - @property - def election(self): - raise NotImplementedError('Election primitives were removed from etcd 2.0') - - def _result_from_response(self, response): - """ Creates an EtcdResult from json dictionary """ - raw_response = response.data - try: - res = json.loads(raw_response.decode('utf-8')) - except (TypeError, ValueError, UnicodeError) as e: - raise etcd.EtcdException( - 'Server response was not valid JSON: %r' % e) - try: - r = etcd.EtcdResult(**res) - if response.status == 201: - r.newKey = True - r.parse_headers(response) - return r - except Exception as e: - raise etcd.EtcdException( - 'Unable to decode server response: %r' % e) - - def _next_server(self, cause=None): - """ Selects the next server in the list, refreshes the server list. """ - _log.debug("Selection next machine in cache. Available machines: %s", - self._machines_cache) - try: - mach = self._machines_cache.pop() - except IndexError: - _log.error("Machines cache is empty, no machines to try.") - raise etcd.EtcdConnectionFailed('No more machines in the cluster', - cause=cause) - else: - _log.info("Selected new etcd server %s", mach) - return mach - - def _wrap_request(payload): - @wraps(payload) - def wrapper(self, path, method, params=None, timeout=None): - response = False - - if timeout is None: - timeout = self.read_timeout - - if timeout == 0: - timeout = None - - if not path.startswith('/'): - raise ValueError('Path does not start with /') - - while not response: - some_request_failed = False - try: - response = payload(self, path, method, - params=params, timeout=timeout) - # Check the cluster ID hasn't changed under us. We use - # preload_content=False above so we can read the headers - # before we wait for the content of a watch. - self._check_cluster_id(response) - # Now force the data to be preloaded in order to trigger any - # IO-related errors in this method rather than when we try to - # access it later. - _ = response.data - # urllib3 doesn't wrap all httplib exceptions and earlier versions - # don't wrap socket errors either. - except (HTTPError, HTTPException, socket.error) as e: - if (isinstance(params, dict) and - params.get("wait") == "true" and - isinstance(e, ReadTimeoutError)): - _log.debug("Watch timed out.") - raise etcd.EtcdWatchTimedOut( - "Watch timed out: %r" % e, - cause=e - ) - _log.error("Request to server %s failed: %r", - self._base_uri, e) - if self._allow_reconnect: - _log.info("Reconnection allowed, looking for another " - "server.") - # _next_server() raises EtcdException if there are no - # machines left to try, breaking out of the loop. - self._base_uri = self._next_server(cause=e) - some_request_failed = True - - # if exception is raised on _ = response.data - # the condition for while loop will be False - # but we should retry - response = False - else: - _log.debug("Reconnection disabled, giving up.") - raise etcd.EtcdConnectionFailed( - "Connection to etcd failed due to %r" % e, - cause=e - ) - except etcd.EtcdClusterIdChanged as e: - _log.warning(e) - raise - except: - _log.exception("Unexpected request failure, re-raising.") - raise - - if some_request_failed: - if not self._use_proxies: - # The cluster may have changed since last invocation - self._machines_cache = self.machines - self._machines_cache.remove(self._base_uri) - return self._handle_server_response(response) - return wrapper - - @_wrap_request - def api_execute(self, path, method, params=None, timeout=None): - """ Executes the query. """ - url = self._base_uri + path - - if (method == self._MGET) or (method == self._MDELETE): - return self.http.request( - method, - url, - timeout=timeout, - fields=params, - redirect=self.allow_redirect, - headers=self._get_headers(), - preload_content=False) - - elif (method == self._MPUT) or (method == self._MPOST): - return self.http.request_encode_body( - method, - url, - fields=params, - timeout=timeout, - encode_multipart=False, - redirect=self.allow_redirect, - headers=self._get_headers(), - preload_content=False) - else: - raise etcd.EtcdException( - 'HTTP method {} not supported'.format(method)) - - @_wrap_request - def api_execute_json(self, path, method, params=None, timeout=None): - url = self._base_uri + path - json_payload = json.dumps(params) - headers = self._get_headers() - headers['Content-Type'] = 'application/json' - return self.http.urlopen(method, - url, - body=json_payload, - timeout=timeout, - redirect=self.allow_redirect, - headers=headers, - preload_content=False) - - def _check_cluster_id(self, response): - cluster_id = response.getheader("x-etcd-cluster-id") - if not cluster_id: - _log.warning("etcd response did not contain a cluster ID") - return - id_changed = (self.expected_cluster_id and - cluster_id != self.expected_cluster_id) - # Update the ID so we only raise the exception once. - old_expected_cluster_id = self.expected_cluster_id - self.expected_cluster_id = cluster_id - if id_changed: - # Defensive: clear the pool so that we connect afresh next - # time. - self.http.clear() - raise etcd.EtcdClusterIdChanged( - 'The UUID of the cluster changed from {} to ' - '{}.'.format(old_expected_cluster_id, cluster_id)) - - def _handle_server_response(self, response): - """ Handles the server response """ - if response.status in [200, 201]: - return response - - else: - resp = response.data.decode('utf-8') - - # throw the appropriate exception - try: - r = json.loads(resp) - r['status'] = response.status - except (TypeError, ValueError): - # Bad JSON, make a response locally. - r = {"message": "Bad response", - "cause": str(resp)} - etcd.EtcdError.handle(r) - - def _get_headers(self): - if self.username and self.password: - credentials = ':'.join((self.username, self.password)) - return urllib3.make_headers(basic_auth=credentials) - return {} diff --git a/cluster_manage/etcd/lock.py b/cluster_manage/etcd/lock.py deleted file mode 100644 index a77aaa766a1..00000000000 --- a/cluster_manage/etcd/lock.py +++ /dev/null @@ -1,178 +0,0 @@ -import logging -import etcd -import uuid - -_log = logging.getLogger(__name__) - -class Lock(object): - """ - Locking recipe for etcd, inspired by the kazoo recipe for zookeeper - """ - - def __init__(self, client, lock_name): - self.client = client - self.name = lock_name - # props to Netflix Curator for this trick. It is possible for our - # create request to succeed on the server, but for a failure to - # prevent us from getting back the full path name. We prefix our - # lock name with a uuid and can check for its presence on retry. - self._uuid = uuid.uuid4().hex - self.path = "{}/{}".format(client.lock_prefix, lock_name) - self.is_taken = False - self._sequence = None - _log.debug("Initiating lock for %s with uuid %s", self.path, self._uuid) - - @property - def uuid(self): - """ - The unique id of the lock - """ - return self._uuid - - @uuid.setter - def uuid(self, value): - old_uuid = self._uuid - self._uuid = value - if not self._find_lock(): - _log.warn("The hand-set uuid was not found, refusing") - self._uuid = old_uuid - raise ValueError("Inexistent UUID") - - @property - def is_acquired(self): - """ - tells us if the lock is acquired - """ - if not self.is_taken: - _log.debug("Lock not taken") - return False - try: - self.client.read(self.lock_key) - return True - except etcd.EtcdKeyNotFound: - _log.warn("Lock was supposedly taken, but we cannot find it") - self.is_taken = False - return False - - def acquire(self, blocking=True, lock_ttl=3600, timeout=0): - """ - Acquire the lock. - - :param blocking Block until the lock is obtained, or timeout is reached - :param lock_ttl The duration of the lock we acquired, set to None for eternal locks - :param timeout The time to wait before giving up on getting a lock - """ - # First of all try to write, if our lock is not present. - if not self._find_lock(): - _log.debug("Lock not found, writing it to %s", self.path) - res = self.client.write(self.path, self.uuid, ttl=lock_ttl, append=True) - self._set_sequence(res.key) - _log.debug("Lock key %s written, sequence is %s", res.key, self._sequence) - elif lock_ttl: - # Renew our lock if already here! - self.client.write(self.lock_key, self.uuid, ttl=lock_ttl) - - # now get the owner of the lock, and the next lowest sequence - return self._acquired(blocking=blocking, timeout=timeout) - - def release(self): - """ - Release the lock - """ - if not self._sequence: - self._find_lock() - try: - _log.debug("Releasing existing lock %s", self.lock_key) - self.client.delete(self.lock_key) - except etcd.EtcdKeyNotFound: - _log.info("Lock %s not found, nothing to release", self.lock_key) - pass - finally: - self.is_taken = False - - def __enter__(self): - """ - You can use the lock as a contextmanager - """ - self.acquire(blocking=True, lock_ttl=None) - return self - - def __exit__(self, type, value, traceback): - self.release() - return False - - def _acquired(self, blocking=True, timeout=0): - locker, nearest = self._get_locker() - self.is_taken = False - if self.lock_key == locker: - _log.debug("Lock acquired!") - # We own the lock, yay! - self.is_taken = True - return True - else: - self.is_taken = False - if not blocking: - return False - # Let's look for the lock - watch_key = nearest.key - _log.debug("Lock not acquired, now watching %s", watch_key) - t = max(0, timeout) - while True: - try: - r = self.client.watch(watch_key, timeout=t, index=nearest.modifiedIndex + 1) - _log.debug("Detected variation for %s: %s", r.key, r.action) - return self._acquired(blocking=True, timeout=timeout) - except etcd.EtcdKeyNotFound: - _log.debug("Key %s not present anymore, moving on", watch_key) - return self._acquired(blocking=True, timeout=timeout) - except etcd.EtcdLockExpired as e: - raise e - except etcd.EtcdException: - _log.exception("Unexpected exception") - - @property - def lock_key(self): - if not self._sequence: - raise ValueError("No sequence present.") - return self.path + '/' + str(self._sequence) - - def _set_sequence(self, key): - self._sequence = key.replace(self.path, '').lstrip('/') - - def _find_lock(self): - if self._sequence: - try: - res = self.client.read(self.lock_key) - self._uuid = res.value - return True - except etcd.EtcdKeyNotFound: - return False - elif self._uuid: - try: - for r in self.client.read(self.path, recursive=True).leaves: - if r.value == self._uuid: - self._set_sequence(r.key) - return True - except etcd.EtcdKeyNotFound: - pass - return False - - def _get_locker(self): - results = [res for res in - self.client.read(self.path, recursive=True).leaves] - if not self._sequence: - self._find_lock() - l = sorted([r.key for r in results]) - _log.debug("Lock keys found: %s", l) - try: - i = l.index(self.lock_key) - if i == 0: - _log.debug("No key before our one, we are the locker") - return (l[0], None) - else: - _log.debug("Locker: %s, key to watch: %s", l[0], l[i-1]) - return (l[0], next(x for x in results if x.key == l[i-1])) - except ValueError: - # Something very wrong is going on, most probably - # our lock has expired - raise etcd.EtcdLockExpired(u"Lock not found") diff --git a/cluster_manage/flash_cluster_manager.py b/cluster_manage/flash_cluster_manager.py index 822af4468dc..9c5fe61330a 100644 --- a/cluster_manage/flash_cluster_manager.py +++ b/cluster_manage/flash_cluster_manager.py @@ -6,12 +6,12 @@ from logging.handlers import RotatingFileHandler import conf -import etcd import flash_http_client import placement_rule import tidb_tools import util -from pd_client import PDClient +from pd_client import PDClient, EtcdClient +import define terminal: bool = False @@ -26,11 +26,6 @@ def get_host(): return socket.gethostbyname(socket.gethostname()) -class TiFlashClusterNotMaster(Exception): - def __init__(self): - pass - - def wrap_try_get_lock(func): def wrap_func(manager, *args, **kwargs): manager.try_get_lock() @@ -43,21 +38,6 @@ def wrap_func(manager, *args, **kwargs): return wrap_func -def wrap_add_task(interval_func): - def wrap_func(func): - def _wrap_func(manager, *args, **kwargs): - try: - func(manager, *args, **kwargs) - except TiFlashClusterNotMaster: - pass - except Exception as e: - manager.logger.exception(e) - - return _wrap_func - - return wrap_func - - class Store: TIFLASH_HTTP_PORT_LABEL = 'tiflash_http_port' @@ -95,7 +75,6 @@ class TiFlashClusterManager: ROLE_INIT = 0 ROLE_SLAVE = 1 ROLE_MASTER = 2 - FLASH_LABEL = {'key': 'engine', 'value': 'tiflash'} @staticmethod def compute_cur_store(stores): @@ -109,20 +88,20 @@ def compute_cur_store(stores): conf.flash_conf.service_addr, [store.inner for store in stores.values()])) def _try_refresh(self): - try: - ori_role = self.state[0] - self.pd_client.etcd_client.refresh_ttl(self.cur_store.address) + ori_role = self.state[0] + res = self.pd_client.etcd_client.refresh_ttl(self.cur_store.address) + if res == EtcdClient.EtcdOK: self.state = [TiFlashClusterManager.ROLE_MASTER, time.time()] if ori_role == TiFlashClusterManager.ROLE_INIT: self.logger.debug('Continue become master') - - except etcd.EtcdValueError as e: - self.state = [TiFlashClusterManager.ROLE_SLAVE, time.time()] - self.logger.info('Refresh ttl fail become slave, %s', e.payload['message']) - - except etcd.EtcdKeyNotFound as e: + elif res == EtcdClient.EtcdKeyNotFound: self.state = [TiFlashClusterManager.ROLE_INIT, 0] self.try_get_lock() + elif res == EtcdClient.EtcdValueNotEqual: + self.state = [TiFlashClusterManager.ROLE_SLAVE, time.time()] + self.logger.debug('Refresh ttl fail (key not equal), become slave') + else: + assert False def try_get_lock(self): role, ts = self.state @@ -159,7 +138,7 @@ def __init__(self, pd_client: PDClient, tidb_status_addr_list): def _update_cluster(self): prev_stores = self.stores self.stores = {store_id: Store(store) for store_id, store in - self.pd_client.get_store_by_labels(self.FLASH_LABEL).items()} + self.pd_client.get_store_by_labels(define.TIFLASH_LABEL).items()} if self.stores != prev_stores and prev_stores: self.logger.info('Update all tiflash stores: from {} to {}'.format([k.inner for k in prev_stores.values()], [k.inner for k in self.stores.values()])) @@ -176,16 +155,14 @@ def _check_and_make_rule(self, table, start_key, end_key, all_rules: dict): need_new_rule = True if rule_id in all_rules: rule = all_rules[rule_id] - if rule.override and rule.start_key == start_key and rule.end_key == end_key and rule.label_constraints == [ - {"key": "engine", "op": "in", "values": ["tiflash"]} - ] and rule.location_labels == table["location_labels"] and rule.count == table[ - "replica_count" - ] and rule.role == "learner": + if rule.override and rule.start_key == start_key and rule.end_key == end_key and rule.label_constraints == placement_rule.DEFAULT_LABEL_CONSTRAINTS and rule.location_labels == \ + table[define.LOCATION_LABELS] and rule.count == table[ + define.REPLICA_COUNT] and rule.role == define.LEARNER: need_new_rule = False if need_new_rule: - rules_new = placement_rule.make_rule(rule_id, start_key, end_key, table["replica_count"], - table["location_labels"]) + rules_new = placement_rule.make_rule(rule_id, start_key, end_key, table[define.REPLICA_COUNT], + table[define.LOCATION_LABELS]) self.set_rule(util.obj_2_dict(rules_new)) all_rules.pop(rule_id, None) @@ -206,14 +183,17 @@ def compute_sync_data_process(self, table_id, start_key, end_key): @wrap_try_get_lock def report_to_tidb(self, table, region_count, flash_region_count): + table_id = table['id'] self.logger.info( - 'report_to_tidb {} region_count: {} flash_region_count: {}'.format(table, region_count, flash_region_count)) + 'report_to_tidb: id {}, region_count {}, flash_region_count {}'.format(table_id, region_count, + flash_region_count)) for idx, address in enumerate(self.tidb_status_addr_list): try: r = util.post_http( '{}/tiflash/replica'.format(address, PDClient.PD_API_PREFIX, PDClient.PD_API_VERSION), - {"id": table['id'], "region_count": region_count, "flash_region_count": flash_region_count}) + {'id': table_id, define.REGION_COUNT: region_count, + define.TIFLASH_REGION_COUNT: flash_region_count}) if r.status_code == 200: if idx != 0: tmp = self.tidb_status_addr_list[0] @@ -228,13 +208,13 @@ def report_to_tidb(self, table, region_count, flash_region_count): @wrap_try_get_lock def remove_rule(self, rule_id): - self.pd_client.remove_rule(placement_rule.PR_FLASH_GROUP, rule_id) + self.pd_client.remove_rule(placement_rule.TIFLASH_GROUP_ID, rule_id) self.logger.info('Remove placement rule {}'.format(rule_id)) @wrap_try_get_lock def table_update(self): table_list = tidb_tools.db_flash_replica(self.tidb_status_addr_list) - all_rules = self.pd_client.get_group_rules(placement_rule.PR_FLASH_GROUP) + all_rules = self.pd_client.get_group_rules(placement_rule.TIFLASH_GROUP_ID) for table in table_list: from tikv_util import common @@ -243,7 +223,7 @@ def table_update(self): start_key, end_key = st.to_bytes(), ed.to_bytes() self._check_and_make_rule(table, st.to_pd_key(), ed.to_pd_key(), all_rules) - if not table['available']: + if not table[define.AVAILABLE]: region_count, flash_region_count = self.compute_sync_data_process(table_id, start_key, end_key) self.report_to_tidb(table, region_count, flash_region_count) diff --git a/cluster_manage/pd_client.py b/cluster_manage/pd_client.py index 3f74e1000bb..ae5f4df2080 100644 --- a/cluster_manage/pd_client.py +++ b/cluster_manage/pd_client.py @@ -2,37 +2,44 @@ import logging from typing import Optional -import etcd +import define +import etcd3 import uri import conf import util class EtcdClient: - FLASH_PREFIX = 'tiflash' - FLASH_CLUSTER_MUTEX_KEY = '{}/cluster/master'.format(FLASH_PREFIX) + EtcdOK = 0 + EtcdKeyNotFound = 1 + EtcdValueNotEqual = 2 def try_init_mutex(self, cluster_mutex_value): - try: - res = self.client.write(EtcdClient.FLASH_CLUSTER_MUTEX_KEY, - cluster_mutex_value, - ttl=conf.flash_conf.cluster_master_ttl, prevExist=False) - self.logger.info('Try to init master success, ttl: %d, create new key: %s', res.ttl, res.key) - return True - except etcd.EtcdAlreadyExist as e: - self.logger.info('Try to init master fail, %s', e.payload['message']) + val, meta = self.client.get(define.TIFLASH_CLUSTER_MUTEX_KEY) + if val is None: + lease = self.client.lease(conf.flash_conf.cluster_master_ttl) + if self.client.put_if_not_exists(define.TIFLASH_CLUSTER_MUTEX_KEY, cluster_mutex_value, lease=lease): + self.logger.info('Try to init master success, ttl: %d, create new key: %s', lease.ttl, + define.TIFLASH_CLUSTER_MUTEX_KEY) + return True + self.logger.info('Try to init master fail, key exists') return False def refresh_ttl(self, cluster_mutex_value): - self.client.refresh(EtcdClient.FLASH_CLUSTER_MUTEX_KEY, conf.flash_conf.cluster_master_ttl, - prevValue=cluster_mutex_value) + val, meta = self.client.get(define.TIFLASH_CLUSTER_MUTEX_KEY) + if val is None: + return self.EtcdKeyNotFound + if cluster_mutex_value != str(val, encoding="utf8"): + return self.EtcdValueNotEqual + list(self.client.refresh_lease(meta.lease_id)) + return self.EtcdOK - def test_refresh_ttl_wrong_value(self, cluster_mutex_value): - self.refresh_ttl(cluster_mutex_value) + def get_by_prefix(self, prefix): + return self.client.get_prefix(prefix) def __init__(self, host, port): self.logger = logging.getLogger('etcd.client') - self.client = etcd.Client(host=host, port=port) + self.client = etcd3.client(host=host, port=port, timeout=conf.flash_conf.update_rule_interval) class PDClient: @@ -90,6 +97,13 @@ def get_all_rules(self): res = r.json() return res if res is not None else {} + def get_rule(self, group, rule_id): + r = util.curl_http( + '{}/{}/{}/config/rule/{}/{}'.format(self.leader, PDClient.PD_API_PREFIX, PDClient.PD_API_VERSION, group, + rule_id)) + res = r.json() + return res + def set_rule(self, rule): r = util.post_http( '{}/{}/{}/config/rule'.format(self.leader, PDClient.PD_API_PREFIX, PDClient.PD_API_VERSION), rule) diff --git a/cluster_manage/placement_rule.py b/cluster_manage/placement_rule.py index 3845abf4e16..083abd35408 100644 --- a/cluster_manage/placement_rule.py +++ b/cluster_manage/placement_rule.py @@ -1,29 +1,28 @@ #!/usr/bin/python3 -import util +import define -PR_FLASH_GROUP = "tiflash" +TIFLASH_GROUP_ID = define.TIFLASH +DEFAULT_LABEL_CONSTRAINTS = [{"key": "engine", "op": "in", "values": [define.TIFLASH]}] base_rule = { - "group_id": PR_FLASH_GROUP, - "id": "", + "group_id": TIFLASH_GROUP_ID, + 'id': '', "index": 0, "override": True, "start_key": None, "end_key": None, - "role": "learner", + "role": define.LEARNER, "count": 2, - "label_constraints": [ - {"key": "engine", "op": "in", "values": ["tiflash"]} - ], - "location_labels": None + define.LABEL_CONSTRAINTS: DEFAULT_LABEL_CONSTRAINTS, + define.LOCATION_LABELS: None } class PlacementRule: def __init__(self, **entries): self.__dict__.update(entries) - if not hasattr(self, 'location_labels'): + if not hasattr(self, define.LOCATION_LABELS): self.location_labels = [] @@ -35,16 +34,3 @@ def make_rule(rid: str, start_key, end_key, count, location_labels): rule.count = count rule.location_labels = location_labels return rule - - -def get_group_rules(group="tiflash"): - return [] - - -def main(): - rule = make_rule("1", b'1', b'2', 2, ['host']) - print(util.obj_2_dict(rule)) - - -if __name__ == '__main__': - main() diff --git a/release-centos7/Dockerfile-builder b/release-centos7/Dockerfile-builder index 5cf70a4a564..00a43ddf9e2 100644 --- a/release-centos7/Dockerfile-builder +++ b/release-centos7/Dockerfile-builder @@ -33,6 +33,7 @@ RUN yum makecache \ urllib3 \ toml \ setuptools \ + etcd3 \ && cd /prepare-environments \ && ./install-openssl.sh \ && ./install-cmake.sh \ diff --git a/release-centos7/Dockerfile-builder-ci b/release-centos7/Dockerfile-builder-ci index 90a8b708e46..e7bbfb214a5 100644 --- a/release-centos7/Dockerfile-builder-ci +++ b/release-centos7/Dockerfile-builder-ci @@ -5,4 +5,4 @@ WORKDIR /root/ ENV HOME /root/ -COPY tiflash/flash_cluster_manager /flash_cluster_manager +COPY tiflash/flash_cluster_manager /flash_cluster_manager2 diff --git a/release-centos7/build/build-tiflash-ci.sh b/release-centos7/build/build-tiflash-ci.sh index a7ef44120cb..b96aff3c24c 100755 --- a/release-centos7/build/build-tiflash-ci.sh +++ b/release-centos7/build/build-tiflash-ci.sh @@ -19,7 +19,7 @@ set -xe install_dir="$SRCPATH/release-centos7/tiflash" if [ -d "$install_dir" ]; then rm -rf "$install_dir"/*; else mkdir -p "$install_dir"; fi -cp -r /flash_cluster_manager "$install_dir" +cp -r /flash_cluster_manager2 "$install_dir"/flash_cluster_manager if [ -d "$SRCPATH/contrib/kvproto" ]; then cd "$SRCPATH/contrib/kvproto"