From 7b3306b1b6f6871662fcd5ae0d80e3e3847890a2 Mon Sep 17 00:00:00 2001 From: lr4d Date: Thu, 4 Feb 2021 11:17:33 +0100 Subject: [PATCH 01/11] add retries for ParquetSerializer.restore_dataframe Co-authored-by: Nefta Kanilmaz --- CHANGES.rst | 1 + kartothek/serialization/_parquet.py | 210 ++++++++++++++++++---------- 2 files changed, 134 insertions(+), 77 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index e4448868..adccdbfe 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -10,6 +10,7 @@ Version 3.19.0 (2021-02-XY) state after the update * Expose compression type and row group chunk size in Cube interface via optional parameter of type :class:`~kartothek.serialization.ParquetSerializer`. +* Add retries to :func:`~kartothek.serialization.parquet.ParquetSerializer.restore_dataframe` Version 3.18.0 (2021-01-25) =========================== diff --git a/kartothek/serialization/_parquet.py b/kartothek/serialization/_parquet.py index 9404c7a4..056dc574 100644 --- a/kartothek/serialization/_parquet.py +++ b/kartothek/serialization/_parquet.py @@ -6,6 +6,8 @@ import datetime +import logging +import time from typing import Iterable, Optional import numpy as np @@ -33,8 +35,12 @@ except ImportError: HAVE_BOTO = False +_logger = logging.getLogger(__name__) + EPOCH_ORDINAL = datetime.date(1970, 1, 1).toordinal() +MAX_NB_RETRIES = 6 # longest retry backoff = BACKOFF_TIME * 2**(MAX_NB_RETRIES - 2) +BACKOFF_TIME = 0.01 # 10 ms def _empty_table_from_schema(parquet_file): @@ -65,6 +71,14 @@ def _reset_dictionary_columns(table, exclude=None): return table +class ParquetReadError(IOError): + """ + Internal kartothek error while attempting to read Parquet file + """ + + pass + + class ParquetSerializer(DataFrameSerializer): _PARQUET_VERSION = "2.0" type_stable = True @@ -107,85 +121,39 @@ def restore_dataframe( categories: Optional[Iterable[str]] = None, predicates: Optional[PredicatesType] = None, date_as_object: bool = False, - ): - check_predicates(predicates) - # If we want to do columnar access we can benefit from partial reads - # otherwise full read en block is the better option. - if (not predicate_pushdown_to_io) or (columns is None and predicates is None): - with pa.BufferReader(store.get(key)) as reader: - table = pq.read_pandas(reader, columns=columns) - else: - if HAVE_BOTO and isinstance(store, BotoStore): - # Parquet and seeks on S3 currently leak connections thus - # we omit column projection to the store. - reader = pa.BufferReader(store.get(key)) - else: - reader = store.open(key) - # Buffer at least 4 MB in requests. This is chosen because the default block size of the Azure - # storage client is 4MB. - reader = BlockBuffer(reader, 4 * 1024 * 1024) + ) -> pd.DataFrame: + for nb_retry in range(MAX_NB_RETRIES): try: - parquet_file = ParquetFile(reader) - if predicates and parquet_file.metadata.num_rows > 0: - # We need to calculate different predicates for predicate - # pushdown and the later DataFrame filtering. This is required - # e.g. in the case where we have an `in` predicate as this has - # different normalized values. - columns_to_io = _columns_for_pushdown(columns, predicates) - predicates_for_pushdown = _normalize_predicates( - parquet_file, predicates, True - ) - predicates = _normalize_predicates(parquet_file, predicates, False) - tables = _read_row_groups_into_tables( - parquet_file, columns_to_io, predicates_for_pushdown - ) - - if len(tables) == 0: - table = _empty_table_from_schema(parquet_file) - else: - table = pa.concat_tables(tables) - else: - # ARROW-5139 Column projection with empty columns returns a table w/out index - if columns == []: - # Create an arrow table with expected index length. - df = ( - parquet_file.schema.to_arrow_schema() - .empty_table() - .to_pandas(date_as_object=date_as_object) - ) - index = pd.Int64Index( - pd.RangeIndex(start=0, stop=parquet_file.metadata.num_rows) - ) - df = pd.DataFrame(df, index=index) - # convert back to table to keep downstream code untouched by this patch - table = pa.Table.from_pandas(df) - else: - table = pq.read_pandas(reader, columns=columns) - finally: - reader.close() - - if columns is not None: - missing_columns = set(columns) - set(table.schema.names) - if missing_columns: - raise ValueError( - "Columns cannot be found in stored dataframe: {missing}".format( - missing=", ".join(sorted(missing_columns)) - ) + return _restore_dataframe( + store=store, + key=key, + filter_query=filter_query, + columns=columns, + predicate_pushdown_to_io=predicate_pushdown_to_io, + categories=categories, + predicates=predicates, + date_as_object=date_as_object, ) - - table = _reset_dictionary_columns(table, exclude=categories) - df = table.to_pandas(categories=categories, date_as_object=date_as_object) - df.columns = df.columns.map(ensure_unicode_string_type) - if predicates: - df = filter_df_from_predicates( - df, predicates, strict_date_types=date_as_object - ) - else: - df = filter_df(df, filter_query) - if columns is not None: - return df.reindex(columns=columns, copy=False) - else: - return df + except (AssertionError, OSError): + _logger.warning( + msg=( + f"Failed to restore dataframe, attempt {nb_retry} of {MAX_NB_RETRIES} with parameters " + f"key: {key}, filter_query: {filter_query}, columns: {columns}, " + f"predicate_pushdown_to_io: {predicate_pushdown_to_io}, categories: {categories}, " + f"predicates: {predicates}, date_as_object: {date_as_object}." + ), + exc_info=True, + ) + # we don't sleep when we're done with the last attempt + if nb_retry < (MAX_NB_RETRIES - 1): + time.sleep(BACKOFF_TIME * 2 ** nb_retry) + + raise ParquetReadError( + "Failed to restore dataframe after {MAX_NB_RETRIES} attempts. Parameters: " + f"key: {key}, filter_query: {filter_query}, columns: {columns}, " + f"predicate_pushdown_to_io: {predicate_pushdown_to_io}, categories: {categories}, " + f"predicates: {predicates}, date_as_object: {date_as_object}." + ) def store(self, store, key_prefix, df): key = "{}.parquet".format(key_prefix) @@ -207,6 +175,94 @@ def store(self, store, key_prefix, df): return key +def _restore_dataframe( + store: KeyValueStore, + key: str, + filter_query: Optional[str] = None, + columns: Optional[Iterable[str]] = None, + predicate_pushdown_to_io: bool = True, + categories: Optional[Iterable[str]] = None, + predicates: Optional[PredicatesType] = None, + date_as_object: bool = False, +) -> pd.DataFrame: + check_predicates(predicates) + # If we want to do columnar access we can benefit from partial reads + # otherwise full read en block is the better option. + if (not predicate_pushdown_to_io) or (columns is None and predicates is None): + with pa.BufferReader(store.get(key)) as reader: + table = pq.read_pandas(reader, columns=columns) + else: + if HAVE_BOTO and isinstance(store, BotoStore): + # Parquet and seeks on S3 currently leak connections thus + # we omit column projection to the store. + reader = pa.BufferReader(store.get(key)) + else: + reader = store.open(key) + # Buffer at least 4 MB in requests. This is chosen because the default block size of the Azure + # storage client is 4MB. + reader = BlockBuffer(reader, 4 * 1024 * 1024) + try: + parquet_file = ParquetFile(reader) + if predicates and parquet_file.metadata.num_rows > 0: + # We need to calculate different predicates for predicate + # pushdown and the later DataFrame filtering. This is required + # e.g. in the case where we have an `in` predicate as this has + # different normalized values. + columns_to_io = _columns_for_pushdown(columns, predicates) + predicates_for_pushdown = _normalize_predicates( + parquet_file, predicates, True + ) + predicates = _normalize_predicates(parquet_file, predicates, False) + tables = _read_row_groups_into_tables( + parquet_file, columns_to_io, predicates_for_pushdown + ) + + if len(tables) == 0: + table = _empty_table_from_schema(parquet_file) + else: + table = pa.concat_tables(tables) + else: + # ARROW-5139 Column projection with empty columns returns a table w/out index + if columns == []: + # Create an arrow table with expected index length. + df = ( + parquet_file.schema.to_arrow_schema() + .empty_table() + .to_pandas(date_as_object=date_as_object) + ) + index = pd.Int64Index( + pd.RangeIndex(start=0, stop=parquet_file.metadata.num_rows) + ) + df = pd.DataFrame(df, index=index) + # convert back to table to keep downstream code untouched by this patch + table = pa.Table.from_pandas(df) + else: + table = pq.read_pandas(reader, columns=columns) + finally: + reader.close() + + if columns is not None: + missing_columns = set(columns) - set(table.schema.names) + if missing_columns: + raise ValueError( + "Columns cannot be found in stored dataframe: {missing}".format( + missing=", ".join(sorted(missing_columns)) + ) + ) + + table = _reset_dictionary_columns(table, exclude=categories) + df = table.to_pandas(categories=categories, date_as_object=date_as_object) + df.columns = df.columns.map(ensure_unicode_string_type) + if predicates: + df = filter_df_from_predicates(df, predicates, strict_date_types=date_as_object) + else: + df = filter_df(df, filter_query) + if columns is not None: + return df.reindex(columns=columns, copy=False) + else: + return df + + def _columns_for_pushdown(columns, predicates): if columns is None: return From 863063179180997e854efc75fedc2ffcd84c9e75 Mon Sep 17 00:00:00 2001 From: Lucas Rademaker <44430780+lr4d@users.noreply.github.com> Date: Thu, 4 Feb 2021 18:16:35 +0100 Subject: [PATCH 02/11] Update kartothek/serialization/_parquet.py --- kartothek/serialization/_parquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kartothek/serialization/_parquet.py b/kartothek/serialization/_parquet.py index 056dc574..bda192e0 100644 --- a/kartothek/serialization/_parquet.py +++ b/kartothek/serialization/_parquet.py @@ -152,7 +152,7 @@ def restore_dataframe( "Failed to restore dataframe after {MAX_NB_RETRIES} attempts. Parameters: " f"key: {key}, filter_query: {filter_query}, columns: {columns}, " f"predicate_pushdown_to_io: {predicate_pushdown_to_io}, categories: {categories}, " - f"predicates: {predicates}, date_as_object: {date_as_object}." + f"date_as_object: {date_as_object}, predicates: {predicates}." ) def store(self, store, key_prefix, df): From 23ca257482db2aecefbe9a6a343576772d1196e8 Mon Sep 17 00:00:00 2001 From: Lucas Rademaker <44430780+lr4d@users.noreply.github.com> Date: Thu, 4 Feb 2021 18:16:43 +0100 Subject: [PATCH 03/11] Update kartothek/serialization/_parquet.py --- kartothek/serialization/_parquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kartothek/serialization/_parquet.py b/kartothek/serialization/_parquet.py index bda192e0..4e42bcb4 100644 --- a/kartothek/serialization/_parquet.py +++ b/kartothek/serialization/_parquet.py @@ -137,7 +137,7 @@ def restore_dataframe( except (AssertionError, OSError): _logger.warning( msg=( - f"Failed to restore dataframe, attempt {nb_retry} of {MAX_NB_RETRIES} with parameters " + f"Failed to restore dataframe, attempt {nb_retry + 1} of {MAX_NB_RETRIES} with parameters " f"key: {key}, filter_query: {filter_query}, columns: {columns}, " f"predicate_pushdown_to_io: {predicate_pushdown_to_io}, categories: {categories}, " f"predicates: {predicates}, date_as_object: {date_as_object}." From 4302e3a9cccdcd0f168580ff42071517ecd6a47e Mon Sep 17 00:00:00 2001 From: Lucas Rademaker <44430780+lr4d@users.noreply.github.com> Date: Thu, 4 Feb 2021 18:16:49 +0100 Subject: [PATCH 04/11] Update kartothek/serialization/_parquet.py --- kartothek/serialization/_parquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kartothek/serialization/_parquet.py b/kartothek/serialization/_parquet.py index 4e42bcb4..45433f67 100644 --- a/kartothek/serialization/_parquet.py +++ b/kartothek/serialization/_parquet.py @@ -149,7 +149,7 @@ def restore_dataframe( time.sleep(BACKOFF_TIME * 2 ** nb_retry) raise ParquetReadError( - "Failed to restore dataframe after {MAX_NB_RETRIES} attempts. Parameters: " + f"Failed to restore dataframe after {MAX_NB_RETRIES} attempts. Parameters: " f"key: {key}, filter_query: {filter_query}, columns: {columns}, " f"predicate_pushdown_to_io: {predicate_pushdown_to_io}, categories: {categories}, " f"date_as_object: {date_as_object}, predicates: {predicates}." From d8592b59ebd1a38969e9ec184e5325027d823c39 Mon Sep 17 00:00:00 2001 From: lr4d Date: Thu, 4 Feb 2021 18:19:37 +0100 Subject: [PATCH 05/11] make _restore_dataframe a staticmethod --- kartothek/serialization/_parquet.py | 181 ++++++++++++++-------------- 1 file changed, 92 insertions(+), 89 deletions(-) diff --git a/kartothek/serialization/_parquet.py b/kartothek/serialization/_parquet.py index 45433f67..707d4bd1 100644 --- a/kartothek/serialization/_parquet.py +++ b/kartothek/serialization/_parquet.py @@ -112,7 +112,98 @@ def __repr__(self): ) @staticmethod + def _restore_dataframe( + store: KeyValueStore, + key: str, + filter_query: Optional[str] = None, + columns: Optional[Iterable[str]] = None, + predicate_pushdown_to_io: bool = True, + categories: Optional[Iterable[str]] = None, + predicates: Optional[PredicatesType] = None, + date_as_object: bool = False, + ) -> pd.DataFrame: + check_predicates(predicates) + # If we want to do columnar access we can benefit from partial reads + # otherwise full read en block is the better option. + if (not predicate_pushdown_to_io) or (columns is None and predicates is None): + with pa.BufferReader(store.get(key)) as reader: + table = pq.read_pandas(reader, columns=columns) + else: + if HAVE_BOTO and isinstance(store, BotoStore): + # Parquet and seeks on S3 currently leak connections thus + # we omit column projection to the store. + reader = pa.BufferReader(store.get(key)) + else: + reader = store.open(key) + # Buffer at least 4 MB in requests. This is chosen because the default block size of the Azure + # storage client is 4MB. + reader = BlockBuffer(reader, 4 * 1024 * 1024) + try: + parquet_file = ParquetFile(reader) + if predicates and parquet_file.metadata.num_rows > 0: + # We need to calculate different predicates for predicate + # pushdown and the later DataFrame filtering. This is required + # e.g. in the case where we have an `in` predicate as this has + # different normalized values. + columns_to_io = _columns_for_pushdown(columns, predicates) + predicates_for_pushdown = _normalize_predicates( + parquet_file, predicates, True + ) + predicates = _normalize_predicates(parquet_file, predicates, False) + tables = _read_row_groups_into_tables( + parquet_file, columns_to_io, predicates_for_pushdown + ) + + if len(tables) == 0: + table = _empty_table_from_schema(parquet_file) + else: + table = pa.concat_tables(tables) + else: + # ARROW-5139 Column projection with empty columns returns a table w/out index + if columns == []: + # Create an arrow table with expected index length. + df = ( + parquet_file.schema.to_arrow_schema() + .empty_table() + .to_pandas(date_as_object=date_as_object) + ) + index = pd.Int64Index( + pd.RangeIndex(start=0, stop=parquet_file.metadata.num_rows) + ) + df = pd.DataFrame(df, index=index) + # convert back to table to keep downstream code untouched by this patch + table = pa.Table.from_pandas(df) + else: + table = pq.read_pandas(reader, columns=columns) + finally: + reader.close() + + if columns is not None: + missing_columns = set(columns) - set(table.schema.names) + if missing_columns: + raise ValueError( + "Columns cannot be found in stored dataframe: {missing}".format( + missing=", ".join(sorted(missing_columns)) + ) + ) + + table = _reset_dictionary_columns(table, exclude=categories) + df = table.to_pandas(categories=categories, date_as_object=date_as_object) + df.columns = df.columns.map(ensure_unicode_string_type) + if predicates: + df = filter_df_from_predicates( + df, predicates, strict_date_types=date_as_object + ) + else: + df = filter_df(df, filter_query) + if columns is not None: + return df.reindex(columns=columns, copy=False) + else: + return df + + @classmethod def restore_dataframe( + cls, store: KeyValueStore, key: str, filter_query: Optional[str] = None, @@ -124,7 +215,7 @@ def restore_dataframe( ) -> pd.DataFrame: for nb_retry in range(MAX_NB_RETRIES): try: - return _restore_dataframe( + return cls._restore_dataframe( store=store, key=key, filter_query=filter_query, @@ -175,94 +266,6 @@ def store(self, store, key_prefix, df): return key -def _restore_dataframe( - store: KeyValueStore, - key: str, - filter_query: Optional[str] = None, - columns: Optional[Iterable[str]] = None, - predicate_pushdown_to_io: bool = True, - categories: Optional[Iterable[str]] = None, - predicates: Optional[PredicatesType] = None, - date_as_object: bool = False, -) -> pd.DataFrame: - check_predicates(predicates) - # If we want to do columnar access we can benefit from partial reads - # otherwise full read en block is the better option. - if (not predicate_pushdown_to_io) or (columns is None and predicates is None): - with pa.BufferReader(store.get(key)) as reader: - table = pq.read_pandas(reader, columns=columns) - else: - if HAVE_BOTO and isinstance(store, BotoStore): - # Parquet and seeks on S3 currently leak connections thus - # we omit column projection to the store. - reader = pa.BufferReader(store.get(key)) - else: - reader = store.open(key) - # Buffer at least 4 MB in requests. This is chosen because the default block size of the Azure - # storage client is 4MB. - reader = BlockBuffer(reader, 4 * 1024 * 1024) - try: - parquet_file = ParquetFile(reader) - if predicates and parquet_file.metadata.num_rows > 0: - # We need to calculate different predicates for predicate - # pushdown and the later DataFrame filtering. This is required - # e.g. in the case where we have an `in` predicate as this has - # different normalized values. - columns_to_io = _columns_for_pushdown(columns, predicates) - predicates_for_pushdown = _normalize_predicates( - parquet_file, predicates, True - ) - predicates = _normalize_predicates(parquet_file, predicates, False) - tables = _read_row_groups_into_tables( - parquet_file, columns_to_io, predicates_for_pushdown - ) - - if len(tables) == 0: - table = _empty_table_from_schema(parquet_file) - else: - table = pa.concat_tables(tables) - else: - # ARROW-5139 Column projection with empty columns returns a table w/out index - if columns == []: - # Create an arrow table with expected index length. - df = ( - parquet_file.schema.to_arrow_schema() - .empty_table() - .to_pandas(date_as_object=date_as_object) - ) - index = pd.Int64Index( - pd.RangeIndex(start=0, stop=parquet_file.metadata.num_rows) - ) - df = pd.DataFrame(df, index=index) - # convert back to table to keep downstream code untouched by this patch - table = pa.Table.from_pandas(df) - else: - table = pq.read_pandas(reader, columns=columns) - finally: - reader.close() - - if columns is not None: - missing_columns = set(columns) - set(table.schema.names) - if missing_columns: - raise ValueError( - "Columns cannot be found in stored dataframe: {missing}".format( - missing=", ".join(sorted(missing_columns)) - ) - ) - - table = _reset_dictionary_columns(table, exclude=categories) - df = table.to_pandas(categories=categories, date_as_object=date_as_object) - df.columns = df.columns.map(ensure_unicode_string_type) - if predicates: - df = filter_df_from_predicates(df, predicates, strict_date_types=date_as_object) - else: - df = filter_df(df, filter_query) - if columns is not None: - return df.reindex(columns=columns, copy=False) - else: - return df - - def _columns_for_pushdown(columns, predicates): if columns is None: return From df566bfd09085cf215fd7efe26ca032270461685 Mon Sep 17 00:00:00 2001 From: lr4d Date: Fri, 5 Feb 2021 09:37:06 +0100 Subject: [PATCH 06/11] raise error from previous error --- kartothek/serialization/_parquet.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/kartothek/serialization/_parquet.py b/kartothek/serialization/_parquet.py index 707d4bd1..291c22bb 100644 --- a/kartothek/serialization/_parquet.py +++ b/kartothek/serialization/_parquet.py @@ -225,7 +225,8 @@ def restore_dataframe( predicates=predicates, date_as_object=date_as_object, ) - except (AssertionError, OSError): + except (AssertionError, OSError) as err: + raised_error = err _logger.warning( msg=( f"Failed to restore dataframe, attempt {nb_retry + 1} of {MAX_NB_RETRIES} with parameters " @@ -244,7 +245,7 @@ def restore_dataframe( f"key: {key}, filter_query: {filter_query}, columns: {columns}, " f"predicate_pushdown_to_io: {predicate_pushdown_to_io}, categories: {categories}, " f"date_as_object: {date_as_object}, predicates: {predicates}." - ) + ) from raised_error def store(self, store, key_prefix, df): key = "{}.parquet".format(key_prefix) From 1bd81783e5ebc173ea55e70daa07f0f23bc91c8d Mon Sep 17 00:00:00 2001 From: lr4d Date: Fri, 5 Feb 2021 15:33:07 +0100 Subject: [PATCH 07/11] add comments, make AssertionError into IOError --- kartothek/serialization/_io_buffer.py | 2 +- kartothek/serialization/_parquet.py | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/kartothek/serialization/_io_buffer.py b/kartothek/serialization/_io_buffer.py index cd6563d4..84036399 100644 --- a/kartothek/serialization/_io_buffer.py +++ b/kartothek/serialization/_io_buffer.py @@ -111,7 +111,7 @@ def _fetch_blocks(self, block, n): f"Expected raw read to return {size} bytes, but instead got {len(data)}" ) _logger.error(err) - raise AssertionError(err) + raise IOError(err) # fill blocks for i in range(n): diff --git a/kartothek/serialization/_parquet.py b/kartothek/serialization/_parquet.py index 291c22bb..6e664074 100644 --- a/kartothek/serialization/_parquet.py +++ b/kartothek/serialization/_parquet.py @@ -213,6 +213,9 @@ def restore_dataframe( predicates: Optional[PredicatesType] = None, date_as_object: bool = False, ) -> pd.DataFrame: + # XXX: We have been seeing weired `IOError` while reading Parquet files from Azure Blob Store, + # thus, we implement retries at this point. This code should not live forever, it should be removed + # once the underlying cause has been resolved for nb_retry in range(MAX_NB_RETRIES): try: return cls._restore_dataframe( @@ -225,7 +228,9 @@ def restore_dataframe( predicates=predicates, date_as_object=date_as_object, ) - except (AssertionError, OSError) as err: + # We only retry OSErrors (note that IOError inherits from OSError), as these kind of errors may benefit + # from retries. + except OSError as err: raised_error = err _logger.warning( msg=( From f045e3e7148e229dce1ee5f8939f29fdd2a3f8c4 Mon Sep 17 00:00:00 2001 From: lr4d Date: Fri, 5 Feb 2021 15:38:14 +0100 Subject: [PATCH 08/11] custom error types --- kartothek/serialization/_io_buffer.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/kartothek/serialization/_io_buffer.py b/kartothek/serialization/_io_buffer.py index 84036399..faa492e5 100644 --- a/kartothek/serialization/_io_buffer.py +++ b/kartothek/serialization/_io_buffer.py @@ -10,6 +10,14 @@ _logger = logging.getLogger(__name__) +class BufferReadError(IOError): + """ + Internal kartothek error while attempting to read from buffer + """ + + pass + + class BlockBuffer(io.BufferedIOBase): """ Block-based buffer. @@ -111,7 +119,7 @@ def _fetch_blocks(self, block, n): f"Expected raw read to return {size} bytes, but instead got {len(data)}" ) _logger.error(err) - raise IOError(err) + raise BufferReadError(err) # fill blocks for i in range(n): @@ -135,7 +143,7 @@ def _ensure_range_loaded(self, start, size): if size < 0: msg = f"Expected size >= 0, but got start={start}, size={size}" _logger.error(msg) - raise AssertionError(msg) + raise BufferReadError(msg) block = start // self._blocksize offset = start % self._blocksize From dde7cbdeac62a544403322508824c6699e41d9ed Mon Sep 17 00:00:00 2001 From: Nefta Kanilmaz Date: Wed, 10 Feb 2021 13:41:08 +0100 Subject: [PATCH 09/11] Add tests for the retry mechanism --- kartothek/serialization/_parquet.py | 7 ++-- tests/serialization/test_parquet.py | 54 +++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 3 deletions(-) diff --git a/kartothek/serialization/_parquet.py b/kartothek/serialization/_parquet.py index 6e664074..351bb469 100644 --- a/kartothek/serialization/_parquet.py +++ b/kartothek/serialization/_parquet.py @@ -213,9 +213,10 @@ def restore_dataframe( predicates: Optional[PredicatesType] = None, date_as_object: bool = False, ) -> pd.DataFrame: - # XXX: We have been seeing weired `IOError` while reading Parquet files from Azure Blob Store, - # thus, we implement retries at this point. This code should not live forever, it should be removed - # once the underlying cause has been resolved + # https://github.com/JDASoftwareGroup/kartothek/issues/407 We have been seeing weird `IOError`s while reading + # Parquet files from Azure Blob Store. These errors have caused long running computations to fail. + # The workaround is to retry the serialization here and gain more stability for long running tasks. + # This code should not live forever, it should be removed once the underlying cause has been resolved. for nb_retry in range(MAX_NB_RETRIES): try: return cls._restore_dataframe( diff --git a/tests/serialization/test_parquet.py b/tests/serialization/test_parquet.py index 9869595b..88d43567 100644 --- a/tests/serialization/test_parquet.py +++ b/tests/serialization/test_parquet.py @@ -11,6 +11,8 @@ from kartothek.serialization import DataFrameSerializer, ParquetSerializer from kartothek.serialization._parquet import ( + MAX_NB_RETRIES, + ParquetReadError, _predicate_accepts, _reset_dictionary_columns, ) @@ -459,3 +461,55 @@ def test_reset_dict_cols(store): only_a_reset = _reset_dictionary_columns(table, exclude=["colB"]).schema assert not pa.types.is_dictionary(only_a_reset.field("col").type) assert pa.types.is_dictionary(only_a_reset.field("colB").type) + + +def test_retry_on_IOError(monkeypatch, caplog, store): + """ + See https://github.com/JDASoftwareGroup/kartothek/issues/407 : + We are testing a retry-workaround for the above issue here. Once the issue is resolved, + this test and the workaround can be removed. + """ + + def patched__restore_dataframe(**kwargs): + # This kind of exception should be captured by the retry mechanism. + raise IOError() + + df = pd.DataFrame({"A": [0, 1, 2, 3]}) + monkeypatch.setattr( + ParquetSerializer, "_restore_dataframe", patched__restore_dataframe + ) + serializer = ParquetSerializer() + key = serializer.store(store, "key", df) + + with pytest.raises(ParquetReadError): + serializer.restore_dataframe(store=store, key=key) + + assert len(caplog.records) == MAX_NB_RETRIES + for log_record in caplog.records: + assert "Failed to restore dataframe" in log_record.message + + +def test_retry_fail_on_other_error(monkeypatch, caplog, store): + """ + See https://github.com/JDASoftwareGroup/kartothek/issues/407 : + We are testing a retry-workaround for the above issue here. Once the issue is resolved, + this test and the workaround can be removed. + + We only want to retry on OSErrors (and inherited exceptions) -- all other exceptions should be raised. + """ + + def patched__restore_dataframe(**kwargs): + # This should not be retried but raised immediately. + raise ValueError() + + df = pd.DataFrame({"A": [0, 1, 2, 3]}) + monkeypatch.setattr( + ParquetSerializer, "_restore_dataframe", patched__restore_dataframe + ) + serializer = ParquetSerializer() + key = serializer.store(store, "key", df) + + with pytest.raises(ValueError): + serializer.restore_dataframe(store=store, key=key) + + assert len(caplog.records) == 0 From 6ed3a524ab787c9dc53fa23fb1c767f7414d1bd2 Mon Sep 17 00:00:00 2001 From: Nefta Kanilmaz Date: Wed, 10 Feb 2021 16:23:35 +0100 Subject: [PATCH 10/11] Fix docs --- CHANGES.rst | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGES.rst b/CHANGES.rst index adccdbfe..d08b7dc6 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -10,7 +10,9 @@ Version 3.19.0 (2021-02-XY) state after the update * Expose compression type and row group chunk size in Cube interface via optional parameter of type :class:`~kartothek.serialization.ParquetSerializer`. -* Add retries to :func:`~kartothek.serialization.parquet.ParquetSerializer.restore_dataframe` +* Add retries to :func:`~kartothek.serialization._parquet.ParquetSerializer.restore_dataframe` + IOErrors on long running ktk + dask tasks have been observed. Until the root cause is fixed, + the serialization is retried to gain more stability. Version 3.18.0 (2021-01-25) =========================== From a178164b2793428b46c3d6f17b970bfce13bd587 Mon Sep 17 00:00:00 2001 From: Nefta Kanilmaz Date: Wed, 10 Feb 2021 17:45:17 +0100 Subject: [PATCH 11/11] Add test counting the retries --- tests/serialization/test_parquet.py | 34 ++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/tests/serialization/test_parquet.py b/tests/serialization/test_parquet.py index 88d43567..cad38c8d 100644 --- a/tests/serialization/test_parquet.py +++ b/tests/serialization/test_parquet.py @@ -470,6 +470,37 @@ def test_retry_on_IOError(monkeypatch, caplog, store): this test and the workaround can be removed. """ + df = pd.DataFrame({"A": [0, 1, 2, 3]}) + + retry_count = 0 + + def patched__restore_dataframe(**kwargs): + nonlocal retry_count + retry_count += 1 + + if not retry_count > 1: + # fail for the first try + raise IOError() + elif retry_count > 1: + # simulate a successful retry + return df + + monkeypatch.setattr( + ParquetSerializer, "_restore_dataframe", patched__restore_dataframe + ) + serializer = ParquetSerializer() + key = serializer.store(store, "key", df) + df_result = serializer.restore_dataframe(store=store, key=key) + pdt.assert_frame_equal(df, df_result) + + +def test_retries_on_IOError_logs(monkeypatch, caplog, store): + """ + See https://github.com/JDASoftwareGroup/kartothek/issues/407 : + We are testing a retry-workaround for the above issue here. Once the issue is resolved, + this test and the workaround can be removed. + """ + def patched__restore_dataframe(**kwargs): # This kind of exception should be captured by the retry mechanism. raise IOError() @@ -498,11 +529,12 @@ def test_retry_fail_on_other_error(monkeypatch, caplog, store): We only want to retry on OSErrors (and inherited exceptions) -- all other exceptions should be raised. """ + df = pd.DataFrame({"A": [0, 1, 2, 3]}) + def patched__restore_dataframe(**kwargs): # This should not be retried but raised immediately. raise ValueError() - df = pd.DataFrame({"A": [0, 1, 2, 3]}) monkeypatch.setattr( ParquetSerializer, "_restore_dataframe", patched__restore_dataframe )