Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

CLI: implement 'openio-admin <item_type> check' commands #1697

Merged
merged 8 commits into from
Mar 19, 2019
4 changes: 2 additions & 2 deletions oio/account/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from functools import wraps

from oio.account.backend import AccountBackend
from oio.common.constants import STRLEN_REQID
from oio.common.constants import REQID_HEADER, STRLEN_REQID
from oio.common.json import json
from oio.common.logger import get_logger
from oio.common.wsgi import WerkzeugApp
Expand All @@ -35,7 +35,7 @@ def _access_log_wrapper(self, req, *args, **kwargs):
pre = time()
rc = func(self, req, *args, **kwargs)
post = time()
reqid = req.headers.get('X-oio-req-id', '-')[:STRLEN_REQID]
reqid = req.headers.get(REQID_HEADER, '-')[:STRLEN_REQID]
# func time size user reqid
self.logger.info("%s %0.6f %s %s %s",
func.__name__, post - pre, '-', '-', reqid)
Expand Down
14 changes: 7 additions & 7 deletions oio/api/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (C) 2015-2018 OpenIO SAS, as part of OpenIO SDS
# Copyright (C) 2015-2019 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
Expand All @@ -23,7 +23,7 @@
from oio.common.utils import deadline_to_timeout
from oio.common.constants import ADMIN_HEADER, \
TIMEOUT_HEADER, PERFDATA_HEADER, FORCEMASTER_HEADER, \
CONNECTION_TIMEOUT, READ_TIMEOUT, STRLEN_REQID
CONNECTION_TIMEOUT, READ_TIMEOUT, REQID_HEADER, STRLEN_REQID

