From 098cbdedaff0679952bea6b5e0e32f11973d2221 Mon Sep 17 00:00:00 2001 From: hekaisheng Date: Wed, 27 Apr 2022 13:48:19 +0800 Subject: [PATCH 1/3] Fix potential leak for shuffle tasks --- mars/deploy/oscar/tests/test_local.py | 31 +++++++++++++++++++++++++++ mars/services/storage/api/oscar.py | 4 ++-- 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/mars/deploy/oscar/tests/test_local.py b/mars/deploy/oscar/tests/test_local.py index 23c5ccec69..2d82cd44ab 100644 --- a/mars/deploy/oscar/tests/test_local.py +++ b/mars/deploy/oscar/tests/test_local.py @@ -230,6 +230,15 @@ async def test_execute(create_cluster, config): del a, b + # remove this after fixing refcount + if not isinstance(session._isolated_session, _IsolatedWebSession): + worker_pools = session.client._cluster._worker_pools + await session.destroy() + for worker_pool in worker_pools: + _assert_storage_cleaned( + session.session_id, worker_pool.external_address, StorageLevel.MEMORY + ) + @pytest.mark.asyncio async def test_iterative_tiling(create_cluster): @@ -254,6 +263,15 @@ async def test_iterative_tiling(create_cluster): assert df2.index_value.min_val >= 1 assert df2.index_value.max_val <= 30 + # remove this after fixing refcount + if not isinstance(session._isolated_session, _IsolatedWebSession): + worker_pools = session.client._cluster._worker_pools + await session.destroy() + for worker_pool in worker_pools: + _assert_storage_cleaned( + session.session_id, worker_pool.external_address, StorageLevel.MEMORY + ) + @pytest.mark.asyncio async def test_execute_describe(create_cluster): @@ -270,6 +288,14 @@ async def test_execute_describe(create_cluster): assert info.progress() == 1 res = await session.fetch(r) pd.testing.assert_frame_equal(res, raw.describe()) + # remove this after fixing refcount + if not isinstance(session._isolated_session, _IsolatedWebSession): + worker_pools = session.client._cluster._worker_pools + await session.destroy() + for worker_pool in worker_pools: + _assert_storage_cleaned( + session.session_id, worker_pool.external_address, StorageLevel.MEMORY + ) @pytest.mark.asyncio @@ -394,6 +420,11 @@ async def test_web_session(create_cluster, config): AsyncSession.reset_default() await session.destroy() await _run_web_session_test(web_address) + worker_pools = client._cluster._worker_pools + for worker_pool in worker_pools: + _assert_storage_cleaned( + session.session_id, worker_pool.external_address, StorageLevel.MEMORY + ) def test_sync_execute(): diff --git a/mars/services/storage/api/oscar.py b/mars/services/storage/api/oscar.py index 91708f332a..32a0f59ab9 100644 --- a/mars/services/storage/api/oscar.py +++ b/mars/services/storage/api/oscar.py @@ -185,7 +185,7 @@ async def fetch( error: str raise or ignore """ - await self._storage_handler_ref.fetch_batch( + return await self._storage_handler_ref.fetch_batch( self._session_id, [data_key], level, band_name, remote_address, error ) @@ -201,7 +201,7 @@ async def batch_fetch(self, args_list, kwargs_list): assert extracted_args == (level, band_name, dest_address, error) extracted_args = (level, band_name, dest_address, error) data_keys.append(data_key) - await self._storage_handler_ref.fetch_batch( + return await self._storage_handler_ref.fetch_batch( self._session_id, data_keys, *extracted_args ) From 74c01969e66642323354e3b44efe9041c52c11af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BB=A7=E7=9B=9B?= Date: Thu, 28 Apr 2022 17:18:42 +0800 Subject: [PATCH 2/3] Fix ref counts --- mars/deploy/oscar/session.py | 1 + mars/deploy/oscar/tests/test_local.py | 25 ++++++++++++++++--- mars/services/lifecycle/supervisor/tracker.py | 5 +++- mars/services/task/execution/mars/executor.py | 6 ++--- 4 files changed, 30 insertions(+), 7 deletions(-) diff --git a/mars/deploy/oscar/session.py b/mars/deploy/oscar/session.py index 16178691c3..e72f2e690a 100644 --- a/mars/deploy/oscar/session.py +++ b/mars/deploy/oscar/session.py @@ -1236,6 +1236,7 @@ async def fetch_infos(self, *tileables, fields, **kwargs) -> list: return result async def decref(self, *tileable_keys): + logger.debug("Decref tileables on client: %s", tileable_keys) return await self._lifecycle_api.decref_tileables(list(tileable_keys)) async def _get_ref_counts(self) -> Dict[str, int]: diff --git a/mars/deploy/oscar/tests/test_local.py b/mars/deploy/oscar/tests/test_local.py index 2d82cd44ab..024b5fd565 100644 --- a/mars/deploy/oscar/tests/test_local.py +++ b/mars/deploy/oscar/tests/test_local.py @@ -230,7 +230,6 @@ async def test_execute(create_cluster, config): del a, b - # remove this after fixing refcount if not isinstance(session._isolated_session, _IsolatedWebSession): worker_pools = session.client._cluster._worker_pools await session.destroy() @@ -263,7 +262,6 @@ async def test_iterative_tiling(create_cluster): assert df2.index_value.min_val >= 1 assert df2.index_value.max_val <= 30 - # remove this after fixing refcount if not isinstance(session._isolated_session, _IsolatedWebSession): worker_pools = session.client._cluster._worker_pools await session.destroy() @@ -288,7 +286,7 @@ async def test_execute_describe(create_cluster): assert info.progress() == 1 res = await session.fetch(r) pd.testing.assert_frame_equal(res, raw.describe()) - # remove this after fixing refcount + if not isinstance(session._isolated_session, _IsolatedWebSession): worker_pools = session.client._cluster._worker_pools await session.destroy() @@ -420,6 +418,7 @@ async def test_web_session(create_cluster, config): AsyncSession.reset_default() await session.destroy() await _run_web_session_test(web_address) + worker_pools = client._cluster._worker_pools for worker_pool in worker_pools: _assert_storage_cleaned( @@ -572,6 +571,26 @@ def test_decref(setup_session): ref_counts = session._get_ref_counts() assert len(ref_counts) == 0 + with tempfile.TemporaryDirectory() as tempdir: + file_path = os.path.join(tempdir, "test.csv") + pdf = pd.DataFrame( + np.random.RandomState(0).rand(100, 10), + columns=[f"col{i}" for i in range(10)], + ) + pdf.to_csv(file_path, index=False) + + df = md.read_csv(file_path, chunk_bytes=os.stat(file_path).st_size / 5) + df2 = df.head(10) + + result = df2.execute().fetch() + expected = pdf.head(10) + pd.testing.assert_frame_equal(result, expected) + + del df, df2 + + ref_counts = session._get_ref_counts() + assert len(ref_counts) == 0 + worker_addr = session._session.client._cluster._worker_pools[0].external_address _assert_storage_cleaned(session.session_id, worker_addr, StorageLevel.MEMORY) diff --git a/mars/services/lifecycle/supervisor/tracker.py b/mars/services/lifecycle/supervisor/tracker.py index 4eb24da685..f81db888d8 100644 --- a/mars/services/lifecycle/supervisor/tracker.py +++ b/mars/services/lifecycle/supervisor/tracker.py @@ -78,7 +78,10 @@ def _check_ref_counts(cls, keys: List[str], ref_counts: List[int]): ) def incref_chunks(self, chunk_keys: List[str], counts: List[int] = None): - logger.debug("Increase reference count for chunks %s", chunk_keys) + logger.debug( + "Increase reference count for chunks %s", + {ck: self._chunk_ref_counts[ck] for ck in chunk_keys}, + ) self._check_ref_counts(chunk_keys, counts) counts = counts if counts is not None else itertools.repeat(1) for chunk_key, count in zip(chunk_keys, counts): diff --git a/mars/services/task/execution/mars/executor.py b/mars/services/task/execution/mars/executor.py index 7609a8291d..4aa15584df 100644 --- a/mars/services/task/execution/mars/executor.py +++ b/mars/services/task/execution/mars/executor.py @@ -384,9 +384,9 @@ def _get_decref_stage_chunk_key_to_counts( for inp_subtask in subtask_graph.predecessors(subtask): for c in inp_subtask.chunk_graph.results: decref_chunk_key_to_counts[c.key] += 1 - # decref result of chunk graphs - for c in stage_processor.chunk_graph.results: - decref_chunk_key_to_counts[c.key] += 1 + # decref result of chunk graphs + for c in stage_processor.chunk_graph.results: + decref_chunk_key_to_counts[c.key] += 1 return decref_chunk_key_to_counts @mo.extensible From a38d002f67887343f63262b3c1ad192c5ec7d942 Mon Sep 17 00:00:00 2001 From: hekaisheng Date: Thu, 28 Apr 2022 18:02:16 +0800 Subject: [PATCH 3/3] Fix ut --- mars/services/storage/api/oscar.py | 4 +++- mars/services/storage/transfer.py | 6 +++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/mars/services/storage/api/oscar.py b/mars/services/storage/api/oscar.py index 32a0f59ab9..c5a129ab58 100644 --- a/mars/services/storage/api/oscar.py +++ b/mars/services/storage/api/oscar.py @@ -185,9 +185,11 @@ async def fetch( error: str raise or ignore """ - return await self._storage_handler_ref.fetch_batch( + fetch_key = await self._storage_handler_ref.fetch_batch( self._session_id, [data_key], level, band_name, remote_address, error ) + if fetch_key: + return fetch_key @fetch.batch async def batch_fetch(self, args_list, kwargs_list): diff --git a/mars/services/storage/transfer.py b/mars/services/storage/transfer.py index dad3b10ac6..7e6ef6a9e3 100644 --- a/mars/services/storage/transfer.py +++ b/mars/services/storage/transfer.py @@ -224,7 +224,11 @@ async def send_batch_data( ) await self._data_manager_ref.unpin.batch(*unpin_tasks) logger.debug( - "Finish sending data (%s, %s) to %s", session_id, data_keys, address + "Finish sending data (%s, %s) to %s, total size is %s", + session_id, + data_keys, + address, + sum(data_sizes), )