From b0cbe20d0a6ffae5aa6ac75a5424b64b7651bed1 Mon Sep 17 00:00:00 2001 From: "Florent Vennetier (OpenIO)" Date: Thu, 7 Mar 2019 19:39:06 +0100 Subject: [PATCH 1/8] CLI: implement 'openio-admin check' --- oio/blob/client.py | 14 +- oio/cli/admin/item.py | 181 ++++++++++++++++++ oio/cli/common/clientmanager.py | 4 +- oio/common/constants.py | 2 + oio/common/decorators.py | 15 +- oio/common/http_urllib3.py | 2 +- oio/crawler/integrity.py | 330 ++++++++++++++++++++++---------- oio/rdir/client.py | 4 +- setup.cfg | 4 + tests/unit/api/test_rdir.py | 1 - 10 files changed, 442 insertions(+), 115 deletions(-) create mode 100644 oio/cli/admin/item.py diff --git a/oio/blob/client.py b/oio/blob/client.py index 5837b7c462..ae88577cf7 100644 --- a/oio/blob/client.py +++ b/oio/blob/client.py @@ -24,7 +24,7 @@ oio_exception_from_httperror, urllib3 from oio.common import exceptions as exc, utils from oio.common.constants import CHUNK_HEADERS, chunk_xattr_keys_optional, \ - HEADER_PREFIX, OIO_VERSION + FETCHXATTR_HEADER, OIO_VERSION, REQID_HEADER from oio.common.decorators import ensure_headers, ensure_request_id from oio.api.io import ChunkReader from oio.api.replication import ReplicatedMetachunkWriter, FakeChecksum @@ -153,15 +153,23 @@ def chunk_get(self, url, **kwargs): @update_rawx_perfdata @ensure_request_id def chunk_head(self, url, **kwargs): + """ + Perform a HEAD request on a chunk. + + :param url: URL of the chunk to request. + :keyword xattr: when False, ask the rawx not to read + extended attributes of the chunk. + :returns: a `dict` with chunk metadata (empty when xattr is False). + """ _xattr = bool(kwargs.get('xattr', True)) url = self.resolve_url(url) headers = kwargs['headers'].copy() - headers[HEADER_PREFIX + 'xattr'] = _xattr + headers[FETCHXATTR_HEADER] = _xattr try: resp = self.http_pool.request( 'HEAD', url, headers=headers) except urllib3.exceptions.HTTPError as ex: - oio_exception_from_httperror(ex, reqid=headers['X-oio-req-id'], + oio_exception_from_httperror(ex, reqid=headers[REQID_HEADER], url=url) if resp.status == 200: if not _xattr: diff --git a/oio/cli/admin/item.py b/oio/cli/admin/item.py new file mode 100644 index 0000000000..1da6ea8056 --- /dev/null +++ b/oio/cli/admin/item.py @@ -0,0 +1,181 @@ +# Copyright (C) 2019 OpenIO SAS, as part of OpenIO SDS +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + + +from cliff import lister + +from oio.crawler.integrity import Checker, Target + + +class CheckCommandMixin(object): + """ + Various parameters that apply to all check commands. + """ + + columns = ('Type', 'Item', 'Status', 'Errors') + + def build_checker(self, parsed_args): + """Build an instance of Checker.""" + # TODO(FVE): when Checker is refactored, review the list of + # parameters we should pass. + checker = Checker(self.app.options.ns) + return checker + + def patch_parser(self, parser): + parser.add_argument( + '--depth', + type=int, + default=0, + help=("How deep to recurse. 
0 means do not recurse. " + "N > 0 means recurse N levels below the specified item type," + " from namespace to chunk.") + ) + parser.add_argument( + '--checksum', + action='store_true', + help=("Perform checksum comparisons. This requires downloading " + "data from rawx services.") + ) + + def _format_results(self, checker): + for res in checker.wait(): + yield (res.target.type, repr(res.target), + res.health, res.errors_to_str()) + + def format_results(self, checker): + return self.__class__.columns, self._format_results(checker) + + +class AccountCheck(CheckCommandMixin, lister.Lister): + """ + Check an account for problems. + """ + base_level = 4 + + def get_parser(self, prog_name): + parser = super(AccountCheck, self).get_parser(prog_name) + self.patch_parser(parser) + + parser.add_argument( + 'accounts', + nargs='*', + metavar='', + help='Name of the account to check.' + ) + return parser + + def take_action(self, parsed_args): + super(AccountCheck, self).take_action(parsed_args) + # FIXME(FVE): use parsed_args.depth + checker = self.build_checker(parsed_args) + if not parsed_args.accounts: + parsed_args.accounts = [self.app.options.account] + for acct in parsed_args.accounts: + target = Target(acct) + checker.check(target) + return self.format_results(checker) + + +class ContainerCheck(CheckCommandMixin, lister.Lister): + """ + Check a container for problems. + """ + base_level = 3 + + def get_parser(self, prog_name): + parser = super(ContainerCheck, self).get_parser(prog_name) + self.patch_parser(parser) + + parser.add_argument( + 'containers', + nargs='+', + metavar='', + help='Name of the container to check.' + ) + return parser + + def take_action(self, parsed_args): + super(ContainerCheck, self).take_action(parsed_args) + checker = self.build_checker(parsed_args) + for ct in parsed_args.containers: + target = Target(self.app.options.account, ct) + checker.check(target) + return self.format_results(checker) + + +class ObjectCheck(CheckCommandMixin, lister.Lister): + """ + Check an object for problems. + """ + base_level = 2 + + def get_parser(self, prog_name): + parser = super(ObjectCheck, self).get_parser(prog_name) + self.patch_parser(parser) + + parser.add_argument( + 'container', + metavar='', + help='Name of the container holding the object.' + ) + parser.add_argument( + 'objects', + metavar='', + nargs='+', + help='Name of the object to check.' + ) + parser.add_argument( + '--object-version', + metavar='', + help=("Version of the object to check. Works when only one " + "object is specified on command line.") + ) + return parser + + def take_action(self, parsed_args): + super(ObjectCheck, self).take_action(parsed_args) + checker = self.build_checker(parsed_args) + for obj in parsed_args.objects: + target = Target(self.app.options.account, parsed_args.container, + obj, parsed_args.object_version) + checker.check(target) + return self.format_results(checker) + + +class ChunkCheck(CheckCommandMixin, lister.Lister): + """ + Check a chunk for problems. + """ + base_level = 0 + + def get_parser(self, prog_name): + parser = super(ChunkCheck, self).get_parser(prog_name) + self.patch_parser(parser) + + parser.add_argument( + 'chunks', + metavar='', + nargs='+', + help='URL to the chunk to check.' 
+ ) + return parser + + def take_action(self, parsed_args): + super(ChunkCheck, self).take_action(parsed_args) + checker = self.build_checker(parsed_args) + for chunk in parsed_args.chunks: + target = Target(self.app.options.account, chunk=chunk) + checker.check(target) + return self.format_results(checker) diff --git a/oio/cli/common/clientmanager.py b/oio/cli/common/clientmanager.py index 02a13bf228..a7797ea298 100644 --- a/oio/cli/common/clientmanager.py +++ b/oio/cli/common/clientmanager.py @@ -122,7 +122,7 @@ def flatns_set_bits(self, bits): @property def flatns_manager(self): - if self._flatns_manager: + if self._flatns_manager is not None: return self._flatns_manager from oio.common.autocontainer import HashedContainerBuilder options = self.nsinfo['options'] @@ -163,7 +163,7 @@ def meta1_digits(self): @property def nsinfo(self): - if not self._nsinfo: + if self._nsinfo is None: self._nsinfo = self.conscience.info() return self._nsinfo diff --git a/oio/common/constants.py b/oio/common/constants.py index 672c19e8c8..cdeed62422 100644 --- a/oio/common/constants.py +++ b/oio/common/constants.py @@ -15,9 +15,11 @@ HEADER_PREFIX = 'x-oio-' ADMIN_HEADER = HEADER_PREFIX + 'admin' +FETCHXATTR_HEADER = HEADER_PREFIX + 'xattr' FORCEMASTER_HEADER = HEADER_PREFIX + 'force-master' PERFDATA_HEADER = HEADER_PREFIX + 'perfdata' PERFDATA_HEADER_PREFIX = PERFDATA_HEADER + '-' +REQID_HEADER = HEADER_PREFIX + 'req-id' CONTAINER_METADATA_PREFIX = "x-oio-container-meta-" OBJECT_METADATA_PREFIX = "x-oio-content-meta-" diff --git a/oio/common/decorators.py b/oio/common/decorators.py index d0deb24805..3f7fff076f 100644 --- a/oio/common/decorators.py +++ b/oio/common/decorators.py @@ -1,4 +1,4 @@ -# Copyright (C) 2017 OpenIO SAS, as part of OpenIO SDS +# Copyright (C) 2017-2019 OpenIO SAS, as part of OpenIO SDS # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -14,6 +14,7 @@ # License along with this library. 
from functools import wraps +from oio.common.constants import REQID_HEADER from oio.common.utils import request_id from oio.common.exceptions import NotFound, NoSuchAccount, NoSuchObject, \ NoSuchContainer, reraise @@ -22,7 +23,8 @@ def ensure_headers(func): @wraps(func) def ensure_headers_wrapper(*args, **kwargs): - kwargs['headers'] = kwargs.get('headers') or dict() + if kwargs.setdefault('headers', dict()) is None: + kwargs['headers'] = dict() return func(*args, **kwargs) return ensure_headers_wrapper @@ -30,13 +32,12 @@ def ensure_headers_wrapper(*args, **kwargs): def ensure_request_id(func): @wraps(func) def ensure_request_id_wrapper(*args, **kwargs): - headers = kwargs.get('headers', dict()) - if 'X-oio-req-id' not in headers: + headers = kwargs.setdefault('headers', dict()) + if REQID_HEADER not in headers: if 'req_id' in kwargs: - headers['X-oio-req-id'] = kwargs.pop('req_id') + headers[REQID_HEADER] = kwargs.pop('req_id') else: - headers['X-oio-req-id'] = request_id() - kwargs['headers'] = headers + headers[REQID_HEADER] = request_id() return func(*args, **kwargs) return ensure_request_id_wrapper diff --git a/oio/common/http_urllib3.py b/oio/common/http_urllib3.py index 1941b0bae9..842922e56e 100644 --- a/oio/common/http_urllib3.py +++ b/oio/common/http_urllib3.py @@ -57,7 +57,7 @@ def get_pool_manager(pool_connections=DEFAULT_POOLSIZE, """ Get `urllib3.PoolManager` to manage pools of connections - :param pool_connections: number of connection pools + :param pool_connections: number of connection pools (see "num_pools"). :type pool_connections: `int` :param pool_maxsize: number of connections per connection pool :type pool_maxsize: `int` diff --git a/oio/crawler/integrity.py b/oio/crawler/integrity.py index 8eccd8d9b2..9123d50240 100644 --- a/oio/crawler/integrity.py +++ b/oio/crawler/integrity.py @@ -1,4 +1,4 @@ -# Copyright (C) 2015-2017 OpenIO SAS, as part of OpenIO SDS +# Copyright (C) 2015-2019 OpenIO SAS, as part of OpenIO SDS # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as @@ -19,7 +19,7 @@ from __future__ import print_function -from oio.common.green import Event, GreenPool +from oio.common.green import Event, GreenPool, Queue import os import csv @@ -28,6 +28,7 @@ import argparse from oio.common import exceptions as exc +from oio.common.fullpath import decode_fullpath, decode_old_fullpath from oio.common.storage_method import STORAGE_METHODS from oio.account.client import AccountClient from oio.container.client import ContainerClient @@ -36,10 +37,16 @@ class Target(object): - def __init__(self, account, container=None, obj=None, chunk=None): + """ + Identify the target of a check. 
+ """ + + def __init__(self, account, container=None, obj=None, + version=None, chunk=None): self.account = account self.container = container self.obj = obj + self.version = version self.chunk = chunk def copy(self): @@ -47,17 +54,70 @@ def copy(self): self.account, self.container, self.obj, + self.version, self.chunk) + def copy_object(self): + return Target(self.account, self.container, self.obj, self.version) + + def copy_container(self): + return Target(self.account, self.container) + + def copy_account(self): + return Target(self.account) + def __repr__(self): - s = "account=" + self.account + if self.type == 'chunk': + return 'chunk=' + self.chunk + out = 'account=%s' % self.account if self.container: - s += ', container=' + self.container + out += ', container=' + self.container if self.obj: - s += ', obj=' + self.obj + out += ', obj=' + self.obj + if self.version: + out += ', version=' + self.version + if self.chunk: + out += ', chunk=' + self.chunk + return out + + @property + def type(self): + """Tell which type of item this object targets.""" if self.chunk: - s += ', chunk=' + self.chunk - return s + return 'chunk' + elif self.obj: + return 'object' + elif self.container: + return 'container' + else: + return 'account' + + +class ItemResult(object): + """ + Hold the result of a check. + Must be serializable to be used in the Checker's return queue. + """ + + def __init__(self, target, errors=None): + self.errors = errors if errors is not None else list() + self.target = target + + @property + def health(self): + """ + Tell the health of the item that has been checked. + """ + # TODO(FVE): add an intermediate 'warning' level + return 'error' if self.errors else 'OK' + + def errors_to_str(self, separator='\n'): + """ + Pretty print errors stored in this result. + """ + if not self.errors: + return str(None) + return separator.join(str(x) for x in self.errors) class Checker(object): @@ -107,6 +167,33 @@ def __init__(self, namespace, concurrency=50, self.list_cache = {} self.running = {} + self.result_queue = Queue() + + def complete_target_from_chunk_metadata(self, target, xattr_meta): + """ + Complete a Target object from metadata found in chunk's extended + attributes. + """ + # pylint: disable=unbalanced-tuple-unpacking + try: + acct, ct, path, vers, _oid = \ + decode_fullpath(xattr_meta['full_path']) + except ValueError: + acct, ct, path, vers = \ + decode_old_fullpath(xattr_meta['full_path']) + # TODO(FVE): load old-style metadata + # TODO(FVE): support object ID + target.account = acct + target.container = ct + target.obj = path + target.version = vers + + def send_result(self, target, errors=None): + """ + Put an item in the result queue. + """ + # TODO(FVE): send to an external queue. + self.result_queue.put(ItemResult(target, errors)) def write_error(self, target, irreparable=False): error = list() @@ -147,69 +234,90 @@ def write_chunk_error(self, target, obj_meta, irreparable=irreparable) def _check_chunk_xattr(self, target, obj_meta, xattr_meta): - error = False + """ + Check coherency of chunk extended attributes with object metadata. + + :returns: a list of errors + """ + errors = list() # Composed position -> erasure coding attr_prefix = 'meta' if '.' 
in obj_meta['pos'] else '' attr_key = attr_prefix + 'chunk_size' if str(obj_meta['size']) != xattr_meta.get(attr_key): - print(" Chunk %s '%s' xattr (%s) " - "differs from size in meta2 (%s)" % - (target, attr_key, xattr_meta.get(attr_key), - obj_meta['size'])) - error = True + errors.append( + "Chunk %s '%s' xattr (%s) differs from size in meta2 (%s)" % + (target.chunk, attr_key, xattr_meta.get(attr_key), + obj_meta['size'])) attr_key = attr_prefix + 'chunk_hash' if obj_meta['hash'] != xattr_meta.get(attr_key): - print(" Chunk %s '%s' xattr (%s) " - "differs from hash in meta2 (%s)" % - (target, attr_key, xattr_meta.get(attr_key), - obj_meta['hash'])) - error = True - return error + errors.append( + "Chunk %s '%s' xattr (%s) differs from hash in meta2 (%s)" % + (target.chunk, attr_key, xattr_meta.get(attr_key), + obj_meta['hash'])) + return errors def _check_chunk(self, target): - chunk = target.chunk + """ + Execute various checks on a chunk: + - does it appear in object's chunk list? + - is it reachable? + - are its extended attributes coherent? - obj_listing, obj_meta = self.check_obj(target) - error = False - if chunk not in obj_listing: - print(' Chunk %s missing from object listing' % target) - error = True - db_meta = dict() - else: - db_meta = obj_listing[chunk] + :returns: the list of errors encountered, + and the chunk's owner object metadata. + """ + chunk = target.chunk + errors = list() + obj_meta = None + xattr_meta = None try: xattr_meta = self.blob_client.chunk_head(chunk, xattr=self.full) - except exc.NotFound as e: + except exc.NotFound as err: self.chunk_not_found += 1 - error = True - print(' Not found chunk "%s": %s' % (target, str(e))) + errors.append('Chunk %s not found: %s' % (chunk, str(err))) except exc.FaultyChunk as err: self.chunk_exceptions += 1 - error = True - print(' Exception chunk "%s": %r' % (target, err)) - except Exception as e: + errors.append('Chunk %s is faulty: %r' % (chunk, err)) + except Exception as err: self.chunk_exceptions += 1 - error = True - print(' Exception chunk "%s": %s' % (target, str(e))) - else: - if db_meta and self.full: - error = self._check_chunk_xattr(target, db_meta, xattr_meta) + errors.append('Chunk %s failed to be checked: %s' % ( + chunk, str(err))) + if not target.obj and xattr_meta: + self.complete_target_from_chunk_metadata(target, xattr_meta) + + if target.obj: + obj_listing, obj_meta = self.check_obj(target.copy_object()) + if chunk not in obj_listing: + errors.append('Chunk %s missing from object listing' % chunk) + db_meta = dict() + else: + db_meta = obj_listing[chunk] + + if db_meta and xattr_meta and self.full: + errors.extend( + self._check_chunk_xattr(target, db_meta, xattr_meta)) + + self.send_result(target, errors) self.chunks_checked += 1 - return error, obj_meta + return errors, obj_meta def check_chunk(self, target): - error, obj_meta = self._check_chunk(target) - if error: - self.write_chunk_error(target, obj_meta) + errors, _obj_meta = self._check_chunk(target) + return errors def _check_metachunk(self, target, obj_meta, stg_met, pos, chunks, recurse=False): + """ + Check that a metachunk has the right number of chunks. 
+ + :returns: the list of errors + """ required = stg_met.expected_chunks - chunk_errors = list() + errors = list() if len(chunks) < required: missing_chunks = required - len(chunks) @@ -219,36 +327,53 @@ def _check_metachunk(self, target, obj_meta, stg_met, pos, chunks, subs = {x['num'] for x in chunks} for sub in range(required): if sub not in subs: - chunk_errors.append( - (target, obj_meta, '%d.%d' % (pos, sub))) + errors.append( + "Missing chunk at position %d.%d" % (pos, sub)) else: for _ in range(missing_chunks): - chunk_errors.append((target, obj_meta, str(pos))) + errors.append("Missing chunk at position %d" % pos) if recurse: for chunk in chunks: - t = target.copy() - t.chunk = chunk['url'] - error, obj_meta = self._check_chunk(t) - if error: - chunk_errors.append((t, obj_meta)) - - irreparable = required - len(chunk_errors) < stg_met.min_chunks_to_read - for chunk_error in chunk_errors: - self.write_chunk_error(*chunk_error, irreparable=irreparable) + tcopy = target.copy() + tcopy.chunk = chunk['url'] + chunk_errors, _ = self._check_chunk(tcopy) + if chunk_errors: + # The errors have already been reported by _check_chunk, + # but we must count this chunk among the unusable chunks + # of the current metachunk. + errors.append("Unusable chunk %s at position %s" % ( + chunk['url'], chunk['pos'])) + + irreparable = required - len(errors) < stg_met.min_chunks_to_read + if irreparable: + errors.append( + "Unavailable metachunk at position %s (%d/%d chunks)" % ( + pos, required - len(errors), stg_met.expected_chunks)) + # Since the "metachunk" is not an official item type, + # this method does not report errors itself. Errors will + # be reported as object errors. + return errors def _check_obj_policy(self, target, obj_meta, chunks, recurse=False): """ Check that the list of chunks of an object matches the object's storage policy. 
+ + :returns: the list of errors encountered """ stg_met = STORAGE_METHODS.load(obj_meta['chunk_method']) chunks_by_pos = _sort_chunks(chunks, stg_met.ec) + tasks = list() for pos, chunks in chunks_by_pos.iteritems(): - self.pool.spawn_n( + tasks.append(self.pool.spawn( self._check_metachunk, target.copy(), obj_meta, stg_met, pos, chunks, - recurse=recurse) + recurse=recurse)) + errors = list() + for task in tasks: + errors.extend(task.wait()) + return errors def check_obj(self, target, recurse=False): account = target.account @@ -261,11 +386,11 @@ def check_obj(self, target, recurse=False): return self.list_cache[(account, container, obj)] self.running[(account, container, obj)] = Event() print('Checking object "%s"' % target) - container_listing, ct_meta = self.check_container(target) - error = False + container_listing, _ = self.check_container(target.copy_container()) + errors = list() if obj not in container_listing: - print(' Object %s missing from container listing' % target) - error = True + errors.append('Object %s/%s missing from container listing' % ( + target.obj, target.version)) # checksum = None else: # TODO check checksum match @@ -278,14 +403,14 @@ def check_obj(self, target, recurse=False): meta, results = self.container_client.content_locate( account=account, reference=container, path=obj, properties=False) - except exc.NotFound as e: + except exc.NotFound as err: self.object_not_found += 1 - error = True - print(' Not found object "%s": %s' % (target, str(e))) - except Exception as e: + errors.append('Object %s/%s not found: %s' % ( + target.obj, target.version, str(err))) + except Exception as err: self.object_exceptions += 1 - error = True - print(' Exception object "%s": %s' % (target, str(e))) + errors.append('Object %s/%s failed to be checked: %s' % ( + target.obj, target.version, str(err))) chunk_listing = dict() for chunk in results: @@ -299,10 +424,10 @@ def check_obj(self, target, recurse=False): # Skip the check if we could not locate the object if meta: - self._check_obj_policy(target, meta, results, recurse=recurse) + errors.extend( + self._check_obj_policy(target, meta, results, recurse=recurse)) - if error and self.error_file: - self.write_error(target) + self.send_result(target, errors) return chunk_listing, meta def check_container(self, target, recurse=False): @@ -315,11 +440,12 @@ def check_container(self, target, recurse=False): return self.list_cache[(account, container)] self.running[(account, container)] = Event() print('Checking container "%s"' % target) - account_listing = self.check_account(target) - error = False + account_listing = self.check_account(target.copy_account()) + errors = list() if container not in account_listing: - error = True - print(' Container %s missing from account listing' % target) + errors.append( + 'Container %s missing from account listing' % ( + container)) marker = None results = [] @@ -335,15 +461,15 @@ def check_container(self, target, recurse=False): _, resp = self.container_client.content_list( account=account, reference=container, marker=marker, **extra_args) - except exc.NotFound as e: + except exc.NotFound as err: self.container_not_found += 1 - error = True - print(' Not found container "%s": %s' % (target, str(e))) + errors.append('Container %s not found: %s' % ( + container, str(err))) break - except Exception as e: + except Exception as err: self.container_exceptions += 1 - error = True - print(' Exception container "%s": %s' % (target, str(e))) + errors.append('Container %s failed to be checked: %s' % ( + 
target, str(err))) break if resp['objects']: @@ -369,11 +495,10 @@ def check_container(self, target, recurse=False): if recurse: for obj in container_listing: - t = target.copy() - t.obj = obj - self.pool.spawn_n(self.check_obj, t, True) - if error and self.error_file: - self.write_error(target) + tcopy = target.copy() + tcopy.obj = obj + self.pool.spawn_n(self.check_obj, tcopy, True) + self.send_result(target, errors) return container_listing, ct_meta def check_account(self, target, recurse=False): @@ -385,7 +510,7 @@ def check_account(self, target, recurse=False): return self.list_cache[account] self.running[account] = Event() print('Checking account "%s"' % target) - error = False + errors = list() marker = None results = [] extra_args = dict() @@ -399,10 +524,11 @@ def check_account(self, target, recurse=False): try: resp = self.account_client.container_list( account, marker=marker, **extra_args) - except Exception as e: + except Exception as err: self.account_exceptions += 1 - error = True - print(' Exception account "%s": %s' % (target, str(e))) + errors.append( + 'Account %s failed to be checked: %s' % ( + account, str(err))) break if resp['listing']: marker = resp['listing'][-1][0] @@ -425,26 +551,32 @@ def check_account(self, target, recurse=False): if recurse: for container in containers: - t = target.copy() - t.container = container - self.pool.spawn_n(self.check_container, t, True) + tcopy = target.copy_account() + tcopy.container = container + self.pool.spawn_n(self.check_container, tcopy, True) - if error and self.error_file: - self.write_error(target) + self.send_result(target, errors) return containers def check(self, target): - if target.chunk and target.obj and target.container: + if target.type == 'chunk': self.pool.spawn_n(self.check_chunk, target) - elif target.obj and target.container: + elif target.type == 'object': self.pool.spawn_n(self.check_obj, target, True) - elif target.container: + elif target.type == 'container': self.pool.spawn_n(self.check_container, target, True) else: self.pool.spawn_n(self.check_account, target, True) + def handle_results(self): + while not self.result_queue.empty(): + res = self.result_queue.get(True) + yield res + def wait(self): self.pool.waitall() + # FIXME(FVE): + return self.handle_results() def report(self): success = True diff --git a/oio/rdir/client.py b/oio/rdir/client.py index b135fbf39a..3ac09b1451 100644 --- a/oio/rdir/client.py +++ b/oio/rdir/client.py @@ -500,7 +500,7 @@ def meta2_index_create(self, volume_id, **kwargs): return self.create(volume_id, service_type='meta2', **kwargs) def meta2_index_push(self, volume_id, container_url, container_id, mtime, - headers=None, **kwargs): + **kwargs): """ Add a newly created container to the list of containers handled by the meta2 server in question. 
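A minimal usage sketch of the reworked call (all values here are hypothetical): after this change, custom headers are no longer an explicit parameter of meta2_index_push() but travel through **kwargs down to _rdir_request(), so a caller that wants to set a request ID can still do:

from oio.common.constants import REQID_HEADER
from oio.rdir.client import RdirClient

client = RdirClient({'namespace': 'OPENIO'})
client.meta2_index_push(
    'meta2-volume-id',                    # hypothetical volume ID
    'OPENIO/my_account/my_container',     # hypothetical container URL
    'B96211AFC7FC43E9B21A6CBA11CE7F35',   # hypothetical container ID
    1552000000,                           # mtime, in seconds
    headers={REQID_HEADER: 'example-req-id'})  # forwarded via **kwargs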
@@ -520,7 +520,7 @@ def meta2_index_push(self, volume_id, container_url, container_id, mtime, return self._rdir_request(volume=volume_id, method='POST', action='push', create=True, json=body, - headers=headers, service_type='meta2', + service_type='meta2', **kwargs) def _resolve_cid_to_path(self, cid): diff --git a/setup.cfg b/setup.cfg index 4ae30434c8..9e4fb85871 100644 --- a/setup.cfg +++ b/setup.cfg @@ -138,11 +138,15 @@ openio.zk = zk_bootstrap = oio.cli.zk.set:HierarchyBootstrap openio.admin = + account_check = oio.cli.admin.item:AccountCheck + chunk_check = oio.cli.admin.item:ChunkCheck + container_check = oio.cli.admin.item:ContainerCheck election_debug = oio.cli.election.election:ElectionDebug election_leave = oio.cli.election.election:ElectionLeave election_ping = oio.cli.election.election:ElectionPing election_status = oio.cli.election.election:ElectionStatus election_sync = oio.cli.election.election:ElectionSync + object_check = oio.cli.admin.item:ObjectCheck oio.conscience.checker = asn1 = oio.conscience.checker.asn1:Asn1PingChecker diff --git a/tests/unit/api/test_rdir.py b/tests/unit/api/test_rdir.py index 5d1b66b28b..6d7c9f1544 100644 --- a/tests/unit/api/test_rdir.py +++ b/tests/unit/api/test_rdir.py @@ -156,7 +156,6 @@ def test_volume_push(self): 'container_id': self.container_id, 'mtime': int(self.mtime), }, - 'headers': None, 'service_type': 'meta2' } From 0d0be2207c0fbda64c85f141b3d9d3eae6035a8e Mon Sep 17 00:00:00 2001 From: "Florent Vennetier (OpenIO)" Date: Tue, 12 Mar 2019 18:55:37 +0100 Subject: [PATCH 2/8] tests: improve autocontainer listing checks --- tests/functional/cli/object/test_object.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/functional/cli/object/test_object.py b/tests/functional/cli/object/test_object.py index aa80a4d21d..8e331aa525 100644 --- a/tests/functional/cli/object/test_object.py +++ b/tests/functional/cli/object/test_object.py @@ -241,10 +241,11 @@ def test_drain_with_cid(self): def _test_autocontainer_object_listing(self, args='', env=None): obj_count = 7 + prefix = random_str(8) + expected = list() with tempfile.NamedTemporaryFile() as myfile: myfile.write('something') myfile.flush() - prefix = random_str(8) # TODO(FVE): find a quicker way to upload several objects commands = list() for i in range(obj_count): @@ -252,6 +253,7 @@ def _test_autocontainer_object_listing(self, args='', env=None): commands.append(' '.join(['object create --auto ', myfile.name, '--name ', obj_name, args])) + expected.append(obj_name) self.openio_batch(commands, env=env) # Default listing @@ -280,7 +282,8 @@ def _test_autocontainer_object_listing(self, args='', env=None): output = self.openio('object list --auto --no-paging --prefix ' + prefix + ' ' + opts + ' ' + args, env=env) listing = self.json_loads(output) - self.assertEqual(obj_count, len(listing)) + actual = sorted(x['Name'] for x in listing) + self.assertEqual(expected, actual) for obj in listing: # 4 columns self.assertEqual(4, len(obj)) From 3abefd9f100dd659f4fadc27189f3081fed8e99b Mon Sep 17 00:00:00 2001 From: "Florent Vennetier (OpenIO)" Date: Wed, 13 Mar 2019 18:55:11 +0100 Subject: [PATCH 3/8] Python API: fix exception reraise function --- oio/common/exceptions.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/oio/common/exceptions.py b/oio/common/exceptions.py index ab9fd05053..849cb7972d 100644 --- a/oio/common/exceptions.py +++ b/oio/common/exceptions.py @@ -222,12 +222,13 @@ def __init__(self, http_status, status=None, 
message=None): self.http_status = http_status self.message = message or 'n/a' self.status = status + super(StatusMessageException, self).__init__(self.message) def __str__(self): - s = "%s (HTTP %s)" % (self.message, self.http_status) + out = "%s (HTTP %s)" % (self.message, self.http_status) if self.status: - s += ' (STATUS %s)' % self.status - return s + out += ' (STATUS %s)' % self.status + return out class ClientException(StatusMessageException): @@ -321,6 +322,8 @@ def reraise(exc_type, exc_value, extra_message=None): plus maybe `extra_message` at the beginning. """ args = exc_value.args + if isinstance(exc_value, StatusMessageException): + args = (exc_value.message, ) + args if extra_message: args = (extra_message, ) + args raise exc_type(*args), None, exc_info()[2] From 5fdbae1c9013a35ee440ff25316c34dfc50c31b0 Mon Sep 17 00:00:00 2001 From: "Florent Vennetier (OpenIO)" Date: Wed, 13 Mar 2019 18:52:51 +0100 Subject: [PATCH 4/8] CLI: support checking object versions, log to files Support checking multiple object versions (only the latest was checked) with oio-crawler-integrity and 'openio-admin object check' (and related commands). Also allow 'openio-admin check' to log to files, as does oio-crawler-integrity. Jira: OB-339 --- oio/cli/admin/item.py | 44 +++- oio/crawler/integrity.py | 281 ++++++++++++++++-------- tests/functional/blob/test_converter.py | 3 +- 3 files changed, 231 insertions(+), 97 deletions(-) diff --git a/oio/cli/admin/item.py b/oio/cli/admin/item.py index 1da6ea8056..11b027dceb 100644 --- a/oio/cli/admin/item.py +++ b/oio/cli/admin/item.py @@ -28,12 +28,28 @@ class CheckCommandMixin(object): def build_checker(self, parsed_args): """Build an instance of Checker.""" - # TODO(FVE): when Checker is refactored, review the list of - # parameters we should pass. - checker = Checker(self.app.options.ns) + checker = Checker( + self.app.options.ns, + concurrency=parsed_args.concurrency, + error_file=parsed_args.output, + rebuild_file=parsed_args.output_for_blob_rebuilder, + request_attempts=parsed_args.attempts, + ) return checker def patch_parser(self, parser): + parser.add_argument( + '--attempts', + type=int, + default=1, + help="Number of attempts for listing requests (default: 1)." + ) + parser.add_argument( + '--checksum', + action='store_true', + help=("Perform checksum comparisons. This requires downloading " + "data from rawx services.") + ) parser.add_argument( '--depth', type=int, @@ -43,14 +59,24 @@ def patch_parser(self, parser): " from namespace to chunk.") ) parser.add_argument( - '--checksum', - action='store_true', - help=("Perform checksum comparisons. This requires downloading " - "data from rawx services.") + '--concurrency', '--workers', type=int, + default=30, + help="Number of concurrent checks (default: 30)." + ) + parser.add_argument( + '-o', '--output', + help=("Output file. Will contain elements in error. 
" + "Can later be passed to stdin of the legacy " + "oio-crawler-integrity to re-check only these elements.") + ) + parser.add_argument( + '--output-for-blob-rebuilder', + help=("Write chunk errors in a file with a format " + "suitable as oio-blob-rebuilder input.") ) def _format_results(self, checker): - for res in checker.wait(): + for res in checker.run(): yield (res.target.type, repr(res.target), res.health, res.errors_to_str()) @@ -149,7 +175,7 @@ def take_action(self, parsed_args): checker = self.build_checker(parsed_args) for obj in parsed_args.objects: target = Target(self.app.options.account, parsed_args.container, - obj, parsed_args.object_version) + obj, version=parsed_args.object_version) checker.check(target) return self.format_results(checker) diff --git a/oio/crawler/integrity.py b/oio/crawler/integrity.py index 9123d50240..a185a6d8a5 100644 --- a/oio/crawler/integrity.py +++ b/oio/crawler/integrity.py @@ -19,7 +19,7 @@ from __future__ import print_function -from oio.common.green import Event, GreenPool, Queue +from oio.common.green import Event, GreenPool, Queue, sleep import os import csv @@ -30,9 +30,7 @@ from oio.common import exceptions as exc from oio.common.fullpath import decode_fullpath, decode_old_fullpath from oio.common.storage_method import STORAGE_METHODS -from oio.account.client import AccountClient -from oio.container.client import ContainerClient -from oio.blob.client import BlobClient +from oio.api.object_storage import ObjectStorageApi from oio.api.object_storage import _sort_chunks @@ -42,10 +40,11 @@ class Target(object): """ def __init__(self, account, container=None, obj=None, - version=None, chunk=None): + content_id=None, version=None, chunk=None): self.account = account self.container = container self.obj = obj + self.content_id = content_id self.version = version self.chunk = chunk @@ -54,11 +53,13 @@ def copy(self): self.account, self.container, self.obj, + self.content_id, self.version, self.chunk) def copy_object(self): - return Target(self.account, self.container, self.obj, self.version) + return Target(self.account, self.container, + self.obj, self.content_id, self.version) def copy_container(self): return Target(self.account, self.container) @@ -74,6 +75,8 @@ def __repr__(self): out += ', container=' + self.container if self.obj: out += ', obj=' + self.obj + if self.content_id: + out += ', content_id=' + self.content_id if self.version: out += ', version=' + self.version if self.chunk: @@ -134,23 +137,18 @@ def __init__(self, namespace, concurrency=50, # 2 -> limit container listings (list of objects) self.limit_listings = limit_listings if self.error_file: - f = open(self.error_file, 'a') - self.error_writer = csv.writer(f, delimiter=' ') + outfile = open(self.error_file, 'a') + self.error_writer = csv.writer(outfile, delimiter=' ') self.rebuild_file = rebuild_file if self.rebuild_file: fd = open(self.rebuild_file, 'a') self.rebuild_writer = csv.writer(fd, delimiter='|') - conf = {'namespace': namespace} - self.account_client = AccountClient( - conf, - max_retries=request_attempts - 1) - self.container_client = ContainerClient( - conf, + self.api = ObjectStorageApi( + namespace, max_retries=request_attempts - 1, request_attempts=request_attempts) - self.blob_client = BlobClient(conf=conf) self.accounts_checked = 0 self.containers_checked = 0 @@ -176,16 +174,17 @@ def complete_target_from_chunk_metadata(self, target, xattr_meta): """ # pylint: disable=unbalanced-tuple-unpacking try: - acct, ct, path, vers, _oid = \ + acct, ct, path, vers, 
content_id = \ decode_fullpath(xattr_meta['full_path']) except ValueError: acct, ct, path, vers = \ decode_old_fullpath(xattr_meta['full_path']) + content_id = None # TODO(FVE): load old-style metadata - # TODO(FVE): support object ID target.account = acct target.container = ct target.obj = path + target.content_id = content_id target.version = vers def send_result(self, target, errors=None): @@ -196,6 +195,8 @@ def send_result(self, target, errors=None): self.result_queue.put(ItemResult(target, errors)) def write_error(self, target, irreparable=False): + if not self.error_file: + return error = list() if irreparable: error.append('#IRREPARABLE') @@ -208,7 +209,8 @@ def write_error(self, target, irreparable=False): error.append(target.chunk) self.error_writer.writerow(error) - def write_rebuilder_input(self, target, obj_meta, irreparable=False): + def write_rebuilder_input(self, target, irreparable=False): + # FIXME(FVE): cid can be computed from account and container names ct_meta = self.list_cache[(target.account, target.container)][1] try: cid = ct_meta['system']['sys.name'].split('.', 1)[0] @@ -218,19 +220,20 @@ def write_rebuilder_input(self, target, obj_meta, irreparable=False): if irreparable: error.append('#IRREPARABLE') error.append(cid) - error.append(obj_meta['id']) + # FIXME(FVE): ensure we always resolve content_id, + # or pass object version along with object name. + error.append(target.content_id or target.obj) error.append(target.chunk) self.rebuild_writer.writerow(error) - def write_chunk_error(self, target, obj_meta, + def write_chunk_error(self, target, chunk=None, irreparable=False): if chunk is not None: target = target.copy() target.chunk = chunk - if self.error_file: - self.write_error(target, irreparable=irreparable) + self.write_error(target, irreparable=irreparable) if self.rebuild_file: - self.write_rebuilder_input(target, obj_meta, + self.write_rebuilder_input(target, irreparable=irreparable) def _check_chunk_xattr(self, target, obj_meta, xattr_meta): @@ -274,7 +277,8 @@ def _check_chunk(self, target): xattr_meta = None try: - xattr_meta = self.blob_client.chunk_head(chunk, xattr=self.full) + xattr_meta = self.api.blob_client.chunk_head(chunk, + xattr=self.full) except exc.NotFound as err: self.chunk_not_found += 1 errors.append('Chunk %s not found: %s' % (chunk, str(err))) @@ -309,7 +313,7 @@ def check_chunk(self, target): errors, _obj_meta = self._check_chunk(target) return errors - def _check_metachunk(self, target, obj_meta, stg_met, pos, chunks, + def _check_metachunk(self, target, stg_met, pos, chunks, recurse=False): """ Check that a metachunk has the right number of chunks. 
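For reference, complete_target_from_chunk_metadata() above relies on decode_fullpath() yielding five components (account, container, object path, version, content ID). A sketch with a made-up 'full_path' value — the exact on-disk encoding of the extended attribute may differ:

from oio.common.fullpath import decode_fullpath

acct, ct, path, vers, content_id = decode_fullpath(
    'my_account/my_container/my_object/1552000000000000/'
    'B96211AFC7FC43E9B21A6CBA11CE7F35')
# -> ('my_account', 'my_container', 'my_object',
#     '1552000000000000', 'B96211AFC7FC43E9B21A6CBA11CE7F35')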
@@ -321,8 +325,8 @@ def _check_metachunk(self, target, obj_meta, stg_met, pos, chunks, if len(chunks) < required: missing_chunks = required - len(chunks) - print(' Missing %d chunks at position %s of %s' % ( - missing_chunks, pos, target)) + # print(' Missing %d chunks at position %s of %s' % ( + # missing_chunks, pos, target)) if stg_met.ec: subs = {x['num'] for x in chunks} for sub in range(required): @@ -366,66 +370,132 @@ def _check_obj_policy(self, target, obj_meta, chunks, recurse=False): chunks_by_pos = _sort_chunks(chunks, stg_met.ec) tasks = list() for pos, chunks in chunks_by_pos.iteritems(): - tasks.append(self.pool.spawn( + tasks.append((pos, self.pool.spawn( self._check_metachunk, - target.copy(), obj_meta, stg_met, pos, chunks, - recurse=recurse)) + target.copy(), stg_met, pos, chunks, + recurse=recurse))) errors = list() - for task in tasks: - errors.extend(task.wait()) + for pos, task in tasks: + try: + errors.extend(task.wait()) + except Exception as err: + errors.append("Check failed: pos %d: %s" % (pos, err)) return errors + def check_obj_versions(self, target, versions, recurse=False): + """ + Run checks of all versions of the targeted object in parallel. + """ + tasks = list() + for ov in versions: + tcopy = target.copy_object() + tcopy.content_id = ov['id'] + tcopy.version = str(ov['version']) + tasks.append((tcopy.version, + self.pool.spawn(self.check_obj, + tcopy, + recurse=recurse))) + errors = list() + for version, task in tasks: + try: + task.wait() + except Exception as err: + errors.append("Check failed: version %s: %s" % (version, err)) + if errors: + # Send a result with the target without version to tell + # we were not able to check all versions of the object. + self.send_result(target, errors) + + def _load_obj_meta(self, target, errors): + """ + Load object metadata and chunks. + + :param target: which object to check. + :param errors: list of errors that will be appended + in case any error occurs. + :returns: a tuple with object metadata and a list of chunks. + """ + try: + return self.api.object_locate( + target.account, target.container, target.obj, + version=target.version, properties=False) + except exc.NoSuchObject as err: + self.object_not_found += 1 + errors.append('Object %s/%s not found: %s' % ( + target.obj, target.version, str(err))) + except Exception as err: + self.object_exceptions += 1 + errors.append('Object %s/%s failed to be checked: %s' % ( + target.obj, target.version, str(err))) + return None, [] + def check_obj(self, target, recurse=False): + """ + Check one object version. + If no version is specified, all versions of the object will be checked. + :returns: the result of the check of the most recent version, + or the one that is explicitly targeted. 
+ """ account = target.account container = target.container obj = target.obj - - if (account, container, obj) in self.running: - self.running[(account, container, obj)].wait() - if (account, container, obj) in self.list_cache: - return self.list_cache[(account, container, obj)] - self.running[(account, container, obj)] = Event() - print('Checking object "%s"' % target) + vers = target.version # can be None + + if (account, container, obj, vers) in self.running: + self.running[(account, container, obj, vers)].wait() + if (account, container, obj, vers) in self.list_cache: + return self.list_cache[(account, container, obj, vers)] + self.running[(account, container, obj, vers)] = Event() + # print('Checking object "%s"' % target) container_listing, _ = self.check_container(target.copy_container()) errors = list() if obj not in container_listing: errors.append('Object %s/%s missing from container listing' % ( - target.obj, target.version)) + obj, vers)) # checksum = None else: + versions = container_listing[obj] + if vers is None: + if target.content_id is None: + # No version specified, check all versions + self.check_obj_versions(target.copy_object(), versions, + recurse=recurse) + # Now return the cached result of the most recent version + target.content_id = versions[0]['id'] + target.version = str(versions[0]['version']) + res = self.check_obj(target, recurse=False) + self.running[(account, container, obj, vers)].send(True) + del self.running[(account, container, obj, vers)] + return res + else: + for ov in versions: + if ov['id'] == target.content_id: + vers = str(ov['version']) + target.version = vers + break + else: + errors.append( + 'Object %s/%s missing from container listing' % ( + obj, target.content_id)) + # TODO check checksum match # checksum = container_listing[obj]['hash'] pass - results = [] - meta = dict() - try: - meta, results = self.container_client.content_locate( - account=account, reference=container, path=obj, - properties=False) - except exc.NotFound as err: - self.object_not_found += 1 - errors.append('Object %s/%s not found: %s' % ( - target.obj, target.version, str(err))) - except Exception as err: - self.object_exceptions += 1 - errors.append('Object %s/%s failed to be checked: %s' % ( - target.obj, target.version, str(err))) - - chunk_listing = dict() - for chunk in results: - chunk_listing[chunk['url']] = chunk + meta, chunks = self._load_obj_meta(target, errors) + chunk_listing = {c['url']: c for c in chunks} if meta: - self.list_cache[(account, container, obj)] = (chunk_listing, meta) + self.list_cache[(account, container, obj, vers)] = \ + (chunk_listing, meta) self.objects_checked += 1 - self.running[(account, container, obj)].send(True) - del self.running[(account, container, obj)] + self.running[(account, container, obj, vers)].send(True) + del self.running[(account, container, obj, vers)] # Skip the check if we could not locate the object if meta: errors.extend( - self._check_obj_policy(target, meta, results, recurse=recurse)) + self._check_obj_policy(target, meta, chunks, recurse=recurse)) self.send_result(target, errors) return chunk_listing, meta @@ -439,7 +509,7 @@ def check_container(self, target, recurse=False): if (account, container) in self.list_cache: return self.list_cache[(account, container)] self.running[(account, container)] = Event() - print('Checking container "%s"' % target) + # print('Checking container "%s"' % target) account_listing = self.check_account(target.copy_account()) errors = list() if container not in account_listing: @@ 
-453,13 +523,14 @@ def check_container(self, target, recurse=False): extra_args = dict() if self.limit_listings > 1 and target.obj: # When we are explicitly checking one object, start the listing - # where this object is supposed to be, and list only one object. + # where this object is supposed to be. Do not use a limit, + # but an end marker, in order to fetch all versions of the object. extra_args['prefix'] = target.obj - extra_args['limit'] = 1 + extra_args['end_marker'] = target.obj + '\x00' # HACK while True: try: - _, resp = self.container_client.content_list( - account=account, reference=container, marker=marker, + resp = self.api.object_list( + account, container, marker=marker, versions=True, **extra_args) except exc.NotFound as err: self.container_not_found += 1 @@ -472,8 +543,13 @@ def check_container(self, target, recurse=False): target, str(err))) break + if resp.get('truncated', False): + marker = resp['next_marker'] + if resp['objects']: - marker = resp['objects'][-1]['name'] + # safeguard, probably useless + if not marker: + marker = resp['objects'][-1]['name'] results.extend(resp['objects']) if self.limit_listings > 1: break @@ -483,8 +559,11 @@ def check_container(self, target, recurse=False): break container_listing = dict() + # Save all object versions, with the most recent first for obj in results: - container_listing[obj['name']] = obj + container_listing.setdefault(obj['name'], list()).append(obj) + for versions in container_listing.values(): + versions.sort(key=lambda o: o['version'], reverse=True) if self.limit_listings <= 1: # We just listed the whole container, keep the result in a cache @@ -494,10 +573,13 @@ def check_container(self, target, recurse=False): del self.running[(account, container)] if recurse: - for obj in container_listing: - tcopy = target.copy() - tcopy.obj = obj - self.pool.spawn_n(self.check_obj, tcopy, True) + for obj_vers in container_listing.values(): + for obj in obj_vers: + tcopy = target.copy_object() + tcopy.obj = obj['name'] + tcopy.content_id = obj['id'] + tcopy.version = str(obj['version']) + self.pool.spawn_n(self.check_obj, tcopy, True) self.send_result(target, errors) return container_listing, ct_meta @@ -509,7 +591,7 @@ def check_account(self, target, recurse=False): if account in self.list_cache: return self.list_cache[account] self.running[account] = Event() - print('Checking account "%s"' % target) + # print('Checking account "%s"' % target) errors = list() marker = None results = [] @@ -522,7 +604,7 @@ def check_account(self, target, recurse=False): extra_args['limit'] = 1 while True: try: - resp = self.account_client.container_list( + resp = self.api.container_list( account, marker=marker, **extra_args) except Exception as err: self.account_exceptions += 1 @@ -530,17 +612,18 @@ def check_account(self, target, recurse=False): 'Account %s failed to be checked: %s' % ( account, str(err))) break - if resp['listing']: - marker = resp['listing'][-1][0] - results.extend(resp['listing']) + if resp: + marker = resp[-1][0] + results.extend(resp) if self.limit_listings > 0: break else: break containers = dict() - for e in results: - containers[e[0]] = (e[1], e[2]) + for container in results: + # Name, number of objects, number of bytes + containers[container[0]] = (container[1], container[2]) if self.limit_listings <= 0: # We just listed the whole account, keep the result in a cache @@ -568,15 +651,35 @@ def check(self, target): else: self.pool.spawn_n(self.check_account, target, True) - def handle_results(self): + def 
fetch_results(self): while not self.result_queue.empty(): res = self.result_queue.get(True) yield res - def wait(self): + def log_result(self, result): + if result.errors: + if result.target.type == 'chunk': + # FIXME(FVE): check error criticity + # and set the irreparable flag. + self.write_chunk_error(result.target) + else: + self.write_error(result.target) + + def run(self): + """ + Fetch results and write logs until all jobs have finished. + + :returns: a generator yielding check results. + """ + while self.pool.running() + self.pool.waiting(): + for result in self.fetch_results(): + self.log_result(result) + yield result + sleep(0.1) self.pool.waitall() - # FIXME(FVE): - return self.handle_results() + for result in self.fetch_results(): + self.log_result(result) + yield result def report(self): success = True @@ -621,6 +724,9 @@ def _report_stat(name, stat): def main(): + """ + Main function for legacy integrity crawler. + """ parser = argparse.ArgumentParser(description=__doc__) parser.add_argument('namespace', help='Namespace name') parser.add_argument( @@ -675,9 +781,10 @@ def main(): limit_listings=limit_listings, request_attempts=args.attempts, ) - args = csv.reader(source, delimiter=' ') - for entry in args: + entries = csv.reader(source, delimiter=' ') + for entry in entries: checker.check(Target(*entry)) - checker.wait() + for _ in checker.run(): + pass if not checker.report(): sys.exit(1) diff --git a/tests/functional/blob/test_converter.py b/tests/functional/blob/test_converter.py index 94064ad387..908cf95a31 100644 --- a/tests/functional/blob/test_converter.py +++ b/tests/functional/blob/test_converter.py @@ -115,7 +115,8 @@ def _convert_and_check(self, chunk_volume, chunk_path, checker.check(Target( account, container=container, obj=path, chunk='http://' + converter.volume_id + '/' + chunk_id)) - checker.wait() + for _ in checker.run(): + pass self.assertTrue(checker.report()) if expected_raw_meta: From 145c9bf76d860145b836e79e0ab041d1fcb2b422 Mon Sep 17 00:00:00 2001 From: "Florent Vennetier (OpenIO)" Date: Thu, 14 Mar 2019 12:08:42 +0100 Subject: [PATCH 5/8] oio-crawler-integrity: fix stdout logging --- oio/crawler/integrity.py | 72 +++++++++++++++++----------------------- 1 file changed, 31 insertions(+), 41 deletions(-) diff --git a/oio/crawler/integrity.py b/oio/crawler/integrity.py index a185a6d8a5..90b878e7f1 100644 --- a/oio/crawler/integrity.py +++ b/oio/crawler/integrity.py @@ -29,6 +29,7 @@ from oio.common import exceptions as exc from oio.common.fullpath import decode_fullpath, decode_old_fullpath +from oio.common.logger import get_logger from oio.common.storage_method import STORAGE_METHODS from oio.api.object_storage import ObjectStorageApi from oio.api.object_storage import _sort_chunks @@ -114,19 +115,20 @@ def health(self): # TODO(FVE): add an intermediate 'warning' level return 'error' if self.errors else 'OK' - def errors_to_str(self, separator='\n'): + def errors_to_str(self, separator='\n', err_format='%s'): """ Pretty print errors stored in this result. 
""" if not self.errors: return str(None) - return separator.join(str(x) for x in self.errors) + return separator.join(err_format % x for x in self.errors) class Checker(object): def __init__(self, namespace, concurrency=50, error_file=None, rebuild_file=None, full=True, - limit_listings=0, request_attempts=1): + limit_listings=0, request_attempts=1, + logger=None, verbose=False): self.pool = GreenPool(concurrency) self.error_file = error_file self.full = bool(full) @@ -145,8 +147,11 @@ def __init__(self, namespace, concurrency=50, fd = open(self.rebuild_file, 'a') self.rebuild_writer = csv.writer(fd, delimiter='|') + self.logger = logger or get_logger({'namespace': namespace}, + name='integrity', verbose=verbose) self.api = ObjectStorageApi( namespace, + logger=self.logger, max_retries=request_attempts - 1, request_attempts=request_attempts) @@ -249,16 +254,14 @@ def _check_chunk_xattr(self, target, obj_meta, xattr_meta): attr_key = attr_prefix + 'chunk_size' if str(obj_meta['size']) != xattr_meta.get(attr_key): errors.append( - "Chunk %s '%s' xattr (%s) differs from size in meta2 (%s)" % - (target.chunk, attr_key, xattr_meta.get(attr_key), - obj_meta['size'])) + "'%s' xattr (%s) differs from size in meta2 (%s)" % + (attr_key, xattr_meta.get(attr_key), obj_meta['size'])) attr_key = attr_prefix + 'chunk_hash' if obj_meta['hash'] != xattr_meta.get(attr_key): errors.append( - "Chunk %s '%s' xattr (%s) differs from hash in meta2 (%s)" % - (target.chunk, attr_key, xattr_meta.get(attr_key), - obj_meta['hash'])) + "'%s' xattr (%s) differs from hash in meta2 (%s)" % + (attr_key, xattr_meta.get(attr_key), obj_meta['hash'])) return errors def _check_chunk(self, target): @@ -281,14 +284,13 @@ def _check_chunk(self, target): xattr=self.full) except exc.NotFound as err: self.chunk_not_found += 1 - errors.append('Chunk %s not found: %s' % (chunk, str(err))) + errors.append('Not found: %s' % (err, )) except exc.FaultyChunk as err: self.chunk_exceptions += 1 - errors.append('Chunk %s is faulty: %r' % (chunk, err)) + errors.append('Faulty: %r' % (err, )) except Exception as err: self.chunk_exceptions += 1 - errors.append('Chunk %s failed to be checked: %s' % ( - chunk, str(err))) + errors.append('Check failed: %s' % (err, )) if not target.obj and xattr_meta: self.complete_target_from_chunk_metadata(target, xattr_meta) @@ -296,7 +298,7 @@ def _check_chunk(self, target): if target.obj: obj_listing, obj_meta = self.check_obj(target.copy_object()) if chunk not in obj_listing: - errors.append('Chunk %s missing from object listing' % chunk) + errors.append('Missing from object listing') db_meta = dict() else: db_meta = obj_listing[chunk] @@ -325,8 +327,6 @@ def _check_metachunk(self, target, stg_met, pos, chunks, if len(chunks) < required: missing_chunks = required - len(chunks) - # print(' Missing %d chunks at position %s of %s' % ( - # missing_chunks, pos, target)) if stg_met.ec: subs = {x['num'] for x in chunks} for sub in range(required): @@ -421,12 +421,10 @@ def _load_obj_meta(self, target, errors): version=target.version, properties=False) except exc.NoSuchObject as err: self.object_not_found += 1 - errors.append('Object %s/%s not found: %s' % ( - target.obj, target.version, str(err))) + errors.append('Not found: %s' % (err, )) except Exception as err: self.object_exceptions += 1 - errors.append('Object %s/%s failed to be checked: %s' % ( - target.obj, target.version, str(err))) + errors.append('Check failed: %s' % (err, )) return None, [] def check_obj(self, target, recurse=False): @@ -446,12 +444,11 @@ 
def check_obj(self, target, recurse=False): if (account, container, obj, vers) in self.list_cache: return self.list_cache[(account, container, obj, vers)] self.running[(account, container, obj, vers)] = Event() - # print('Checking object "%s"' % target) + self.logger.info('Checking object "%s"', target) container_listing, _ = self.check_container(target.copy_container()) errors = list() if obj not in container_listing: - errors.append('Object %s/%s missing from container listing' % ( - obj, vers)) + errors.append('Missing from container listing') # checksum = None else: versions = container_listing[obj] @@ -474,9 +471,7 @@ def check_obj(self, target, recurse=False): target.version = vers break else: - errors.append( - 'Object %s/%s missing from container listing' % ( - obj, target.content_id)) + errors.append('Missing from container listing') # TODO check checksum match # checksum = container_listing[obj]['hash'] @@ -509,13 +504,11 @@ def check_container(self, target, recurse=False): if (account, container) in self.list_cache: return self.list_cache[(account, container)] self.running[(account, container)] = Event() - # print('Checking container "%s"' % target) + self.logger.info('Checking container "%s"', target) account_listing = self.check_account(target.copy_account()) errors = list() if container not in account_listing: - errors.append( - 'Container %s missing from account listing' % ( - container)) + errors.append('Missing from account listing') marker = None results = [] @@ -532,15 +525,13 @@ def check_container(self, target, recurse=False): resp = self.api.object_list( account, container, marker=marker, versions=True, **extra_args) - except exc.NotFound as err: + except exc.NoSuchContainer as err: self.container_not_found += 1 - errors.append('Container %s not found: %s' % ( - container, str(err))) + errors.append('Not found: %s' % (err, )) break except Exception as err: self.container_exceptions += 1 - errors.append('Container %s failed to be checked: %s' % ( - target, str(err))) + errors.append('Check failed: %s' % (err, )) break if resp.get('truncated', False): @@ -591,7 +582,7 @@ def check_account(self, target, recurse=False): if account in self.list_cache: return self.list_cache[account] self.running[account] = Event() - # print('Checking account "%s"' % target) + self.logger.info('Checking account "%s"', target) errors = list() marker = None results = [] @@ -608,9 +599,7 @@ def check_account(self, target, recurse=False): account, marker=marker, **extra_args) except Exception as err: self.account_exceptions += 1 - errors.append( - 'Account %s failed to be checked: %s' % ( - account, str(err))) + errors.append('Check failed: %s' % (err, )) break if resp: marker = resp[-1][0] @@ -664,6 +653,8 @@ def log_result(self, result): self.write_chunk_error(result.target) else: self.write_error(result.target) + self.logger.warn('%s:\n%s', result.target, + result.errors_to_str(err_format=' %s')) def run(self): """ @@ -750,8 +741,6 @@ def main(): parser.add_argument('-p', '--presence', action='store_true', default=False, help="Presence check, the xattr check is skipped.") - parser.add_argument('-v', '--verbose', - action='store_true', help='verbose output') parser.add_argument('--concurrency', '--workers', type=int, default=50, help='Number of concurrent checks (default: 50).') @@ -780,6 +769,7 @@ def main(): full=not args.presence, limit_listings=limit_listings, request_attempts=args.attempts, + verbose=True, ) entries = csv.reader(source, delimiter=' ') for entry in entries: From 
269fead55ca37cd3641395cd6685abc0cc943113 Mon Sep 17 00:00:00 2001 From: "Florent Vennetier (OpenIO)" Date: Thu, 14 Mar 2019 12:33:36 +0100 Subject: [PATCH 6/8] Python API: use REQID_HEADER constant everywhere --- oio/account/server.py | 4 ++-- oio/api/base.py | 14 +++++++------- oio/api/ec.py | 7 ++++--- oio/api/io.py | 3 ++- oio/blob/indexer.py | 4 ++-- oio/event/filters/account_update.py | 3 ++- oio/event/filters/content_cleaner.py | 3 ++- oio/event/filters/volume_index.py | 9 +++++---- oio/rdir/client.py | 5 +++-- tests/functional/api/test_objectstorage.py | 5 +++-- .../functional/content/test_content_perfectible.py | 7 ++++--- tests/unit/api/test_directory.py | 5 ++--- tests/unit/api/test_objectstorage.py | 5 +++-- tests/unit/api/test_rdir.py | 2 +- tests/utils.py | 3 ++- 15 files changed, 44 insertions(+), 35 deletions(-) diff --git a/oio/account/server.py b/oio/account/server.py index cc9b9a100a..2b790ebaae 100644 --- a/oio/account/server.py +++ b/oio/account/server.py @@ -20,7 +20,7 @@ from functools import wraps from oio.account.backend import AccountBackend -from oio.common.constants import STRLEN_REQID +from oio.common.constants import REQID_HEADER, STRLEN_REQID from oio.common.json import json from oio.common.logger import get_logger from oio.common.wsgi import WerkzeugApp @@ -35,7 +35,7 @@ def _access_log_wrapper(self, req, *args, **kwargs): pre = time() rc = func(self, req, *args, **kwargs) post = time() - reqid = req.headers.get('X-oio-req-id', '-')[:STRLEN_REQID] + reqid = req.headers.get(REQID_HEADER, '-')[:STRLEN_REQID] # func time size user reqid self.logger.info("%s %0.6f %s %s %s", func.__name__, post - pre, '-', '-', reqid) diff --git a/oio/api/base.py b/oio/api/base.py index 3bde742c5e..cd62803cf3 100644 --- a/oio/api/base.py +++ b/oio/api/base.py @@ -1,4 +1,4 @@ -# Copyright (C) 2015-2018 OpenIO SAS, as part of OpenIO SDS +# Copyright (C) 2015-2019 OpenIO SAS, as part of OpenIO SDS # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -23,7 +23,7 @@ from oio.common.utils import deadline_to_timeout from oio.common.constants import ADMIN_HEADER, \ TIMEOUT_HEADER, PERFDATA_HEADER, FORCEMASTER_HEADER, \ - CONNECTION_TIMEOUT, READ_TIMEOUT, STRLEN_REQID + CONNECTION_TIMEOUT, READ_TIMEOUT, REQID_HEADER, STRLEN_REQID _POOL_MANAGER_OPTIONS_KEYS = ["pool_connections", "pool_maxsize", "max_retries", "backoff_factor"] @@ -146,11 +146,11 @@ def _direct_request(self, method, url, headers=None, data=None, json=None, # Look for a request ID if 'req_id' in kwargs: - out_headers['X-oio-req-id'] = str(kwargs['req_id']) + out_headers[REQID_HEADER] = str(kwargs['req_id']) - if len(out_headers.get('X-oio-req-id', '')) > STRLEN_REQID: - out_headers['X-oio-req-id'] = \ - out_headers['X-oio-req-id'][:STRLEN_REQID] + if len(out_headers.get(REQID_HEADER, '')) > STRLEN_REQID: + out_headers[REQID_HEADER] = \ + out_headers[REQID_HEADER][:STRLEN_REQID] self.__logger().warn('Request ID truncated to %d characters', STRLEN_REQID) @@ -200,7 +200,7 @@ def _direct_request(self, method, url, headers=None, data=None, json=None, perfdata[kv[0]] = pdat except urllib3.exceptions.HTTPError as exc: oio_exception_from_httperror(exc, - reqid=out_headers.get('X-oio-req-id'), + reqid=out_headers.get(REQID_HEADER), url=url) if resp.status >= 400: raise exceptions.from_response(resp, body) diff --git a/oio/api/ec.py b/oio/api/ec.py index d351dc5590..7f36ca5c41 100644 --- a/oio/api/ec.py +++ b/oio/api/ec.py @@ -1,4 +1,4 @@ -# Copyright (C) 
2015-2018 OpenIO SAS, as part of OpenIO SDS +# Copyright (C) 2015-2019 OpenIO SAS, as part of OpenIO SDS # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -23,6 +23,7 @@ from socket import error as SocketError from greenlet import GreenletExit from oio.common import exceptions +from oio.common.constants import REQID_HEADER from oio.common.exceptions import SourceReadError from oio.common.http import HeadersDict, parse_content_range, \ ranges_from_http_header, headers_from_object_metadata @@ -561,7 +562,7 @@ def connect(cls, chunk, sysmeta, reqid=None, chunk_path = parsed.path.split('/')[-1] hdrs = headers_from_object_metadata(sysmeta) if reqid: - hdrs['X-oio-req-id'] = reqid + hdrs[REQID_HEADER] = reqid hdrs[CHUNK_HEADERS["chunk_pos"]] = chunk["pos"] hdrs[CHUNK_HEADERS["chunk_id"]] = chunk_path @@ -920,7 +921,7 @@ def stream(self): handler = EcMetachunkWriter( self.sysmeta, meta_chunk, global_checksum, self.storage_method, - reqid=self.headers.get('X-oio-req-id'), + reqid=self.headers.get(REQID_HEADER), connection_timeout=self.connection_timeout, write_timeout=self.write_timeout, read_timeout=self.read_timeout, diff --git a/oio/api/io.py b/oio/api/io.py index c6c87767ac..07f5783e84 100644 --- a/oio/api/io.py +++ b/oio/api/io.py @@ -23,6 +23,7 @@ from urlparse import urlparse from socket import error as SocketError from oio.common import exceptions as exc +from oio.common.constants import REQID_HEADER from oio.common.http import parse_content_type,\ parse_content_range, ranges_from_http_header, http_header_from_ranges from oio.common.http_eventlet import http_connect @@ -229,7 +230,7 @@ def reqid(self): """:returns: the request ID or None""" if not self.request_headers: return None - return self.request_headers.get('X-oio-req-id') + return self.request_headers.get(REQID_HEADER) def recover(self, nb_bytes): """ diff --git a/oio/blob/indexer.py b/oio/blob/indexer.py index 77861aa188..f9ed374e0b 100644 --- a/oio/blob/indexer.py +++ b/oio/blob/indexer.py @@ -23,6 +23,7 @@ from oio.blob.utils import check_volume, read_chunk_metadata from oio.rdir.client import RdirClient +from oio.common.constants import REQID_HEADER from oio.common.daemon import Daemon from oio.common import exceptions as exc from oio.common.constants import STRLEN_CHUNKID, CHUNK_SUFFIX_PENDING @@ -181,8 +182,7 @@ def update_index(self, path, chunk_id): raise exc.FaultyChunk(err) data = {'mtime': int(time.time())} - # TODO(FVE): replace with the improved request_id() function - headers = {'X-oio-req-id': 'blob-indexer-' + request_id()[:-13]} + headers = {REQID_HEADER: request_id('blob-indexer-')} self.index_client.chunk_push(self.volume_id, meta['container_id'], meta['content_id'], diff --git a/oio/event/filters/account_update.py b/oio/event/filters/account_update.py index bfd978bd7c..99566d124b 100644 --- a/oio/event/filters/account_update.py +++ b/oio/event/filters/account_update.py @@ -14,6 +14,7 @@ # along with this program. If not, see . 
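All the call sites touched by this patch follow the same convention: build a headers dict keyed by REQID_HEADER, propagating the incoming request ID when one exists and generating a prefixed one otherwise. A hedged sketch of that convention (the 'my-tool-' prefix is made up for illustration):

    from oio.common.constants import REQID_HEADER
    from oio.common.utils import request_id

    def make_request_headers(incoming_reqid=None):
        # Keep the caller's request ID if it provided one, otherwise
        # generate a fresh ID with a component-specific prefix.
        return {REQID_HEADER: incoming_reqid or request_id('my-tool-')}

As the oio/api/base.py hunk above shows, IDs longer than STRLEN_REQID are truncated (with a warning) before the request is sent, so prefixes should stay short.
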
+from oio.common.constants import REQID_HEADER from oio.common.exceptions import ClientException, OioTimeout from oio.common.utils import request_id from oio.account.client import AccountClient @@ -36,7 +37,7 @@ def init(self): def process(self, env, cb): event = Event(env) headers = { - 'X-oio-req-id': event.reqid or request_id('account-update-') + REQID_HEADER: event.reqid or request_id('account-update-') } if event.event_type in CONTAINER_EVENTS: diff --git a/oio/event/filters/content_cleaner.py b/oio/event/filters/content_cleaner.py index 19a9995e15..522fb7801e 100644 --- a/oio/event/filters/content_cleaner.py +++ b/oio/event/filters/content_cleaner.py @@ -15,6 +15,7 @@ from oio.blob.client import BlobClient +from oio.common.constants import REQID_HEADER from oio.event.evob import Event, EventTypes from oio.event.filters.base import Filter from oio.common.exceptions import OioException @@ -38,7 +39,7 @@ def init(self): def _handle_rawx(self, url, chunks, content_headers, storage_method, reqid): cid = url.get('id') - headers = {'X-oio-req-id': reqid, + headers = {REQID_HEADER: reqid, 'Connection': 'close'} resps = self.blob_client.chunk_delete_many( diff --git a/oio/event/filters/volume_index.py b/oio/event/filters/volume_index.py index d4be55f38d..2ead4953ba 100644 --- a/oio/event/filters/volume_index.py +++ b/oio/event/filters/volume_index.py @@ -14,6 +14,7 @@ # along with this program. If not, see . +from oio.common.constants import REQID_HEADER from oio.event.evob import Event, EventError, EventTypes from oio.event.filters.base import Filter from oio.common.exceptions import OioException, VolumeException @@ -30,7 +31,7 @@ class VolumeIndexFilter(Filter): def _chunk_delete(self, reqid, volume_id, container_id, content_id, chunk_id): - headers = {'X-oio-req-id': reqid} + headers = {REQID_HEADER: reqid} try: return self.app.rdir.chunk_delete( volume_id, container_id, content_id, chunk_id, @@ -41,7 +42,7 @@ def _chunk_delete(self, reqid, def _chunk_push(self, reqid, volume_id, container_id, content_id, chunk_id, args): - headers = {'X-oio-req-id': reqid} + headers = {REQID_HEADER: reqid} try: return self.app.rdir.chunk_push( volume_id, container_id, content_id, chunk_id, @@ -55,7 +56,7 @@ def _service_push(self, reqid, type_, self.logger.debug( 'Indexing services of type %s is not supported', type_) return - headers = {'X-oio-req-id': reqid} + headers = {REQID_HEADER: reqid} try: return self.app.rdir.meta2_index_push( volume_id, url, cid, mtime, headers=headers) @@ -69,7 +70,7 @@ def _service_delete(self, reqid, type_, self.logger.debug( 'Indexing services of type %s is not supported', type_) return - headers = {'X-oio-req-id': reqid} + headers = {REQID_HEADER: reqid} try: return self.app.rdir.meta2_index_delete( volume_id, url, cid, headers=headers) diff --git a/oio/rdir/client.py b/oio/rdir/client.py index 3ac09b1451..8e0a229e9b 100644 --- a/oio/rdir/client.py +++ b/oio/rdir/client.py @@ -15,6 +15,7 @@ from oio.api.base import HttpApi +from oio.common.constants import REQID_HEADER from oio.common.exceptions import ClientException, NotFound, VolumeException from oio.common.exceptions import ServiceUnavailable, ServerException from oio.common.exceptions import OioNetworkException, OioException, \ @@ -339,7 +340,7 @@ def _get_rdir_addr(self, volume_id, req_id=None): return self._addr_cache[volume_id] # Not cached, try a direct lookup try: - headers = {'X-oio-req-id': req_id or request_id()} + headers = {REQID_HEADER: req_id or request_id()} resp = self.directory.list(RDIR_ACCT, 
volume_id, service_type='rdir', headers=headers) @@ -366,7 +367,7 @@ def _rdir_request(self, volume, method, action, create=False, params=None, if create: params['create'] = '1' uri = self._make_uri(action, volume, - req_id=kwargs['headers']['X-oio-req-id'], + req_id=kwargs['headers'][REQID_HEADER], service_type=service_type) try: resp, body = self._direct_request(method, uri, params=params, diff --git a/tests/functional/api/test_objectstorage.py b/tests/functional/api/test_objectstorage.py index 9d6398214b..531cb72244 100644 --- a/tests/functional/api/test_objectstorage.py +++ b/tests/functional/api/test_objectstorage.py @@ -20,8 +20,9 @@ from functools import partial from urllib3 import HTTPResponse from oio.api.object_storage import ObjectStorageApi, _sort_chunks -from oio.common.storage_functions import _sort_chunks as sort_chunks from oio.common import exceptions as exc +from oio.common.constants import REQID_HEADER +from oio.common.storage_functions import _sort_chunks as sort_chunks from oio.common.utils import cid_from_name from oio.common.fullpath import encode_fullpath from oio.common.storage_method import STORAGE_METHODS @@ -89,7 +90,7 @@ def test_container_show(self): self._create(name) # container_show on existing container res = self.api.container_show(self.account, name, - headers={'X-oio-req-id': 'Salut!'}) + headers={REQID_HEADER: 'Salut!'}) self.assertIsNot(res['properties'], None) self._delete(name) diff --git a/tests/functional/content/test_content_perfectible.py b/tests/functional/content/test_content_perfectible.py index 624926eb18..2429ebc7b6 100644 --- a/tests/functional/content/test_content_perfectible.py +++ b/tests/functional/content/test_content_perfectible.py @@ -28,6 +28,7 @@ from tests.utils import BaseTestCase from oio.api.object_storage import ObjectStorageApi +from oio.common.constants import REQID_HEADER from oio.common.utils import request_id from oio.common.json import json from oio.event.beanstalk import ResponseError @@ -111,7 +112,7 @@ def test_0_upload_ok(self): obj_name='perfect', data='whatever', policy='THREECOPIES', - headers={'X-oio-req-id': reqid}) + headers={REQID_HEADER: reqid}) # Wait on the oio-improve beanstalk tube. self.beanstalk.watch(DEFAULT_IMPROVER_TUBE) @@ -141,7 +142,7 @@ def test_upload_warn_dist(self): obj_name='perfectible', data='whatever', policy='THREECOPIES', - headers={'X-oio-req-id': reqid}) + headers={REQID_HEADER: reqid}) # Wait on the oio-improve beanstalk tube. event = self._wait_for_event() @@ -185,7 +186,7 @@ def test_upload_fallback(self): obj_name='perfectible', data='whatever', policy='THREECOPIES', - headers={'X-oio-req-id': reqid}) + headers={REQID_HEADER: reqid}) # Wait on the oio-improve beanstalk tube. 
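These test changes also illustrate the end-to-end use of the constant: pass a generated request ID at upload time, then recognize the same ID in the events produced by the platform. A sketch under the same assumptions as above (namespace, account, container and object names are placeholders):

    from oio.api.object_storage import ObjectStorageApi
    from oio.common.constants import REQID_HEADER
    from oio.common.utils import request_id

    api = ObjectStorageApi('OPENIO')
    reqid = request_id('example-')
    api.object_create('my_account', 'my_container',
                      obj_name='hello.txt', data='Hello',
                      headers={REQID_HEADER: reqid})
    # The events triggered by this upload carry the same request ID,
    # which is how these tests match events to the objects they create.
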
event = self._wait_for_event() diff --git a/tests/unit/api/test_directory.py b/tests/unit/api/test_directory.py index 98416b98fd..944d297475 100644 --- a/tests/unit/api/test_directory.py +++ b/tests/unit/api/test_directory.py @@ -1,4 +1,4 @@ -# Copyright (C) 2015-2017 OpenIO SAS, as part of OpenIO SDS +# Copyright (C) 2015-2017,2019 OpenIO SAS, as part of OpenIO SDS # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -20,7 +20,7 @@ from oio.common import exceptions from tests.unit.api import FakeDirectoryClient, FakeApiResponse -from tests.utils import random_id, random_str +from tests.utils import random_str class DirectoryTest(unittest.TestCase): @@ -29,7 +29,6 @@ def setUp(self): self.api = FakeDirectoryClient({'namespace': "NS"}, endpoint=self.endpoint) self.account = "AUTH_test" - self.headers = {"x-req-id": random_id(64)} self.uri_base = "%s/v3.0/NS" % self.endpoint self.name = random_str(64) diff --git a/tests/unit/api/test_objectstorage.py b/tests/unit/api/test_objectstorage.py index cfb114c9e9..e99d72d5cd 100644 --- a/tests/unit/api/test_objectstorage.py +++ b/tests/unit/api/test_objectstorage.py @@ -24,7 +24,8 @@ from oio.common import exceptions -from oio.common.constants import container_headers, object_headers +from oio.common.constants import container_headers, object_headers, \ + REQID_HEADER from oio.common.decorators import handle_container_not_found, \ handle_object_not_found from oio.common.storage_functions import _sort_chunks @@ -49,7 +50,7 @@ def setUp(self): self.api = FakeStorageApi("NS", endpoint=self.fake_endpoint) self.account = "test" self.container = "fake" - self.headers = {"x-oio-req-id": random_str(32)} + self.headers = {REQID_HEADER: random_str(32)} self.policy = "THREECOPIES" self.uri_base = self.fake_endpoint + "/v3.0/NS" diff --git a/tests/unit/api/test_rdir.py b/tests/unit/api/test_rdir.py index 6d7c9f1544..3e6121c78d 100644 --- a/tests/unit/api/test_rdir.py +++ b/tests/unit/api/test_rdir.py @@ -1,4 +1,4 @@ -# Copyright (C) 2016-2017 OpenIO SAS, as part of OpenIO SDS +# Copyright (C) 2016-2019 OpenIO SAS, as part of OpenIO SDS # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public diff --git a/tests/utils.py b/tests/utils.py index e317482864..2d7f9287f4 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -30,6 +30,7 @@ import testtools from oio.common.configuration import load_namespace_conf, set_namespace_options +from oio.common.constants import REQID_HEADER from oio.common.http_urllib3 import get_pool_manager from oio.common.json import json as jsonlib from oio.common.green import time @@ -97,7 +98,7 @@ def get_config(defaults=None): class CommonTestCase(testtools.TestCase): - TEST_HEADERS = {'X-oio-req-id': '7E571D0000000000'} + TEST_HEADERS = {REQID_HEADER: '7E571D0000000000'} def is_running_on_public_ci(self): from os import getenv From 35b5ffae6c8c039da0ba1088d5604199adb8d7e5 Mon Sep 17 00:00:00 2001 From: "Florent Vennetier (OpenIO)" Date: Mon, 18 Mar 2019 14:45:29 +0100 Subject: [PATCH 7/8] CLI: hide not-implemented feature, fix chunk attr parsing --- oio/blob/client.py | 39 +++++++++++++++------- oio/cli/admin/item.py | 13 ++++---- oio/common/constants.py | 26 ++++++++++----- oio/common/fullpath.py | 8 ++++- oio/crawler/integrity.py | 43 ++++++++++++++++++------- tests/functional/blob/test_mover.py | 34 +++++++++---------- tests/functional/blob/test_rebuilder.py | 2 +- 7 files changed, 109 
insertions(+), 56 deletions(-) diff --git a/oio/blob/client.py b/oio/blob/client.py index ae88577cf7..c973493e2d 100644 --- a/oio/blob/client.py +++ b/oio/blob/client.py @@ -23,7 +23,7 @@ from oio.common.http_urllib3 import get_pool_manager, \ oio_exception_from_httperror, urllib3 from oio.common import exceptions as exc, utils -from oio.common.constants import CHUNK_HEADERS, chunk_xattr_keys_optional, \ +from oio.common.constants import CHUNK_HEADERS, CHUNK_XATTR_KEYS_OPTIONAL, \ FETCHXATTR_HEADER, OIO_VERSION, REQID_HEADER from oio.common.decorators import ensure_headers, ensure_request_id from oio.api.io import ChunkReader @@ -36,19 +36,28 @@ PARALLEL_CHUNKS_DELETE = 3 -def extract_headers_meta(headers): +def extract_headers_meta(headers, check=True): + """ + Extract chunk metadata from a dictionary of rawx response headers. + + :param headers: a dictionary of headers, as returned by a HEAD or GET + request to a rawx service. + :keyword check: if True (the default), raise FaultyChunk if one or + several mandatory response headers are missing. + :returns: a dictionary of chunk metadata. + """ meta = {} missing = list() - for mkey, hkey in CHUNK_HEADERS.iteritems(): + for mkey, hkey in CHUNK_HEADERS.items(): try: if mkey == 'full_path': meta[mkey] = headers[hkey] else: meta[mkey] = unquote(headers[hkey]) except KeyError: - if mkey not in chunk_xattr_keys_optional: + if check and mkey not in CHUNK_XATTR_KEYS_OPTIONAL: missing.append(exc.MissingAttribute(mkey)) - if missing: + if check and missing: raise exc.FaultyChunk(*missing) return meta @@ -141,13 +150,19 @@ def __delete_chunk(chunk_): @update_rawx_perfdata @ensure_headers @ensure_request_id - def chunk_get(self, url, **kwargs): + def chunk_get(self, url, check_headers=True, **kwargs): + """ + :keyword check_headers: when True (the default), raise FaultyChunk + if a mandatory response header is missing. + :returns: a tuple with a dictionary of chunk metadata and a stream + to the chunk's data. + """ url = self.resolve_url(url) reader = ChunkReader([{'url': url}], READ_BUFFER_SIZE, **kwargs) # This must be done now if we want to access headers stream = reader.stream() - headers = extract_headers_meta(reader.headers) + headers = extract_headers_meta(reader.headers, check=check_headers) return headers, stream @update_rawx_perfdata @@ -184,17 +199,19 @@ def chunk_copy(self, from_url, to_url, chunk_id=None, fullpath=None, cid=None, path=None, version=None, content_id=None, **kwargs): stream = None + # Check source headers only when new fullpath is not provided + kwargs['check_headers'] = not bool(fullpath) try: meta, stream = self.chunk_get(from_url, **kwargs) meta['oio_version'] = OIO_VERSION meta['chunk_id'] = chunk_id or to_url.split('/')[-1] meta['full_path'] = fullpath or meta['full_path'] - meta['container_id'] = cid or meta['container_id'] - meta['content_path'] = path or meta['content_path'] + meta['container_id'] = cid or meta.get('container_id') + meta['content_path'] = path or meta.get('content_path') # FIXME: the original keys are the good ones. # ReplicatedMetachunkWriter should be modified to accept them. 
- meta['version'] = version or meta['content_version'] - meta['id'] = content_id or meta['content_id'] + meta['version'] = version or meta.get('content_version') + meta['id'] = content_id or meta.get('content_id') meta['chunk_method'] = meta['content_chunkmethod'] meta['policy'] = meta['content_policy'] copy_meta = self.chunk_put(to_url, meta, stream, **kwargs) diff --git a/oio/cli/admin/item.py b/oio/cli/admin/item.py index 11b027dceb..b95154190b 100644 --- a/oio/cli/admin/item.py +++ b/oio/cli/admin/item.py @@ -44,12 +44,13 @@ def patch_parser(self, parser): default=1, help="Number of attempts for listing requests (default: 1)." ) - parser.add_argument( - '--checksum', - action='store_true', - help=("Perform checksum comparisons. This requires downloading " - "data from rawx services.") - ) + # TODO(FVE): implement chunk checksums + # parser.add_argument( + # '--checksum', + # action='store_true', + # help=("Perform checksum comparisons. This requires downloading " + # "data from rawx services.") + # ) parser.add_argument( '--depth', type=int, diff --git a/oio/common/constants.py b/oio/common/constants.py index cdeed62422..4e317e26d6 100644 --- a/oio/common/constants.py +++ b/oio/common/constants.py @@ -52,13 +52,12 @@ str(OIO_DB_DISABLED): "Disabled", } -# TODO(FVE): rename constants to upper case -container_headers = { +CONTAINER_HEADERS = { "size": "%ssys-m2-usage" % CONTAINER_METADATA_PREFIX, "ns": "%ssys-ns" % CONTAINER_METADATA_PREFIX } -object_headers = { +OBJECT_HEADERS = { "name": "%sname" % OBJECT_METADATA_PREFIX, "id": "%sid" % OBJECT_METADATA_PREFIX, "policy": "%spolicy" % OBJECT_METADATA_PREFIX, @@ -87,7 +86,7 @@ "oio_version": "%soio-version" % CHUNK_METADATA_PREFIX, } -chunk_xattr_keys = { +CHUNK_XATTR_KEYS = { 'chunk_hash': 'grid.chunk.hash', 'chunk_id': 'grid.chunk.id', 'chunk_pos': 'grid.chunk.position', @@ -105,21 +104,32 @@ CHUNK_XATTR_CONTENT_FULLPATH_PREFIX = 'oio.content.fullpath:' -chunk_xattr_keys_optional = { +CHUNK_XATTR_KEYS_OPTIONAL = { 'content_chunksnb': True, 'chunk_hash': True, 'chunk_size': True, 'metachunk_size': True, 'metachunk_hash': True, 'oio_version': True, - 'full_path': True} - + # Superseded by full_path + 'container_id': True, + 'content_id': True, + 'content_path': True, + 'content_version': True, +} -volume_xattr_keys = { +VOLUME_XATTR_KEYS = { 'namespace': 'server.ns', 'type': 'server.type', 'id': 'server.id'} +# TODO(FVE): remove from versions 5.1+ +chunk_xattr_keys = CHUNK_XATTR_KEYS +chunk_xattr_keys_optional = CHUNK_XATTR_KEYS_OPTIONAL +container_headers = CONTAINER_HEADERS +object_headers = OBJECT_HEADERS +volume_xattr_keys = VOLUME_XATTR_KEYS + # Suffix of chunk file names that have been declared corrupt CHUNK_SUFFIX_CORRUPT = '.corrupt' # Suffix of chunk file names that are not finished being uploaded diff --git a/oio/common/fullpath.py b/oio/common/fullpath.py index 130af6cd0f..6ee277a9de 100644 --- a/oio/common/fullpath.py +++ b/oio/common/fullpath.py @@ -28,9 +28,15 @@ def encode_fullpath(account, container, path, version, content_id): def decode_fullpath(fullpath): + """ + Decode a "fullpath" string, extract its 5 parts. + + :raises: ValueError if the string has invalid format. + :returns: account, container, path, version and content ID. 
+ """ fp = fullpath.split('/') if len(fp) != 5: - raise ValueError("fullpath: Wrong format") + raise ValueError("fullpath: invalid format") decoded = list() for part in fp: decoded.append(unquote(part)) diff --git a/oio/crawler/integrity.py b/oio/crawler/integrity.py index 90b878e7f1..20d2fc9194 100644 --- a/oio/crawler/integrity.py +++ b/oio/crawler/integrity.py @@ -28,7 +28,7 @@ import argparse from oio.common import exceptions as exc -from oio.common.fullpath import decode_fullpath, decode_old_fullpath +from oio.common.fullpath import decode_fullpath from oio.common.logger import get_logger from oio.common.storage_method import STORAGE_METHODS from oio.api.object_storage import ObjectStorageApi @@ -175,22 +175,41 @@ def __init__(self, namespace, concurrency=50, def complete_target_from_chunk_metadata(self, target, xattr_meta): """ Complete a Target object from metadata found in chunk's extended - attributes. + attributes. In case the "fullpath" is not available, try to read + legacy metadata, and maybe ask meta1 to resolve the CID into + account and container names. """ # pylint: disable=unbalanced-tuple-unpacking try: acct, ct, path, vers, content_id = \ decode_fullpath(xattr_meta['full_path']) - except ValueError: - acct, ct, path, vers = \ - decode_old_fullpath(xattr_meta['full_path']) - content_id = None - # TODO(FVE): load old-style metadata - target.account = acct - target.container = ct - target.obj = path - target.content_id = content_id - target.version = vers + target.account = acct + target.container = ct + target.obj = path + target.content_id = content_id + target.version = vers + except KeyError: + # No fullpath header, try legacy headers + if 'content_path' in xattr_meta: + target.obj = xattr_meta['content_path'] + if 'content_id' in xattr_meta: + target.content_id = xattr_meta['content_id'] + if 'content_version' in xattr_meta: + target.version = xattr_meta['content_version'] + cid = xattr_meta.get('container_id') + if cid: + try: + md = self.api.directory.show(cid=cid) + acct = md.get('account') + ct = md.get('name') + if acct: + target.account = acct + if ct: + target.container = ct + except Exception as err: + self.logger.warn("Failed to resolve CID %s into account " + "and container names: %s", + cid, err) def send_result(self, target, errors=None): """ diff --git a/tests/functional/blob/test_mover.py b/tests/functional/blob/test_mover.py index 74d83df0c4..af7b39cd81 100644 --- a/tests/functional/blob/test_mover.py +++ b/tests/functional/blob/test_mover.py @@ -66,22 +66,22 @@ def _chunk_path(self, chunk): return volume + '/' + chunk_id[:3] + '/' + chunk_id def test_move_old_chunk(self): - for c in self.chunks: + for chunk in self.chunks: convert_to_old_chunk( - self._chunk_path(c), self.account, self.container, self.path, - self.version, self.content_id) + self._chunk_path(chunk), self.account, self.container, + self.path, self.version, self.content_id) - chunk = random.choice(self.chunks) - chunk_volume = chunk['url'].split('/')[2] - chunk_id = chunk['url'].split('/')[3] + orig_chunk = random.choice(self.chunks) + chunk_volume = orig_chunk['url'].split('/')[2] + chunk_id = orig_chunk['url'].split('/')[3] chunk_headers, chunk_stream = self.blob_client.chunk_get( - chunk['url']) + orig_chunk['url'], check_headers=False) chunks_kept = list(self.chunks) - chunks_kept.remove(chunk) + chunks_kept.remove(orig_chunk) mover = BlobMoverWorker(self.conf, None, self.rawx_volumes[chunk_volume]) - mover.chunk_move(self._chunk_path(chunk), chunk_id) + 
mover.chunk_move(self._chunk_path(orig_chunk), chunk_id) _, new_chunks = self.api.object_locate( self.account, self.container, self.path) @@ -90,16 +90,16 @@ def test_move_old_chunk(self): self.assertEqual(len(new_chunks), len(chunks_kept) + 1) url_kept = [c['url'] for c in chunks_kept] new_chunk = None - for c in new_chunks: - if c['url'] not in url_kept: + for chunk in new_chunks: + if chunk['url'] not in url_kept: self.assertIsNone(new_chunk) - new_chunk = c + new_chunk = chunk - self.assertNotEqual(chunk['real_url'], new_chunk['real_url']) - self.assertNotEqual(chunk['url'], new_chunk['url']) - self.assertEqual(chunk['pos'], new_chunk['pos']) - self.assertEqual(chunk['size'], new_chunk['size']) - self.assertEqual(chunk['hash'], new_chunk['hash']) + self.assertNotEqual(orig_chunk['real_url'], new_chunk['real_url']) + self.assertNotEqual(orig_chunk['url'], new_chunk['url']) + self.assertEqual(orig_chunk['pos'], new_chunk['pos']) + self.assertEqual(orig_chunk['size'], new_chunk['size']) + self.assertEqual(orig_chunk['hash'], new_chunk['hash']) new_chunk_headers, new_chunk_stream = self.blob_client.chunk_get( new_chunk['url']) diff --git a/tests/functional/blob/test_rebuilder.py b/tests/functional/blob/test_rebuilder.py index e629f93d4f..e9ad3be675 100644 --- a/tests/functional/blob/test_rebuilder.py +++ b/tests/functional/blob/test_rebuilder.py @@ -78,7 +78,7 @@ def test_rebuild_old_chunk(self): chunk_volume = chunk['url'].split('/')[2] chunk_id = chunk['url'].split('/')[3] chunk_headers, chunk_stream = self.blob_client.chunk_get( - chunk['url']) + chunk['url'], check_headers=False) os.remove(self._chunk_path(chunk)) chunks_kept = list(self.chunks) chunks_kept.remove(chunk) From 0a5a86bbb3f1d79f87b2ad35f9874c510fe8c082 Mon Sep 17 00:00:00 2001 From: "Florent Vennetier (OpenIO)" Date: Mon, 18 Mar 2019 16:36:28 +0100 Subject: [PATCH 8/8] CLI: implement --depth parameter for check commands --- oio/cli/admin/item.py | 26 +++++++++++++++----------- oio/crawler/integrity.py | 32 ++++++++++++++++---------------- 2 files changed, 31 insertions(+), 27 deletions(-) diff --git a/oio/cli/admin/item.py b/oio/cli/admin/item.py index b95154190b..dd1bcbc5b1 100644 --- a/oio/cli/admin/item.py +++ b/oio/cli/admin/item.py @@ -52,12 +52,13 @@ def patch_parser(self, parser): # "data from rawx services.") # ) parser.add_argument( - '--depth', + '--depth', '--max-depth', type=int, - default=0, + default=5, help=("How deep to recurse. 0 means do not recurse. " - "N > 0 means recurse N levels below the specified item type," - " from namespace to chunk.") + "N > 0 means recurse N levels below the specified item type " + "(namespace -> account -> container -> object -> chunk, " + "default: 5).") ) parser.add_argument( '--concurrency', '--workers', type=int, @@ -111,13 +112,14 @@ def take_action(self, parsed_args): parsed_args.accounts = [self.app.options.account] for acct in parsed_args.accounts: target = Target(acct) - checker.check(target) + checker.check(target, parsed_args.depth) return self.format_results(checker) class ContainerCheck(CheckCommandMixin, lister.Lister): """ - Check a container for problems. + Check a container for problems. Quick checks on the account owning + the container will also be performed. 
""" base_level = 3 @@ -138,13 +140,14 @@ def take_action(self, parsed_args): checker = self.build_checker(parsed_args) for ct in parsed_args.containers: target = Target(self.app.options.account, ct) - checker.check(target) + checker.check(target, parsed_args.depth) return self.format_results(checker) class ObjectCheck(CheckCommandMixin, lister.Lister): """ - Check an object for problems. + Check an object for problems. Quick checks on the account and the container + owning the object will also be performed. """ base_level = 2 @@ -177,13 +180,14 @@ def take_action(self, parsed_args): for obj in parsed_args.objects: target = Target(self.app.options.account, parsed_args.container, obj, version=parsed_args.object_version) - checker.check(target) + checker.check(target, parsed_args.depth) return self.format_results(checker) class ChunkCheck(CheckCommandMixin, lister.Lister): """ - Check a chunk for problems. + Check a chunk for problems. Quick checks on the account, the container + and the object owning the chunk will also be performed. """ base_level = 0 @@ -204,5 +208,5 @@ def take_action(self, parsed_args): checker = self.build_checker(parsed_args) for chunk in parsed_args.chunks: target = Target(self.app.options.account, chunk=chunk) - checker.check(target) + checker.check(target, parsed_args.depth) return self.format_results(checker) diff --git a/oio/crawler/integrity.py b/oio/crawler/integrity.py index 20d2fc9194..32b4d9eaca 100644 --- a/oio/crawler/integrity.py +++ b/oio/crawler/integrity.py @@ -335,7 +335,7 @@ def check_chunk(self, target): return errors def _check_metachunk(self, target, stg_met, pos, chunks, - recurse=False): + recurse=0): """ Check that a metachunk has the right number of chunks. @@ -356,7 +356,7 @@ def _check_metachunk(self, target, stg_met, pos, chunks, for _ in range(missing_chunks): errors.append("Missing chunk at position %d" % pos) - if recurse: + if recurse > 0: for chunk in chunks: tcopy = target.copy() tcopy.chunk = chunk['url'] @@ -378,7 +378,7 @@ def _check_metachunk(self, target, stg_met, pos, chunks, # be reported as object errors. return errors - def _check_obj_policy(self, target, obj_meta, chunks, recurse=False): + def _check_obj_policy(self, target, obj_meta, chunks, recurse=0): """ Check that the list of chunks of an object matches the object's storage policy. @@ -401,7 +401,7 @@ def _check_obj_policy(self, target, obj_meta, chunks, recurse=False): errors.append("Check failed: pos %d: %s" % (pos, err)) return errors - def check_obj_versions(self, target, versions, recurse=False): + def check_obj_versions(self, target, versions, recurse=0): """ Run checks of all versions of the targeted object in parallel. """ @@ -446,7 +446,7 @@ def _load_obj_meta(self, target, errors): errors.append('Check failed: %s' % (err, )) return None, [] - def check_obj(self, target, recurse=False): + def check_obj(self, target, recurse=0): """ Check one object version. If no version is specified, all versions of the object will be checked. 
@@ -479,7 +479,7 @@ def check_obj(self, target, recurse=False): # Now return the cached result of the most recent version target.content_id = versions[0]['id'] target.version = str(versions[0]['version']) - res = self.check_obj(target, recurse=False) + res = self.check_obj(target, recurse=0) self.running[(account, container, obj, vers)].send(True) del self.running[(account, container, obj, vers)] return res @@ -514,7 +514,7 @@ def check_obj(self, target, recurse=False): self.send_result(target, errors) return chunk_listing, meta - def check_container(self, target, recurse=False): + def check_container(self, target, recurse=0): account = target.account container = target.container @@ -582,18 +582,18 @@ def check_container(self, target, recurse=False): self.running[(account, container)].send(True) del self.running[(account, container)] - if recurse: + if recurse > 0: for obj_vers in container_listing.values(): for obj in obj_vers: tcopy = target.copy_object() tcopy.obj = obj['name'] tcopy.content_id = obj['id'] tcopy.version = str(obj['version']) - self.pool.spawn_n(self.check_obj, tcopy, True) + self.pool.spawn_n(self.check_obj, tcopy, recurse - 1) self.send_result(target, errors) return container_listing, ct_meta - def check_account(self, target, recurse=False): + def check_account(self, target, recurse=0): account = target.account if account in self.running: @@ -640,24 +640,24 @@ def check_account(self, target, recurse=False): self.running[account].send(True) del self.running[account] - if recurse: + if recurse > 0: for container in containers: tcopy = target.copy_account() tcopy.container = container - self.pool.spawn_n(self.check_container, tcopy, True) + self.pool.spawn_n(self.check_container, tcopy, recurse - 1) self.send_result(target, errors) return containers - def check(self, target): + def check(self, target, recurse=0): if target.type == 'chunk': self.pool.spawn_n(self.check_chunk, target) elif target.type == 'object': - self.pool.spawn_n(self.check_obj, target, True) + self.pool.spawn_n(self.check_obj, target, recurse) elif target.type == 'container': - self.pool.spawn_n(self.check_container, target, True) + self.pool.spawn_n(self.check_container, target, recurse) else: - self.pool.spawn_n(self.check_account, target, True) + self.pool.spawn_n(self.check_account, target, recurse) def fetch_results(self): while not self.result_queue.empty():
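
The dispatch in check() means one entry point covers every item type: the attributes set on the Target decide which coroutine is spawned, and fetch_results()/wait() drain the shared result queue. A final sketch checking a single chunk by URL (the URL is a made-up example):

    from oio.crawler.integrity import Checker, Target

    checker = Checker('OPENIO')
    target = Target('my_account',
                    chunk='http://127.0.0.1:6200/0123456789ABCDEF')
    checker.check(target)  # target.type is 'chunk': check_chunk() runs
    for res in checker.wait():
        # The same four columns as the CLI check commands:
        # type, item, status, errors.
        print('%s %s %s %s' % (res.target.type, repr(res.target),
                               res.health, res.errors_to_str()))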