Skip to content

Commit

Permalink
Python API: Allow to cache object metadata and location
Browse files Browse the repository at this point in the history
  • Loading branch information
AymericDu committed Mar 27, 2020
1 parent 35366c4 commit 83e7f88
Show file tree
Hide file tree
Showing 5 changed files with 375 additions and 12 deletions.
27 changes: 24 additions & 3 deletions oio/api/object_storage.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (C) 2015-2019 OpenIO SAS, as part of OpenIO SDS
# Copyright (C) 2015-2020 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
Expand Down Expand Up @@ -41,6 +41,7 @@
from oio.common.storage_functions import _sort_chunks, fetch_stream, \
fetch_stream_ec
from oio.common.fullpath import encode_fullpath
from oio.common.cache import del_cached_metadata


class ObjectStorageApi(object):
Expand All @@ -61,7 +62,8 @@ class ObjectStorageApi(object):
EXTRA_KEYWORDS = ('chunk_checksum_algo', 'autocreate',
'chunk_buffer_min', 'chunk_buffer_max')

def __init__(self, namespace, logger=None, perfdata=None, **kwargs):
def __init__(self, namespace, logger=None, perfdata=None, cache=None,
**kwargs):
"""
Initialize the object storage API.
Expand Down Expand Up @@ -89,12 +91,15 @@ def __init__(self, namespace, logger=None, perfdata=None, **kwargs):
conf = {"namespace": self.namespace}
self.logger = logger or get_logger(conf)
self.perfdata = perfdata
self.cache = cache
self._global_kwargs = {tok: float_value(tov, None)
for tok, tov in kwargs.items()
if tok in TIMEOUT_KEYS}
self._global_kwargs['autocreate'] = True
if self.perfdata is not None:
self._global_kwargs['perfdata'] = self.perfdata
if self.cache is not None:
self._global_kwargs['cache'] = self.cache
for key in self.__class__.EXTRA_KEYWORDS:
if key in kwargs:
self._global_kwargs[key] = kwargs[key]
Expand Down Expand Up @@ -1067,6 +1072,18 @@ def object_link(self, target_account, target_container, target_obj,
raise
return link_meta

@staticmethod
def _cache_wrapper(account, container, obj, version, stream, **kwargs):
    """
    Relay the data of `stream` and invalidate the cached metadata
    of the object when the content turns out to be unrecoverable.

    :param account: name of the account the object belongs to
    :param container: name of the container the object belongs to
    :param obj: name of the object being downloaded
    :param version: version of the object being downloaded
    :param stream: iterable yielding the object data
    :keyword cid: container ID, usable instead of account/container
    :keyword cache: cache holding the object metadata and location
    :raises exc.UnrecoverableContent: re-raised as is, after the cache
        entry of the object has been dropped
    """
    try:
        for dat in stream:
            yield dat
    except exc.UnrecoverableContent:
        # The cache may no longer be valid.
        # 'cid' must be removed from kwargs before forwarding them:
        # passing it both explicitly and through **kwargs raises
        # a TypeError, which would hide the original exception
        # and leave the stale entry in the cache.
        cid = kwargs.pop('cid', None)
        del_cached_metadata(
            account=account, reference=container, path=obj,
            cid=cid, version=version, **kwargs)
        raise

@staticmethod
def _ttfb_wrapper(stream, req_start, download_start, perfdata):
"""Keep track of time-to-first-byte and time-to-last-byte"""
Expand Down Expand Up @@ -1149,8 +1166,12 @@ def object_fetch(self, account, container, obj, version=None, ranges=None,
stream = fetch_stream(chunks, ranges, storage_method, **kwargs)

if perfdata is not None:
return meta, self._ttfb_wrapper(
stream = self._ttfb_wrapper(
stream, req_start, download_start, perfdata)
if kwargs.get('cache'):
stream = self._cache_wrapper(
account, container, obj, version, stream, **kwargs)

return meta, stream

@handle_object_not_found
Expand Down
94 changes: 94 additions & 0 deletions oio/common/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# Copyright (C) 2020 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.

from oio.common.utils import cid_from_name


def _get_metadata_cache_key(account=None, reference=None, path=None, cid=None):
if not path:
raise ValueError('Missing object name to use the cache')
cid = cid or cid_from_name(account, reference)
cid = cid.upper()
return '/'.join(("meta", cid, path))


def get_cached_metadata(account=None, reference=None, path=None, cid=None,
                        version=None, properties=False, cache=None, **kwargs):
    """
    Get the object metadata and location from the cache (if there is one).

    :param properties: if true, the cached properties are also required,
        and are returned under the 'properties' key of a copy
        of the cached metadata
    :param cache: mapping-like object holding the cached entries
    :returns: a tuple with the object metadata and the chunk list
        (the latter is None when no location has been cached),
        or `(None, None)` when nothing usable is cached
    """
    miss = (None, None)
    if cache is None or version:
        # The cache is not compatible with versioning
        return miss

    key = _get_metadata_cache_key(
        account=account, reference=reference, path=path, cid=cid)
    entry = cache.get(key)
    if entry is None:
        return miss

    meta = entry.get('meta')
    if meta is None:
        return miss

    if properties:
        props = entry.get('properties')
        if props is None:
            # Properties were requested but never cached
            return miss
        # Work on a copy so the cached metadata is not altered
        meta = meta.copy()
        meta['properties'] = props

    return meta, entry.get('chunks')


def set_cached_metadata(content_meta, content_chunks,
                        account=None, reference=None, path=None, cid=None,
                        version=None, properties=False, cache=None, **kwargs):
    """
    Set the object metadata and location in the cache (if there is one).

    :param content_meta: object metadata to store (nothing is stored
        when it is None); a copy is cached, the argument is left untouched
    :param content_chunks: chunk list to store, or None to store
        the metadata only
    :param properties: if true, `content_meta['properties']` is stored
        apart, and the cached copy of the metadata gets
        an empty 'properties' dictionary
    :param cache: mapping-like object holding the cached entries
    """
    if cache is None or version:
        # The cache is not compatible with versioning
        return
    if content_meta is None:
        return

    meta = content_meta.copy()
    entry = dict()
    if properties:
        # Keep the properties out of the generic metadata
        entry['properties'] = meta['properties']
        meta['properties'] = dict()
    entry['meta'] = meta
    if content_chunks is not None:
        entry['chunks'] = content_chunks

    key = _get_metadata_cache_key(
        account=account, reference=reference, path=path, cid=cid)
    cache[key] = entry


def del_cached_metadata(account=None, reference=None, path=None, cid=None,
                        version=None, cache=None, **kwargs):
    """
    Delete the object metadata and location from the cache
    (if there is one).

    The `version` parameter is accepted but not used: the entry
    of the object is dropped whatever the version is.
    A missing entry is silently ignored.
    """
    if cache is None:
        return

    key = _get_metadata_cache_key(
        account=account, reference=reference, path=path, cid=cid)
    try:
        del cache[key]
    except KeyError:
        # Nothing was cached for this object
        pass
11 changes: 10 additions & 1 deletion oio/common/storage_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,16 @@ def fetch_stream_ec(chunks, ranges, storage_method, **kwargs):
handler = ECChunkDownloadHandler(
storage_method, chunks[pos],
meta_start, meta_end, **kwargs)
stream = handler.get_stream()
try:
stream = handler.get_stream()
except exc.NotFound as err:
raise exc.UnrecoverableContent(
"Cannot download position %d: %s" %
(pos, err))
except Exception as err:
raise exc.ServiceUnavailable(
"Error while downloading position %d: %s" %
(pos, err))
try:
for part_info in stream:
for dat in part_info['iter']:
Expand Down
69 changes: 65 additions & 4 deletions oio/container/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (C) 2015-2019 OpenIO SAS, as part of OpenIO SDS
# Copyright (C) 2015-2020 OpenIO SAS, as part of OpenIO SDS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
Expand All @@ -20,6 +20,8 @@
from oio.common.decorators import ensure_headers
from oio.common.json import json
from oio.common import exceptions
from oio.common.cache import get_cached_metadata, set_cached_metadata, \
del_cached_metadata
from oio.common.easy_value import boolean_value


Expand Down Expand Up @@ -521,6 +523,10 @@ def content_create(self, account=None, reference=None, path=None,
hdrs['x-oio-content-meta-mime-type'] = mime_type
if chunk_method is not None:
hdrs['x-oio-content-meta-chunk-method'] = chunk_method

del_cached_metadata(account=account, reference=reference,
path=path, cid=cid, version=version, **kwargs)

resp, body = self._direct_request(
'POST', uri, data=data, params=params, headers=hdrs, **kwargs)
return resp, body
Expand All @@ -530,6 +536,10 @@ def content_drain(self, account=None, reference=None, path=None, cid=None,
uri = self._make_uri('content/drain')
params = self._make_params(account, reference, path, cid=cid,
version=version)

del_cached_metadata(account=account, reference=reference,
path=path, cid=cid, version=version, **kwargs)

resp, _ = self._direct_request('POST', uri, params=params, **kwargs)
return resp.status == 204

Expand All @@ -543,6 +553,10 @@ def content_delete(self, account=None, reference=None, path=None, cid=None,
uri = self._make_uri('content/delete')
params = self._make_params(account, reference, path, cid=cid,
version=version)

del_cached_metadata(account=account, reference=reference,
path=path, cid=cid, version=version, **kwargs)

resp, _ = self._direct_request('POST', uri,
params=params, **kwargs)
return resp.status == 204
Expand All @@ -564,6 +578,11 @@ def content_delete_many(self, account=None, reference=None, paths=None,
unformatted_data.append({'name': obj})
data = json.dumps({"contents": unformatted_data})
results = list()

for path in paths:
del_cached_metadata(account=account, reference=reference,
path=path, cid=cid, **kwargs)

try:
_, resp_body = self._direct_request(
'POST', uri, data=data, params=params, **kwargs)
Expand Down Expand Up @@ -610,6 +629,12 @@ def content_locate(self, account=None, reference=None, path=None, cid=None,
:returns: a tuple with content metadata `dict` as first element
and chunk `list` as second element
"""
content_meta, chunks = get_cached_metadata(
account=account, reference=reference, path=path,
cid=cid, version=version, properties=properties, **kwargs)
if content_meta is not None and chunks is not None:
return content_meta, chunks