_POOL_MANAGER_OPTIONS_KEYS = ["pool_connections", "pool_maxsize",
"max_retries", "backoff_factor"]
Expand Down Expand Up @@ -146,11 +146,11 @@ def _direct_request(self, method, url, headers=None, data=None, json=None,

# Look for a request ID
if 'req_id' in kwargs:
out_headers['X-oio-req-id'] = str(kwargs['req_id'])
out_headers[REQID_HEADER] = str(kwargs['req_id'])

if len(out_headers.get('X-oio-req-id', '')) > STRLEN_REQID:
out_headers['X-oio-req-id'] = \
out_headers['X-oio-req-id'][:STRLEN_REQID]
if len(out_headers.get(REQID_HEADER, '')) > STRLEN_REQID:
out_headers[REQID_HEADER] = \
out_headers[REQID_HEADER][:STRLEN_REQID]
self.__logger().warn('Request ID truncated to %d characters',
STRLEN_REQID)

Expand Down Expand Up @@ -200,7 +200,7 @@ def _direct_request(self, method, url, headers=None, data=None, json=None,
perfdata[kv[0]] = pdat
except urllib3.exceptions.HTTPError as exc:
oio_exception_from_httperror(exc,
reqid=out_headers.get('X-oio-req-id'),
reqid=out_headers.get(REQID_HEADER),
url=url)
if resp.status >= 400:
raise exceptions.from_response(resp, body)
Expand Down
7 changes: 4 additions & 3 deletions oio/api/ec.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (C) 2015-2018 OpenIO SAS, as part of OpenIO SDS
# Copyright (C) 2015-2019 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
Expand All @@ -23,6 +23,7 @@
from socket import error as SocketError
from greenlet import GreenletExit
from oio.common import exceptions
from oio.common.constants import REQID_HEADER
from oio.common.exceptions import SourceReadError
from oio.common.http import HeadersDict, parse_content_range, \
ranges_from_http_header, headers_from_object_metadata
Expand Down Expand Up @@ -561,7 +562,7 @@ def connect(cls, chunk, sysmeta, reqid=None,
chunk_path = parsed.path.split('/')[-1]
hdrs = headers_from_object_metadata(sysmeta)
if reqid:
hdrs['X-oio-req-id'] = reqid
hdrs[REQID_HEADER] = reqid

hdrs[CHUNK_HEADERS["chunk_pos"]] = chunk["pos"]
hdrs[CHUNK_HEADERS["chunk_id"]] = chunk_path
Expand Down Expand Up @@ -920,7 +921,7 @@ def stream(self):
handler = EcMetachunkWriter(
self.sysmeta, meta_chunk,
global_checksum, self.storage_method,
reqid=self.headers.get('X-oio-req-id'),
reqid=self.headers.get(REQID_HEADER),
connection_timeout=self.connection_timeout,
write_timeout=self.write_timeout,
read_timeout=self.read_timeout,
Expand Down
3 changes: 2 additions & 1 deletion oio/api/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from urlparse import urlparse
from socket import error as SocketError
from oio.common import exceptions as exc
from oio.common.constants import REQID_HEADER
from oio.common.http import parse_content_type,\
parse_content_range, ranges_from_http_header, http_header_from_ranges
from oio.common.http_eventlet import http_connect
Expand Down Expand Up @@ -229,7 +230,7 @@ def reqid(self):
""":returns: the request ID or None"""
if not self.request_headers:
return None
return self.request_headers.get('X-oio-req-id')
return self.request_headers.get(REQID_HEADER)

def recover(self, nb_bytes):
"""
Expand Down
53 changes: 39 additions & 14 deletions oio/blob/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
from oio.common.http_urllib3 import get_pool_manager, \
oio_exception_from_httperror, urllib3
from oio.common import exceptions as exc, utils
from oio.common.constants import CHUNK_HEADERS, chunk_xattr_keys_optional, \
HEADER_PREFIX, OIO_VERSION
from oio.common.constants import CHUNK_HEADERS, CHUNK_XATTR_KEYS_OPTIONAL, \
FETCHXATTR_HEADER, OIO_VERSION, REQID_HEADER
from oio.common.decorators import ensure_headers, ensure_request_id
from oio.api.io import ChunkReader
from oio.api.replication import ReplicatedMetachunkWriter, FakeChecksum
Expand All @@ -36,19 +36,28 @@
PARALLEL_CHUNKS_DELETE = 3


def extract_headers_meta(headers):
def extract_headers_meta(headers, check=True):
"""
Extract chunk metadata from a dictionary of rawx response headers.

:param headers: a dictionary of headers, as returned by a HEAD or GET
request to a rawx service.
:keyword check: if True (the default), raise FaultyChunk if one or
several mandatory response headers are missing.
:returns: a dictionary of chunk metadata.
"""
meta = {}
missing = list()
for mkey, hkey in CHUNK_HEADERS.iteritems():
for mkey, hkey in CHUNK_HEADERS.items():
try:
if mkey == 'full_path':
meta[mkey] = headers[hkey]
else:
meta[mkey] = unquote(headers[hkey])
except KeyError:
if mkey not in chunk_xattr_keys_optional:
if check and mkey not in CHUNK_XATTR_KEYS_OPTIONAL:
missing.append(exc.MissingAttribute(mkey))
if missing:
if check and missing:
raise exc.FaultyChunk(*missing)
return meta

Expand Down Expand Up @@ -141,27 +150,41 @@ def __delete_chunk(chunk_):
@update_rawx_perfdata
@ensure_headers
@ensure_request_id
def chunk_get(self, url, **kwargs):
def chunk_get(self, url, check_headers=True, **kwargs):
"""
:keyword check_headers: when True (the default), raise FaultyChunk
if a mandatory response header is missing.
:returns: a tuple with a dictionary of chunk metadata and a stream
to the chunk's data.
"""
url = self.resolve_url(url)
reader = ChunkReader([{'url': url}], READ_BUFFER_SIZE,
**kwargs)
# This must be done now if we want to access headers
stream = reader.stream()
headers = extract_headers_meta(reader.headers)
headers = extract_headers_meta(reader.headers, check=check_headers)
return headers, stream

@update_rawx_perfdata
@ensure_request_id
def chunk_head(self, url, **kwargs):
"""
Perform a HEAD request on a chunk.

:param url: URL of the chunk to request.
:keyword xattr: when False, ask the rawx not to read
extended attributes of the chunk.
:returns: a `dict` with chunk metadata (empty when xattr is False).
"""
_xattr = bool(kwargs.get('xattr', True))
url = self.resolve_url(url)
headers = kwargs['headers'].copy()
headers[HEADER_PREFIX + 'xattr'] = _xattr
headers[FETCHXATTR_HEADER] = _xattr
try:
resp = self.http_pool.request(
'HEAD', url, headers=headers)
except urllib3.exceptions.HTTPError as ex:
oio_exception_from_httperror(ex, reqid=headers['X-oio-req-id'],
oio_exception_from_httperror(ex, reqid=headers[REQID_HEADER],
url=url)
if resp.status == 200:
if not _xattr:
Expand All @@ -176,17 +199,19 @@ def chunk_copy(self, from_url, to_url, chunk_id=None, fullpath=None,
cid=None, path=None, version=None, content_id=None,
**kwargs):
stream = None
# Check source headers only when new fullpath is not provided
kwargs['check_headers'] = not bool(fullpath)
try:
meta, stream = self.chunk_get(from_url, **kwargs)
meta['oio_version'] = OIO_VERSION
meta['chunk_id'] = chunk_id or to_url.split('/')[-1]
meta['full_path'] = fullpath or meta['full_path']
meta['container_id'] = cid or meta['container_id']
meta['content_path'] = path or meta['content_path']
meta['container_id'] = cid or meta.get('container_id')
meta['content_path'] = path or meta.get('content_path')
# FIXME: the original keys are the good ones.
# ReplicatedMetachunkWriter should be modified to accept them.
meta['version'] = version or meta['content_version']
meta['id'] = content_id or meta['content_id']
meta['version'] = version or meta.get('content_version')
meta['id'] = content_id or meta.get('content_id')
meta['chunk_method'] = meta['content_chunkmethod']
meta['policy'] = meta['content_policy']
copy_meta = self.chunk_put(to_url, meta, stream, **kwargs)
Expand Down
4 changes: 2 additions & 2 deletions oio/blob/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from oio.blob.utils import check_volume, read_chunk_metadata
from oio.rdir.client import RdirClient
from oio.common.constants import REQID_HEADER
from oio.common.daemon import Daemon
from oio.common import exceptions as exc
from oio.common.constants import STRLEN_CHUNKID, CHUNK_SUFFIX_PENDING
Expand Down Expand Up @@ -181,8 +182,7 @@ def update_index(self, path, chunk_id):
raise exc.FaultyChunk(err)

data = {'mtime': int(time.time())}
# TODO(FVE): replace with the improved request_id() function
headers = {'X-oio-req-id': 'blob-indexer-' + request_id()[:-13]}
headers = {REQID_HEADER: request_id('blob-indexer-')}
self.index_client.chunk_push(self.volume_id,
meta['container_id'],
meta['content_id'],
Expand Down
Loading