diff --git a/oio/api/object_storage.py b/oio/api/object_storage.py index 6c4e2d9876..b1af20994d 100644 --- a/oio/api/object_storage.py +++ b/oio/api/object_storage.py @@ -1,4 +1,4 @@ -# Copyright (C) 2015-2019 OpenIO SAS, as part of OpenIO SDS +# Copyright (C) 2015-2020 OpenIO SAS, as part of OpenIO SDS # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -41,6 +41,7 @@ from oio.common.storage_functions import _sort_chunks, fetch_stream, \ fetch_stream_ec from oio.common.fullpath import encode_fullpath +from oio.common.cache import del_cached_metadata class ObjectStorageApi(object): @@ -61,7 +62,8 @@ class ObjectStorageApi(object): EXTRA_KEYWORDS = ('chunk_checksum_algo', 'autocreate', 'chunk_buffer_min', 'chunk_buffer_max') - def __init__(self, namespace, logger=None, perfdata=None, **kwargs): + def __init__(self, namespace, logger=None, perfdata=None, cache=None, + **kwargs): """ Initialize the object storage API. @@ -89,12 +91,15 @@ def __init__(self, namespace, logger=None, perfdata=None, **kwargs): conf = {"namespace": self.namespace} self.logger = logger or get_logger(conf) self.perfdata = perfdata + self.cache = cache self._global_kwargs = {tok: float_value(tov, None) for tok, tov in kwargs.items() if tok in TIMEOUT_KEYS} self._global_kwargs['autocreate'] = True if self.perfdata is not None: self._global_kwargs['perfdata'] = self.perfdata + if self.cache is not None: + self._global_kwargs['cache'] = self.cache for key in self.__class__.EXTRA_KEYWORDS: if key in kwargs: self._global_kwargs[key] = kwargs[key] @@ -1067,6 +1072,18 @@ def object_link(self, target_account, target_container, target_obj, raise return link_meta + @staticmethod + def _cache_wrapper(account, container, obj, version, stream, **kwargs): + try: + for dat in stream: + yield dat + except exc.UnrecoverableContent: + # The cache may no longer be valid + del_cached_metadata( + account=account, reference=container, path=obj, + cid=kwargs.get('cid'), version=version, **kwargs) + raise + @staticmethod def _ttfb_wrapper(stream, req_start, download_start, perfdata): """Keep track of time-to-first-byte and time-to-last-byte""" @@ -1149,8 +1166,12 @@ def object_fetch(self, account, container, obj, version=None, ranges=None, stream = fetch_stream(chunks, ranges, storage_method, **kwargs) if perfdata is not None: - return meta, self._ttfb_wrapper( + stream = self._ttfb_wrapper( stream, req_start, download_start, perfdata) + if kwargs.get('cache'): + stream = self._cache_wrapper( + account, container, obj, version, stream, **kwargs) + return meta, stream @handle_object_not_found diff --git a/oio/common/cache.py b/oio/common/cache.py new file mode 100644 index 0000000000..c00696b368 --- /dev/null +++ b/oio/common/cache.py @@ -0,0 +1,94 @@ +# Copyright (C) 2020 OpenIO SAS, as part of OpenIO SDS +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 3.0 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. + +from oio.common.utils import cid_from_name + + +def _get_metadata_cache_key(account=None, reference=None, path=None, cid=None): + if not path: + raise ValueError('Missing object name to use the cache') + cid = cid or cid_from_name(account, reference) + cid = cid.upper() + return '/'.join(("meta", cid, path)) + + +def get_cached_metadata(account=None, reference=None, path=None, cid=None, + version=None, properties=False, cache=None, **kwargs): + """ + Get the object metadata and location from the cache (if there is one) + """ + if cache is None or version: + # Cache isn't compatible with versionning + return None, None + + cache_key = _get_metadata_cache_key( + account=account, reference=reference, path=path, cid=cid) + cache_value = cache.get(cache_key) + if cache_value is None: + return None, None + + content_meta = cache_value.get('meta') + if content_meta is None: + return None, None + if properties: + content_properties = cache_value.get('properties') + if content_properties is None: + return None, None + content_meta = content_meta.copy() + content_meta['properties'] = content_properties + content_chunks = cache_value.get('chunks') + return content_meta, content_chunks + + +def set_cached_metadata(content_meta, content_chunks, + account=None, reference=None, path=None, cid=None, + version=None, properties=False, cache=None, **kwargs): + """ + Set the object metadata and location in the cache (if there is one) + """ + if cache is None or version: + # Cache isn't compatible with versionning + return + + if content_meta is None: + return + cache_value = dict() + content_meta = content_meta.copy() + if properties: + cache_value['properties'] = content_meta['properties'] + content_meta['properties'] = dict() + cache_value['meta'] = content_meta + if content_chunks is not None: + cache_value['chunks'] = content_chunks + + cache_key = _get_metadata_cache_key( + account=account, reference=reference, path=path, cid=cid) + cache[cache_key] = cache_value + + +def del_cached_metadata(account=None, reference=None, path=None, cid=None, + version=None, cache=None, **kwargs): + """ + Delete the object metadata and location from the cache (if there is one) + """ + if cache is None: + return + + cache_key = _get_metadata_cache_key( + account=account, reference=reference, path=path, cid=cid) + try: + del cache[cache_key] + except KeyError: + pass diff --git a/oio/common/storage_functions.py b/oio/common/storage_functions.py index 1fade8c0a6..71f5957f38 100644 --- a/oio/common/storage_functions.py +++ b/oio/common/storage_functions.py @@ -235,7 +235,16 @@ def fetch_stream_ec(chunks, ranges, storage_method, **kwargs): handler = ECChunkDownloadHandler( storage_method, chunks[pos], meta_start, meta_end, **kwargs) - stream = handler.get_stream() + try: + stream = handler.get_stream() + except exc.NotFound as err: + raise exc.UnrecoverableContent( + "Cannot download position %d: %s" % + (pos, err)) + except Exception as err: + raise exc.ServiceUnavailable( + "Error while downloading position %d: %s" % + (pos, err)) try: for part_info in stream: for dat in part_info['iter']: diff --git a/oio/container/client.py b/oio/container/client.py index f7e40b70c3..1d8bcd7b0e 100644 --- a/oio/container/client.py +++ b/oio/container/client.py @@ -1,4 +1,4 @@ -# Copyright (C) 2015-2019 OpenIO SAS, as part of OpenIO SDS +# Copyright (C) 2015-2020 OpenIO SAS, as part of OpenIO SDS # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -20,6 +20,8 @@ from oio.common.decorators import ensure_headers from oio.common.json import json from oio.common import exceptions +from oio.common.cache import get_cached_metadata, set_cached_metadata, \ + del_cached_metadata from oio.common.easy_value import boolean_value @@ -521,6 +523,10 @@ def content_create(self, account=None, reference=None, path=None, hdrs['x-oio-content-meta-mime-type'] = mime_type if chunk_method is not None: hdrs['x-oio-content-meta-chunk-method'] = chunk_method + + del_cached_metadata(account=account, reference=reference, + path=path, cid=cid, version=version, **kwargs) + resp, body = self._direct_request( 'POST', uri, data=data, params=params, headers=hdrs, **kwargs) return resp, body @@ -530,6 +536,10 @@ def content_drain(self, account=None, reference=None, path=None, cid=None, uri = self._make_uri('content/drain') params = self._make_params(account, reference, path, cid=cid, version=version) + + del_cached_metadata(account=account, reference=reference, + path=path, cid=cid, version=version, **kwargs) + resp, _ = self._direct_request('POST', uri, params=params, **kwargs) return resp.status == 204 @@ -543,6 +553,10 @@ def content_delete(self, account=None, reference=None, path=None, cid=None, uri = self._make_uri('content/delete') params = self._make_params(account, reference, path, cid=cid, version=version) + + del_cached_metadata(account=account, reference=reference, + path=path, cid=cid, version=version, **kwargs) + resp, _ = self._direct_request('POST', uri, params=params, **kwargs) return resp.status == 204 @@ -564,6 +578,11 @@ def content_delete_many(self, account=None, reference=None, paths=None, unformatted_data.append({'name': obj}) data = json.dumps({"contents": unformatted_data}) results = list() + + for path in paths: + del_cached_metadata(account=account, reference=reference, + path=path, cid=cid, **kwargs) + try: _, resp_body = self._direct_request( 'POST', uri, data=data, params=params, **kwargs) @@ -610,6 +629,12 @@ def content_locate(self, account=None, reference=None, path=None, cid=None, :returns: a tuple with content metadata `dict` as first element and chunk `list` as second element """ + content_meta, chunks = get_cached_metadata( + account=account, reference=reference, path=path, + cid=cid, version=version, properties=properties, **kwargs) + if content_meta is not None and chunks is not None: + return content_meta, chunks + uri = self._make_uri('content/locate') params['properties'] = properties try: @@ -629,6 +654,12 @@ def content_locate(self, account=None, reference=None, path=None, cid=None, version=version, **kwargs) else: raise + + set_cached_metadata( + content_meta, chunks, + account=account, reference=reference, path=path, + cid=cid, version=version, properties=properties, **kwargs) + return content_meta, chunks @extract_reference_params @@ -667,12 +698,24 @@ def content_get_properties( """ Get a description of the content along with its user properties. """ + obj_meta, _ = get_cached_metadata( + account=account, reference=reference, path=path, + cid=cid, version=version, properties=True, **kwargs) + if obj_meta is not None: + return obj_meta + uri = self._make_uri('content/get_properties') data = json.dumps(properties) if properties else None resp, body = self._direct_request( 'POST', uri, data=data, params=params, **kwargs) obj_meta = extract_content_headers_meta(resp.headers) obj_meta.update(body) + + set_cached_metadata( + obj_meta, None, + account=account, reference=reference, path=path, + cid=cid, version=version, properties=True, **kwargs) + return obj_meta def content_set_properties(self, account=None, reference=None, path=None, @@ -689,6 +732,10 @@ def content_set_properties(self, account=None, reference=None, path=None, if clear: params['flush'] = 1 data = json.dumps(properties) + + del_cached_metadata(account=account, reference=reference, + path=path, cid=cid, version=version, **kwargs) + _resp, _body = self._direct_request( 'POST', uri, data=data, params=params, **kwargs) @@ -706,6 +753,10 @@ def content_del_properties(self, account=None, reference=None, path=None, params = self._make_params(account, reference, path, cid=cid, version=version) data = json.dumps(properties) + + del_cached_metadata(account=account, reference=reference, + path=path, cid=cid, version=version, **kwargs) + resp, _body = self._direct_request( 'POST', uri, data=data, params=params, **kwargs) return resp.status == 204 @@ -713,7 +764,8 @@ def content_del_properties(self, account=None, reference=None, path=None, def content_touch(self, account=None, reference=None, path=None, cid=None, version=None, **kwargs): uri = self._make_uri('content/touch') - params = self._make_params(account, reference, path, version=version) + params = self._make_params(account, reference, path, + cid=cid, version=version) self._direct_request('POST', uri, params=params, **kwargs) @extract_reference_params @@ -728,10 +780,15 @@ def content_spare(self, account=None, reference=None, path=None, data=None, return body def content_truncate(self, account=None, reference=None, path=None, - cid=None, size=0, **kwargs): + cid=None, version=None, size=0, **kwargs): uri = self._make_uri('content/truncate') - params = self._make_params(account, reference, path, cid=cid) + params = self._make_params(account, reference, path, + cid=cid, version=version) params['size'] = size + + del_cached_metadata(account=account, reference=reference, + path=path, cid=cid, version=version, **kwargs) + _resp, body = self._direct_request( 'POST', uri, params=params, **kwargs) return body @@ -742,4 +799,8 @@ def content_purge(self, account=None, reference=None, path=None, cid=None, params = self._make_params(account, reference, path, cid=cid) if maxvers is not None: params["maxvers"] = maxvers + + del_cached_metadata(account=account, reference=reference, + path=path, cid=cid, **kwargs) + self._direct_request('POST', uri, params=params, **kwargs) diff --git a/tests/functional/api/test_objectstorage.py b/tests/functional/api/test_objectstorage.py index 5efea80d50..5f0f11ed18 100644 --- a/tests/functional/api/test_objectstorage.py +++ b/tests/functional/api/test_objectstorage.py @@ -1,4 +1,4 @@ -# Copyright (C) 2016-2019 OpenIO SAS, as part of OpenIO SDS +# Copyright (C) 2016-2020 OpenIO SAS, as part of OpenIO SDS # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -14,6 +14,7 @@ # License along with this library. +import copy import logging import time from mock import MagicMock as Mock @@ -1371,13 +1372,15 @@ def test_container_snapshot(self): _, chunks_copies = self.api.object_locate(self.account, snapshot, test_object % i) - for chunk, copy in zip(sorted(chunks), sorted(chunks_copies)): + for chunk, chunk_copy in zip(sorted(chunks), + sorted(chunks_copies)): # check that every chunk is different from the target - self.assertNotEqual(chunk['url'], copy['url']) + self.assertNotEqual(chunk['url'], chunk_copy['url']) # check the metadata meta = self.api._blob_client.chunk_head(chunk['url']) - meta_copies = self.api._blob_client.chunk_head(copy['url']) + meta_copies = self.api._blob_client.chunk_head( + chunk_copy['url']) fullpath = encode_fullpath( self.account, snapshot, test_object % i, @@ -1821,3 +1824,178 @@ def test_object_create_ext(self): props = self.api.object_get_properties(self.account, name, name) self.assertEqual(metadata['version'], props['version']) self.assertEqual(int(props['length']), size) + + +class TestObjectStorageApiUsingCache(ObjectStorageApiTestBase): + + def setUp(self): + super(TestObjectStorageApiUsingCache, self).setUp() + self.cache = dict() + self.api = ObjectStorageApi(self.ns, endpoint=self.uri, + cache=self.cache) + + self.container = random_str(8) + self.path = random_str(8) + self.api.object_create(self.account, self.container, + obj_name=self.path, data='cache') + self.created.append((self.container, self.path)) + self.assertEqual(0, len(self.cache)) + + self.api.container._direct_request = Mock( + side_effect=self.api.container._direct_request) + + def tearDown(self): + super(TestObjectStorageApiUsingCache, self).tearDown() + self.assertEqual(0, len(self.cache)) + + def test_object_properties(self): + expected_obj_meta = self.api.object_get_properties( + self.account, self.container, self.path) + self.assertIsNotNone(expected_obj_meta) + expected_obj_meta = expected_obj_meta.copy() + self.assertEqual(1, self.api.container._direct_request.call_count) + self.assertEqual(1, len(self.cache)) + + obj_meta = self.api.object_get_properties( + self.account, self.container, self.path) + self.assertDictEqual(expected_obj_meta, obj_meta) + self.assertEqual(1, self.api.container._direct_request.call_count) + self.assertEqual(1, len(self.cache)) + + properties = {'test1': '1', 'test2': '2'} + self.api.object_set_properties( + self.account, self.container, self.path, properties) + self.assertEqual(2, self.api.container._direct_request.call_count) + self.assertEqual(0, len(self.cache)) + + expected_obj_meta['properties'] = properties + obj_meta = self.api.object_get_properties( + self.account, self.container, self.path) + self.assertDictEqual(expected_obj_meta, obj_meta) + self.assertEqual(3, self.api.container._direct_request.call_count) + self.assertEqual(1, len(self.cache)) + + obj_meta = self.api.object_get_properties( + self.account, self.container, self.path) + self.assertDictEqual(expected_obj_meta, obj_meta) + self.assertEqual(3, self.api.container._direct_request.call_count) + self.assertEqual(1, len(self.cache)) + + self.api.object_del_properties( + self.account, self.container, self.path, properties.keys()) + self.assertEqual(4, self.api.container._direct_request.call_count) + self.assertEqual(0, len(self.cache)) + + expected_obj_meta['properties'] = dict() + obj_meta = self.api.object_get_properties( + self.account, self.container, self.path) + self.assertDictEqual(expected_obj_meta, obj_meta) + self.assertEqual(5, self.api.container._direct_request.call_count) + self.assertEqual(1, len(self.cache)) + + obj_meta = self.api.object_get_properties( + self.account, self.container, self.path) + self.assertDictEqual(expected_obj_meta, obj_meta) + self.assertEqual(5, self.api.container._direct_request.call_count) + self.assertEqual(1, len(self.cache)) + + def test_object_locate(self): + properties = {'test1': '1', 'test2': '2'} + self.api.object_set_properties( + self.account, self.container, self.path, properties) + self.assertEqual(1, self.api.container._direct_request.call_count) + self.assertEqual(0, len(self.cache)) + + expected_obj_meta, expected_chunks = self.api.object_locate( + self.account, self.container, self.path, properties=False) + self.assertIsNotNone(expected_obj_meta) + self.assertIsNotNone(expected_chunks) + expected_obj_meta = expected_obj_meta.copy() + expected_chunks = copy.deepcopy(expected_chunks) + self.assertEqual(2, self.api.container._direct_request.call_count) + self.assertEqual(1, len(self.cache)) + + obj_meta, chunks = self.api.object_locate( + self.account, self.container, self.path, properties=False) + self.assertDictEqual(expected_obj_meta, obj_meta) + self.assertListEqual(expected_chunks, chunks) + self.assertEqual(2, self.api.container._direct_request.call_count) + self.assertEqual(1, len(self.cache)) + + expected_obj_meta['properties'] = properties + obj_meta, chunks = self.api.object_locate( + self.account, self.container, self.path, properties=True) + self.assertDictEqual(expected_obj_meta, obj_meta) + self.assertListEqual(expected_chunks, chunks) + self.assertEqual(3, self.api.container._direct_request.call_count) + self.assertEqual(1, len(self.cache)) + + obj_meta, chunks = self.api.object_locate( + self.account, self.container, self.path, properties=True) + self.assertDictEqual(expected_obj_meta, obj_meta) + self.assertListEqual(expected_chunks, chunks) + self.assertEqual(3, self.api.container._direct_request.call_count) + self.assertEqual(1, len(self.cache)) + + expected_obj_meta['properties'] = dict() + obj_meta, chunks = self.api.object_locate( + self.account, self.container, self.path, properties=False) + self.assertDictEqual(expected_obj_meta, obj_meta) + self.assertListEqual(expected_chunks, chunks) + self.assertEqual(3, self.api.container._direct_request.call_count) + self.assertEqual(1, len(self.cache)) + + expected_obj_meta['properties'] = properties + obj_meta = self.api.object_get_properties( + self.account, self.container, self.path) + self.assertDictEqual(expected_obj_meta, obj_meta) + self.assertEqual(3, self.api.container._direct_request.call_count) + self.assertEqual(1, len(self.cache)) + + def test_object_fetch(self): + expected_obj_meta, stream = self.api.object_fetch( + self.account, self.container, self.path) + self.assertIsNotNone(expected_obj_meta) + data = b'' + for chunk in stream: + data += chunk + self.assertEqual('cache', data) + self.assertEqual(1, self.api.container._direct_request.call_count) + self.assertEqual(1, len(self.cache)) + + obj_meta, stream = self.api.object_fetch( + self.account, self.container, self.path) + self.assertDictEqual(expected_obj_meta, obj_meta) + data = b'' + for chunk in stream: + data += chunk + self.assertEqual('cache', data) + self.assertEqual(1, self.api.container._direct_request.call_count) + self.assertEqual(1, len(self.cache)) + + # Make the cache invalid + self.api.object_delete( + self.account, self.container, self.path, cache=None) + self.assertEqual(2, self.api.container._direct_request.call_count) + self.assertEqual(1, len(self.cache)) + self.wait_for_event( + 'oio-preserved', types=[EventTypes.CONTENT_DELETED]) + + obj_meta, stream = self.api.object_fetch( + self.account, self.container, self.path) + self.assertDictEqual(expected_obj_meta, obj_meta) + try: + data = b'' + for chunk in stream: + data += chunk + self.fail('This should not happen with the deleted chunks') + except exc.UnrecoverableContent: + pass + self.assertEqual(2, self.api.container._direct_request.call_count) + self.assertEqual(0, len(self.cache)) + + self.assertRaises( + exc.NoSuchObject, self.api.object_fetch, + self.account, self.container, self.path) + self.assertEqual(3, self.api.container._direct_request.call_count) + self.assertEqual(0, len(self.cache))