uri = self._make_uri('content/locate')
params['properties'] = properties
try:
Expand All @@ -629,6 +654,12 @@ def content_locate(self, account=None, reference=None, path=None, cid=None,
version=version, **kwargs)
else:
raise

set_cached_metadata(
content_meta, chunks,
account=account, reference=reference, path=path,
cid=cid, version=version, properties=properties, **kwargs)

return content_meta, chunks

@extract_reference_params
Expand Down Expand Up @@ -667,12 +698,24 @@ def content_get_properties(
"""
Get a description of the content along with its user properties.
"""
obj_meta, _ = get_cached_metadata(
account=account, reference=reference, path=path,
cid=cid, version=version, properties=True, **kwargs)
if obj_meta is not None:
return obj_meta

uri = self._make_uri('content/get_properties')
data = json.dumps(properties) if properties else None
resp, body = self._direct_request(
'POST', uri, data=data, params=params, **kwargs)
obj_meta = extract_content_headers_meta(resp.headers)
obj_meta.update(body)

set_cached_metadata(
obj_meta, None,
account=account, reference=reference, path=path,
cid=cid, version=version, properties=True, **kwargs)

return obj_meta

def content_set_properties(self, account=None, reference=None, path=None,
Expand All @@ -689,6 +732,10 @@ def content_set_properties(self, account=None, reference=None, path=None,
if clear:
params['flush'] = 1
data = json.dumps(properties)

del_cached_metadata(account=account, reference=reference,
path=path, cid=cid, version=version, **kwargs)

_resp, _body = self._direct_request(
'POST', uri, data=data, params=params, **kwargs)

Expand All @@ -706,14 +753,19 @@ def content_del_properties(self, account=None, reference=None, path=None,
params = self._make_params(account, reference, path,
cid=cid, version=version)
data = json.dumps(properties)

del_cached_metadata(account=account, reference=reference,
path=path, cid=cid, version=version, **kwargs)

resp, _body = self._direct_request(
'POST', uri, data=data, params=params, **kwargs)
return resp.status == 204

def content_touch(self, account=None, reference=None, path=None, cid=None,
version=None, **kwargs):
uri = self._make_uri('content/touch')
params = self._make_params(account, reference, path, version=version)
params = self._make_params(account, reference, path,
cid=cid, version=version)
self._direct_request('POST', uri, params=params, **kwargs)

@extract_reference_params
Expand All @@ -728,10 +780,15 @@ def content_spare(self, account=None, reference=None, path=None, data=None,
return body

def content_truncate(self, account=None, reference=None, path=None,
cid=None, size=0, **kwargs):
cid=None, version=None, size=0, **kwargs):
uri = self._make_uri('content/truncate')
params = self._make_params(account, reference, path, cid=cid)
params = self._make_params(account, reference, path,
cid=cid, version=version)
params['size'] = size

del_cached_metadata(account=account, reference=reference,
path=path, cid=cid, version=version, **kwargs)

_resp, body = self._direct_request(
'POST', uri, params=params, **kwargs)
return body
Expand All @@ -742,4 +799,8 @@ def content_purge(self, account=None, reference=None, path=None, cid=None,
params = self._make_params(account, reference, path, cid=cid)
if maxvers is not None:
params["maxvers"] = maxvers

del_cached_metadata(account=account, reference=reference,
path=path, cid=cid, **kwargs)

self._direct_request('POST', uri, params=params, **kwargs)
Loading

0 comments on commit 83e7f88

Please sign in to comment.