From 3e4f9f16e687d41c597e71a2c53e8f1983dd5726 Mon Sep 17 00:00:00 2001 From: Aymeric Ducroquetz Date: Wed, 25 Mar 2020 21:55:07 +0100 Subject: [PATCH] Python API: Allow to cache object metadata and location --- oio/api/object_storage.py | 27 ++- oio/common/storage_functions.py | 11 +- oio/container/client.py | 69 +++++++- tests/functional/api/test_objectstorage.py | 186 ++++++++++++++++++++- 4 files changed, 281 insertions(+), 12 deletions(-) diff --git a/oio/api/object_storage.py b/oio/api/object_storage.py index 6c4e2d9876..bacecf3b0a 100644 --- a/oio/api/object_storage.py +++ b/oio/api/object_storage.py @@ -1,4 +1,4 @@ -# Copyright (C) 2015-2019 OpenIO SAS, as part of OpenIO SDS +# Copyright (C) 2015-2020 OpenIO SAS, as part of OpenIO SDS # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -41,6 +41,7 @@ from oio.common.storage_functions import _sort_chunks, fetch_stream, \ fetch_stream_ec from oio.common.fullpath import encode_fullpath +from oio.common.cache import del_metadata_cache class ObjectStorageApi(object): @@ -61,7 +62,8 @@ class ObjectStorageApi(object): EXTRA_KEYWORDS = ('chunk_checksum_algo', 'autocreate', 'chunk_buffer_min', 'chunk_buffer_max') - def __init__(self, namespace, logger=None, perfdata=None, **kwargs): + def __init__(self, namespace, logger=None, perfdata=None, cache=None, + **kwargs): """ Initialize the object storage API. @@ -89,12 +91,15 @@ def __init__(self, namespace, logger=None, perfdata=None, **kwargs): conf = {"namespace": self.namespace} self.logger = logger or get_logger(conf) self.perfdata = perfdata + self.cache = cache self._global_kwargs = {tok: float_value(tov, None) for tok, tov in kwargs.items() if tok in TIMEOUT_KEYS} self._global_kwargs['autocreate'] = True if self.perfdata is not None: self._global_kwargs['perfdata'] = self.perfdata + if self.cache is not None: + self._global_kwargs['cache'] = self.cache for key in self.__class__.EXTRA_KEYWORDS: if key in kwargs: self._global_kwargs[key] = kwargs[key] @@ -1067,6 +1072,18 @@ def object_link(self, target_account, target_container, target_obj, raise return link_meta + @staticmethod + def _cache_wrapper(account, container, obj, version, stream, **kwargs): + try: + for dat in stream: + yield dat + except exc.UnrecoverableContent: + # The cache may no longer be valid + del_metadata_cache( + account=account, reference=container, path=obj, + cid=kwargs.get('cid'), version=version, **kwargs) + raise + @staticmethod def _ttfb_wrapper(stream, req_start, download_start, perfdata): """Keep track of time-to-first-byte and time-to-last-byte""" @@ -1149,8 +1166,12 @@ def object_fetch(self, account, container, obj, version=None, ranges=None, stream = fetch_stream(chunks, ranges, storage_method, **kwargs) if perfdata is not None: - return meta, self._ttfb_wrapper( + stream = self._ttfb_wrapper( stream, req_start, download_start, perfdata) + if kwargs.get('cache'): + stream = self._cache_wrapper( + account, container, obj, version, stream, **kwargs) + return meta, stream @handle_object_not_found diff --git a/oio/common/storage_functions.py b/oio/common/storage_functions.py index 1fade8c0a6..71f5957f38 100644 --- a/oio/common/storage_functions.py +++ b/oio/common/storage_functions.py @@ -235,7 +235,16 @@ def fetch_stream_ec(chunks, ranges, storage_method, **kwargs): handler = ECChunkDownloadHandler( storage_method, chunks[pos], meta_start, meta_end, **kwargs) - stream = handler.get_stream() + try: + stream = handler.get_stream() + except exc.NotFound as err: + raise exc.UnrecoverableContent( + "Cannot download position %d: %s" % + (pos, err)) + except Exception as err: + raise exc.ServiceUnavailable( + "Error while downloading position %d: %s" % + (pos, err)) try: for part_info in stream: for dat in part_info['iter']: diff --git a/oio/container/client.py b/oio/container/client.py index f7e40b70c3..88be63bde7 100644 --- a/oio/container/client.py +++ b/oio/container/client.py @@ -1,4 +1,4 @@ -# Copyright (C) 2015-2019 OpenIO SAS, as part of OpenIO SDS +# Copyright (C) 2015-2020 OpenIO SAS, as part of OpenIO SDS # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -20,6 +20,8 @@ from oio.common.decorators import ensure_headers from oio.common.json import json from oio.common import exceptions +from oio.common.cache import get_metadata_cache, set_metadata_cache, \ + del_metadata_cache from oio.common.easy_value import boolean_value @@ -521,6 +523,10 @@ def content_create(self, account=None, reference=None, path=None, hdrs['x-oio-content-meta-mime-type'] = mime_type if chunk_method is not None: hdrs['x-oio-content-meta-chunk-method'] = chunk_method + + del_metadata_cache(account=account, reference=reference, + path=path, cid=cid, version=version, **kwargs) + resp, body = self._direct_request( 'POST', uri, data=data, params=params, headers=hdrs, **kwargs) return resp, body @@ -530,6 +536,10 @@ def content_drain(self, account=None, reference=None, path=None, cid=None, uri = self._make_uri('content/drain') params = self._make_params(account, reference, path, cid=cid, version=version) + + del_metadata_cache(account=account, reference=reference, + path=path, cid=cid, version=version, **kwargs) + resp, _ = self._direct_request('POST', uri, params=params, **kwargs) return resp.status == 204 @@ -543,6 +553,10 @@ def content_delete(self, account=None, reference=None, path=None, cid=None, uri = self._make_uri('content/delete') params = self._make_params(account, reference, path, cid=cid, version=version) + + del_metadata_cache(account=account, reference=reference, + path=path, cid=cid, version=version, **kwargs) + resp, _ = self._direct_request('POST', uri, params=params, **kwargs) return resp.status == 204 @@ -564,6 +578,11 @@ def content_delete_many(self, account=None, reference=None, paths=None, unformatted_data.append({'name': obj}) data = json.dumps({"contents": unformatted_data}) results = list() + + for path in paths: + del_metadata_cache(account=account, reference=reference, + path=path, cid=cid, **kwargs) + try: _, resp_body = self._direct_request( 'POST', uri, data=data, params=params, **kwargs) @@ -610,6 +629,12 @@ def content_locate(self, account=None, reference=None, path=None, cid=None, :returns: a tuple with content metadata `dict` as first element and chunk `list` as second element """ + content_meta, chunks = get_metadata_cache( + account=account, reference=reference, path=path, + cid=cid, version=version, properties=properties, **kwargs) + if content_meta is not None and chunks is not None: + return content_meta, chunks + uri = self._make_uri('content/locate') params['properties'] = properties try: @@ -629,6 +654,12 @@ def content_locate(self, account=None, reference=None, path=None, cid=None, version=version, **kwargs) else: raise + + set_metadata_cache( + content_meta, chunks, + account=account, reference=reference, path=path, + cid=cid, version=version, properties=properties, **kwargs) + return content_meta, chunks @extract_reference_params @@ -667,12 +698,24 @@ def content_get_properties( """ Get a description of the content along with its user properties. """ + obj_meta, _ = get_metadata_cache( + account=account, reference=reference, path=path, + cid=cid, version=version, properties=True, **kwargs) + if obj_meta is not None: + return obj_meta + uri = self._make_uri('content/get_properties') data = json.dumps(properties) if properties else None resp, body = self._direct_request( 'POST', uri, data=data, params=params, **kwargs) obj_meta = extract_content_headers_meta(resp.headers) obj_meta.update(body) + + set_metadata_cache( + obj_meta, None, + account=account, reference=reference, path=path, + cid=cid, version=version, properties=True, **kwargs) + return obj_meta def content_set_properties(self, account=None, reference=None, path=None, @@ -689,6 +732,10 @@ def content_set_properties(self, account=None, reference=None, path=None, if clear: params['flush'] = 1 data = json.dumps(properties) + + del_metadata_cache(account=account, reference=reference, + path=path, cid=cid, version=version, **kwargs) + _resp, _body = self._direct_request( 'POST', uri, data=data, params=params, **kwargs) @@ -706,6 +753,10 @@ def content_del_properties(self, account=None, reference=None, path=None, params = self._make_params(account, reference, path, cid=cid, version=version) data = json.dumps(properties) + + del_metadata_cache(account=account, reference=reference, + path=path, cid=cid, version=version, **kwargs) + resp, _body = self._direct_request( 'POST', uri, data=data, params=params, **kwargs) return resp.status == 204 @@ -713,7 +764,8 @@ def content_del_properties(self, account=None, reference=None, path=None, def content_touch(self, account=None, reference=None, path=None, cid=None, version=None, **kwargs): uri = self._make_uri('content/touch') - params = self._make_params(account, reference, path, version=version) + params = self._make_params(account, reference, path, + cid=cid, version=version) self._direct_request('POST', uri, params=params, **kwargs) @extract_reference_params @@ -728,10 +780,15 @@ def content_spare(self, account=None, reference=None, path=None, data=None, return body def content_truncate(self, account=None, reference=None, path=None, - cid=None, size=0, **kwargs): + cid=None, version=None, size=0, **kwargs): uri = self._make_uri('content/truncate') - params = self._make_params(account, reference, path, cid=cid) + params = self._make_params(account, reference, path, + cid=cid, version=version) params['size'] = size + + del_metadata_cache(account=account, reference=reference, + path=path, cid=cid, version=version, **kwargs) + _resp, body = self._direct_request( 'POST', uri, params=params, **kwargs) return body @@ -742,4 +799,8 @@ def content_purge(self, account=None, reference=None, path=None, cid=None, params = self._make_params(account, reference, path, cid=cid) if maxvers is not None: params["maxvers"] = maxvers + + del_metadata_cache(account=account, reference=reference, + path=path, cid=cid, **kwargs) + self._direct_request('POST', uri, params=params, **kwargs) diff --git a/tests/functional/api/test_objectstorage.py b/tests/functional/api/test_objectstorage.py index 5efea80d50..d506690f11 100644 --- a/tests/functional/api/test_objectstorage.py +++ b/tests/functional/api/test_objectstorage.py @@ -1,4 +1,4 @@ -# Copyright (C) 2016-2019 OpenIO SAS, as part of OpenIO SDS +# Copyright (C) 2016-2020 OpenIO SAS, as part of OpenIO SDS # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -14,6 +14,7 @@ # License along with this library. +import copy import logging import time from mock import MagicMock as Mock @@ -1371,13 +1372,15 @@ def test_container_snapshot(self): _, chunks_copies = self.api.object_locate(self.account, snapshot, test_object % i) - for chunk, copy in zip(sorted(chunks), sorted(chunks_copies)): + for chunk, chunk_copy in zip(sorted(chunks), + sorted(chunks_copies)): # check that every chunk is different from the target - self.assertNotEqual(chunk['url'], copy['url']) + self.assertNotEqual(chunk['url'], chunk_copy['url']) # check the metadata meta = self.api._blob_client.chunk_head(chunk['url']) - meta_copies = self.api._blob_client.chunk_head(copy['url']) + meta_copies = self.api._blob_client.chunk_head( + chunk_copy['url']) fullpath = encode_fullpath( self.account, snapshot, test_object % i, @@ -1821,3 +1824,178 @@ def test_object_create_ext(self): props = self.api.object_get_properties(self.account, name, name) self.assertEqual(metadata['version'], props['version']) self.assertEqual(int(props['length']), size) + + +class TestObjectStorageApiUsingCache(ObjectStorageApiTestBase): + + def setUp(self): + super(TestObjectStorageApiUsingCache, self).setUp() + self.cache = dict() + self.api = ObjectStorageApi(self.ns, endpoint=self.uri, + cache=self.cache) + + self.container = random_str(16) + self.path = random_str(16) + self.api.object_create(self.account, self.container, + obj_name=self.path, data='cache') + self.created.append((self.container, self.path)) + self.assertEqual(0, len(self.cache)) + + self.api.container._direct_request = Mock( + side_effect=self.api.container._direct_request) + + def tearDown(self): + super(TestObjectStorageApiUsingCache, self).tearDown() + self.assertEqual(0, len(self.cache)) + + def test_object_properties(self): + expected_obj_meta = self.api.object_get_properties( + self.account, self.container, self.path) + self.assertIsNotNone(expected_obj_meta) + expected_obj_meta = expected_obj_meta.copy() + self.assertEqual(1, self.api.container._direct_request.call_count) + self.assertEqual(1, len(self.cache)) + + obj_meta = self.api.object_get_properties( + self.account, self.container, self.path) + self.assertDictEqual(expected_obj_meta, obj_meta) + self.assertEqual(1, self.api.container._direct_request.call_count) + self.assertEqual(1, len(self.cache)) + + properties = {'test1': '1', 'test2': '2'} + self.api.object_set_properties( + self.account, self.container, self.path, properties) + self.assertEqual(2, self.api.container._direct_request.call_count) + self.assertEqual(0, len(self.cache)) + + expected_obj_meta['properties'] = properties + obj_meta = self.api.object_get_properties( + self.account, self.container, self.path) + self.assertDictEqual(expected_obj_meta, obj_meta) + self.assertEqual(3, self.api.container._direct_request.call_count) + self.assertEqual(1, len(self.cache)) + + obj_meta = self.api.object_get_properties( + self.account, self.container, self.path) + self.assertDictEqual(expected_obj_meta, obj_meta) + self.assertEqual(3, self.api.container._direct_request.call_count) + self.assertEqual(1, len(self.cache)) + + self.api.object_del_properties( + self.account, self.container, self.path, properties.keys()) + self.assertEqual(4, self.api.container._direct_request.call_count) + self.assertEqual(0, len(self.cache)) + + expected_obj_meta['properties'] = dict() + obj_meta = self.api.object_get_properties( + self.account, self.container, self.path) + self.assertDictEqual(expected_obj_meta, obj_meta) + self.assertEqual(5, self.api.container._direct_request.call_count) + self.assertEqual(1, len(self.cache)) + + obj_meta = self.api.object_get_properties( + self.account, self.container, self.path) + self.assertDictEqual(expected_obj_meta, obj_meta) + self.assertEqual(5, self.api.container._direct_request.call_count) + self.assertEqual(1, len(self.cache)) + + def test_object_locate(self): + properties = {'test1': '1', 'test2': '2'} + self.api.object_set_properties( + self.account, self.container, self.path, properties) + self.assertEqual(1, self.api.container._direct_request.call_count) + self.assertEqual(0, len(self.cache)) + + expected_obj_meta, expected_chunks = self.api.object_locate( + self.account, self.container, self.path, properties=False) + self.assertIsNotNone(expected_obj_meta) + self.assertIsNotNone(expected_chunks) + expected_obj_meta = expected_obj_meta.copy() + expected_chunks = copy.deepcopy(expected_chunks) + self.assertEqual(2, self.api.container._direct_request.call_count) + self.assertEqual(1, len(self.cache)) + + obj_meta, chunks = self.api.object_locate( + self.account, self.container, self.path, properties=False) + self.assertDictEqual(expected_obj_meta, obj_meta) + self.assertListEqual(expected_chunks, chunks) + self.assertEqual(2, self.api.container._direct_request.call_count) + self.assertEqual(1, len(self.cache)) + + expected_obj_meta['properties'] = properties + obj_meta, chunks = self.api.object_locate( + self.account, self.container, self.path, properties=True) + self.assertDictEqual(expected_obj_meta, obj_meta) + self.assertListEqual(expected_chunks, chunks) + self.assertEqual(3, self.api.container._direct_request.call_count) + self.assertEqual(1, len(self.cache)) + + obj_meta, chunks = self.api.object_locate( + self.account, self.container, self.path, properties=True) + self.assertDictEqual(expected_obj_meta, obj_meta) + self.assertListEqual(expected_chunks, chunks) + self.assertEqual(3, self.api.container._direct_request.call_count) + self.assertEqual(1, len(self.cache)) + + expected_obj_meta['properties'] = dict() + obj_meta, chunks = self.api.object_locate( + self.account, self.container, self.path, properties=False) + self.assertDictEqual(expected_obj_meta, obj_meta) + self.assertListEqual(expected_chunks, chunks) + self.assertEqual(3, self.api.container._direct_request.call_count) + self.assertEqual(1, len(self.cache)) + + expected_obj_meta['properties'] = properties + obj_meta = self.api.object_get_properties( + self.account, self.container, self.path) + self.assertDictEqual(expected_obj_meta, obj_meta) + self.assertEqual(3, self.api.container._direct_request.call_count) + self.assertEqual(1, len(self.cache)) + + def test_object_fetch(self): + expected_obj_meta, stream = self.api.object_fetch( + self.account, self.container, self.path) + self.assertIsNotNone(expected_obj_meta) + data = '' + for chunk in stream: + data += chunk + self.assertEqual('cache', data) + self.assertEqual(1, self.api.container._direct_request.call_count) + self.assertEqual(1, len(self.cache)) + + obj_meta, stream = self.api.object_fetch( + self.account, self.container, self.path) + self.assertDictEqual(expected_obj_meta, obj_meta) + data = '' + for chunk in stream: + data += chunk + self.assertEqual('cache', data) + self.assertEqual(1, self.api.container._direct_request.call_count) + self.assertEqual(1, len(self.cache)) + + # Make the cache invalid + self.api.object_delete( + self.account, self.container, self.path, cache=None) + self.assertEqual(2, self.api.container._direct_request.call_count) + self.assertEqual(1, len(self.cache)) + self.wait_for_event( + 'oio-preserved', types=[EventTypes.CONTENT_DELETED]) + + obj_meta, stream = self.api.object_fetch( + self.account, self.container, self.path) + self.assertDictEqual(expected_obj_meta, obj_meta) + try: + data = '' + for chunk in stream: + data += chunk + self.fail('This should not happen with the deleted chunks') + except exc.UnrecoverableContent: + pass + self.assertEqual(2, self.api.container._direct_request.call_count) + self.assertEqual(0, len(self.cache)) + + self.assertRaises( + exc.NoSuchObject, self.api.object_fetch, + self.account, self.container, self.path) + self.assertEqual(3, self.api.container._direct_request.call_count) + self.assertEqual(0, len(self.cache))