Bump JamesIves/github-pages-deploy-action from 4.7.2 to 4.7.3 (#9018) #15714
71 fail, 111 skipped, 3,918 pass in 11h 13m 51s
27 files   27 suites   11h 13m 51s ⏱️
4,100 tests   3,918 ✅   111 💤   71 ❌
51,407 runs   48,304 ✅   2,297 💤   806 ❌
Results for commit fe5e431.
Annotations
Check warning on line 0 in distributed.tests.test_client
github-actions / Unit Test Results
12 out of 13 runs failed: test_future_tuple_repr (distributed.tests.test_client)
artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.13-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.13-default-ci1/pytest.xml [took 0s]
Raw output
RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:43375', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:41901', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:34477', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
@gen_cluster(client=True)
async def test_future_tuple_repr(c, s, a, b):
pytest.importorskip("numpy")
da = pytest.importorskip("dask.array")
> y = da.arange(10, chunks=(5,)).persist()
distributed/tests/test_client.py:404:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:343: in persist
(result,) = persist(self, traverse=False, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:967: in persist
schedule = get_scheduler(scheduler=scheduler, collections=collections)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1151: in get_scheduler
return get_scheduler(scheduler=config.get("scheduler", None))
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1131: in get_scheduler
return _ensure_not_async(client)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
client = <Client: No scheduler connected>
def _ensure_not_async(client):
if client.asynchronous:
if fallback := config.get("admin.async-client-fallback", None):
warnings.warn(
"Distributed Client detected but Client instance is "
f"asynchronous. Falling back to `{fallback}` scheduler. "
"To use an asynchronous Client, please use "
"``Client.compute`` and ``Client.gather`` "
"instead of the top level ``dask.compute``",
UserWarning,
)
return get_scheduler(scheduler=fallback)
else:
> raise RuntimeError(
"Attempting to use an asynchronous "
"Client in a synchronous context of `dask.compute`"
)
E RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1080: RuntimeError
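The failure mode above repeats across the persist-related tests in this run: inside the @gen_cluster(client=True) coroutine the default client is asynchronous, so the collection-level .persist() call reaches dask.base.get_scheduler and _ensure_not_async raises. Below is a minimal sketch of the async-safe pattern the error message itself points to (Client.compute and Client.gather); the scheduler address, function name, and array are placeholders, not taken from the log.

import dask.array as da
from distributed import Client

async def persist_and_gather(scheduler_address):
    # Asynchronous client, as in the tests above.
    async with Client(scheduler_address, asynchronous=True) as client:
        x = da.arange(10, chunks=(5,))
        # Calling x.persist() here would reach _ensure_not_async and raise
        # the RuntimeError shown in the traceback above.
        y = client.persist(x)  # collection backed by futures
        return await client.gather(client.compute(y.sum()))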
Check warning on line 0 in distributed.tests.test_client
github-actions / Unit Test Results
All 13 runs failed: test_get_scheduler_default_client_config_interleaving (distributed.tests.test_client)
artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.13-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.13-default-ci1/pytest.xml [took 0s]
Raw output
Failed: DID NOT WARN. No warnings of type (<class 'UserWarning'>,) were emitted.
Emitted warnings: [].
s = <Scheduler 'tcp://127.0.0.1:39101', workers: 0, cores: 0, tasks: 0>
@gen_cluster(config={"scheduler": "sync"}, nthreads=[])
async def test_get_scheduler_default_client_config_interleaving(s):
# This test is using context managers intentionally. We should not refactor
# this to use it in more places to make the client closing cleaner.
with pytest.warns(UserWarning):
assert dask.base.get_scheduler() == dask.local.get_sync
with dask.config.set(scheduler="threads"):
assert dask.base.get_scheduler() == dask.threaded.get
client = await Client(s.address, set_as_default=False, asynchronous=True)
try:
assert dask.base.get_scheduler() == dask.threaded.get
finally:
await client.close()
client = await Client(s.address, set_as_default=True, asynchronous=True)
try:
> assert dask.base.get_scheduler() == client.get
distributed/tests/test_client.py:3426:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1151: in get_scheduler
return get_scheduler(scheduler=config.get("scheduler", None))
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1131: in get_scheduler
return _ensure_not_async(client)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
client = <Client: No scheduler connected>
def _ensure_not_async(client):
if client.asynchronous:
if fallback := config.get("admin.async-client-fallback", None):
warnings.warn(
"Distributed Client detected but Client instance is "
f"asynchronous. Falling back to `{fallback}` scheduler. "
"To use an asynchronous Client, please use "
"``Client.compute`` and ``Client.gather`` "
"instead of the top level ``dask.compute``",
UserWarning,
)
return get_scheduler(scheduler=fallback)
else:
> raise RuntimeError(
"Attempting to use an asynchronous "
"Client in a synchronous context of `dask.compute`"
)
E RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1080: RuntimeError
During handling of the above exception, another exception occurred:
s = <Scheduler 'tcp://127.0.0.1:39101', workers: 0, cores: 0, tasks: 0>
@gen_cluster(config={"scheduler": "sync"}, nthreads=[])
async def test_get_scheduler_default_client_config_interleaving(s):
# This test is using context managers intentionally. We should not refactor
# this to use it in more places to make the client closing cleaner.
> with pytest.warns(UserWarning):
E Failed: DID NOT WARN. No warnings of type (<class 'UserWarning'>,) were emitted.
E Emitted warnings: [].
distributed/tests/test_client.py:3414: Failed
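In the config-interleaving test above, the expected UserWarning is the one emitted by the admin.async-client-fallback branch of _ensure_not_async shown in the traceback. The following is a minimal sketch of how that fallback path can be exercised; the config key comes from the traceback, while the "threads" value and the helper name are illustrative assumptions.

import dask
import dask.base
from distributed import Client

async def fallback_demo(scheduler_address):
    # set_as_default=True registers this asynchronous client as the one
    # dask.base.get_scheduler() will discover, mirroring the test above.
    client = await Client(scheduler_address, set_as_default=True, asynchronous=True)
    try:
        with dask.config.set({"admin.async-client-fallback": "threads"}):
            # Instead of raising the RuntimeError, _ensure_not_async emits the
            # UserWarning and falls back to the threaded scheduler.
            scheduler = dask.base.get_scheduler()
    finally:
        await client.close()
    return scheduler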
Check warning on line 0 in distributed.tests.test_client
github-actions / Unit Test Results
12 out of 13 runs failed: test_restart_workers (distributed.tests.test_client)
artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 2s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.10-no_queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 2s]
artifacts/ubuntu-latest-3.13-default-ci1/pytest.xml [took 2s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 3s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 3s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 4s]
artifacts/windows-latest-3.13-default-ci1/pytest.xml [took 4s]
Raw output
RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:36717', workers: 0, cores: 0, tasks: 0>
a = <Nanny: None, threads: 1>, b = <Nanny: None, threads: 2>
@pytest.mark.slow
@gen_cluster(client=True, Worker=Nanny, worker_kwargs={"plugins": [WorkerStartTime()]})
async def test_restart_workers(c, s, a, b):
pytest.importorskip("numpy")
da = pytest.importorskip("dask.array")
# Get initial worker start times
results = await c.run(lambda dask_worker: dask_worker.start_time)
a_start_time = results[a.worker_address]
b_start_time = results[b.worker_address]
assert set(s.workers) == {a.worker_address, b.worker_address}
# Persist futures and perform a computation
size = 100
x = da.ones(size, chunks=10)
> x = x.persist()
distributed/tests/test_client.py:4811:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:343: in persist
(result,) = persist(self, traverse=False, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:967: in persist
schedule = get_scheduler(scheduler=scheduler, collections=collections)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1151: in get_scheduler
return get_scheduler(scheduler=config.get("scheduler", None))
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1131: in get_scheduler
return _ensure_not_async(client)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
client = <Client: No scheduler connected>
def _ensure_not_async(client):
if client.asynchronous:
if fallback := config.get("admin.async-client-fallback", None):
warnings.warn(
"Distributed Client detected but Client instance is "
f"asynchronous. Falling back to `{fallback}` scheduler. "
"To use an asynchronous Client, please use "
"``Client.compute`` and ``Client.gather`` "
"instead of the top level ``dask.compute``",
UserWarning,
)
return get_scheduler(scheduler=fallback)
else:
> raise RuntimeError(
"Attempting to use an asynchronous "
"Client in a synchronous context of `dask.compute`"
)
E RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1080: RuntimeError
Check warning on line 0 in distributed.tests.test_client
github-actions / Unit Test Results
12 out of 13 runs failed: test_serialize_collections (distributed.tests.test_client)
artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.13-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.13-default-ci1/pytest.xml [took 0s]
Raw output
RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:35641', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:46165', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:35053', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
@gen_cluster(client=True)
async def test_serialize_collections(c, s, a, b):
pytest.importorskip("numpy")
da = pytest.importorskip("dask.array")
> x = da.arange(10, chunks=(5,)).persist()
distributed/tests/test_client.py:5181:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:343: in persist
(result,) = persist(self, traverse=False, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:967: in persist
schedule = get_scheduler(scheduler=scheduler, collections=collections)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1151: in get_scheduler
return get_scheduler(scheduler=config.get("scheduler", None))
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1131: in get_scheduler
return _ensure_not_async(client)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
client = <Client: No scheduler connected>
def _ensure_not_async(client):
if client.asynchronous:
if fallback := config.get("admin.async-client-fallback", None):
warnings.warn(
"Distributed Client detected but Client instance is "
f"asynchronous. Falling back to `{fallback}` scheduler. "
"To use an asynchronous Client, please use "
"``Client.compute`` and ``Client.gather`` "
"instead of the top level ``dask.compute``",
UserWarning,
)
return get_scheduler(scheduler=fallback)
else:
> raise RuntimeError(
"Attempting to use an asynchronous "
"Client in a synchronous context of `dask.compute`"
)
E RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1080: RuntimeError
Check warning on line 0 in distributed.tests.test_client
github-actions / Unit Test Results
11 out of 13 runs failed: test_serialize_collections_of_futures (distributed.tests.test_client)
artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.13-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.13-default-ci1/pytest.xml [took 0s]
Raw output
RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:40453', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:34121', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:38677', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
@gen_cluster(client=True)
async def test_serialize_collections_of_futures(c, s, a, b):
pd = pytest.importorskip("pandas")
dd = pytest.importorskip("dask.dataframe")
from dask.dataframe.utils import assert_eq
df = pd.DataFrame({"x": [1, 2, 3]})
> ddf = dd.from_pandas(df, npartitions=2).persist()
distributed/tests/test_client.py:5363:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/dataframe/dask_expr/_collection.py:458: in persist
return DaskMethodsMixin.persist(out, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:343: in persist
(result,) = persist(self, traverse=False, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:967: in persist
schedule = get_scheduler(scheduler=scheduler, collections=collections)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1151: in get_scheduler
return get_scheduler(scheduler=config.get("scheduler", None))
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1131: in get_scheduler
return _ensure_not_async(client)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
client = <Client: No scheduler connected>
def _ensure_not_async(client):
if client.asynchronous:
if fallback := config.get("admin.async-client-fallback", None):
warnings.warn(
"Distributed Client detected but Client instance is "
f"asynchronous. Falling back to `{fallback}` scheduler. "
"To use an asynchronous Client, please use "
"``Client.compute`` and ``Client.gather`` "
"instead of the top level ``dask.compute``",
UserWarning,
)
return get_scheduler(scheduler=fallback)
else:
> raise RuntimeError(
"Attempting to use an asynchronous "
"Client in a synchronous context of `dask.compute`"
)
E RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1080: RuntimeError
Check warning on line 0 in distributed.tests.test_client
github-actions / Unit Test Results
12 out of 13 runs failed: test_call_stack_collections (distributed.tests.test_client)
artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.10-no_queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.13-default-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.13-default-ci1/pytest.xml [took 1s]
Raw output
RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:33673', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:42889', name: 0, status: closed, stored: 0, running: 0/4, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:34989', name: 1, status: closed, stored: 0, running: 0/4, ready: 0, comm: 0, waiting: 0>
@gen_cluster([("127.0.0.1", 4)] * 2, client=True)
async def test_call_stack_collections(c, s, a, b):
pytest.importorskip("numpy")
da = pytest.importorskip("dask.array")
> x = da.random.random(100, chunks=(10,)).map_blocks(slowinc, delay=0.5).persist()
distributed/tests/test_client.py:5512:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:343: in persist
(result,) = persist(self, traverse=False, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:967: in persist
schedule = get_scheduler(scheduler=scheduler, collections=collections)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1151: in get_scheduler
return get_scheduler(scheduler=config.get("scheduler", None))
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1131: in get_scheduler
return _ensure_not_async(client)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
client = <Client: No scheduler connected>
def _ensure_not_async(client):
if client.asynchronous:
if fallback := config.get("admin.async-client-fallback", None):
warnings.warn(
"Distributed Client detected but Client instance is "
f"asynchronous. Falling back to `{fallback}` scheduler. "
"To use an asynchronous Client, please use "
"``Client.compute`` and ``Client.gather`` "
"instead of the top level ``dask.compute``",
UserWarning,
)
return get_scheduler(scheduler=fallback)
else:
> raise RuntimeError(
"Attempting to use an asynchronous "
"Client in a synchronous context of `dask.compute`"
)
E RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1080: RuntimeError
Check warning on line 0 in distributed.tests.test_client
github-actions / Unit Test Results
12 out of 13 runs failed: test_call_stack_collections_all (distributed.tests.test_client)
artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.10-no_queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.13-default-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.13-default-ci1/pytest.xml [took 1s]
Raw output
RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:34263', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:45533', name: 0, status: closed, stored: 0, running: 0/4, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:41561', name: 1, status: closed, stored: 0, running: 0/4, ready: 0, comm: 0, waiting: 0>
@gen_cluster([("127.0.0.1", 4)] * 2, client=True)
async def test_call_stack_collections_all(c, s, a, b):
pytest.importorskip("numpy")
da = pytest.importorskip("dask.array")
> x = da.random.random(100, chunks=(10,)).map_blocks(slowinc, delay=0.5).persist()
distributed/tests/test_client.py:5524:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:343: in persist
(result,) = persist(self, traverse=False, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:967: in persist
schedule = get_scheduler(scheduler=scheduler, collections=collections)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1151: in get_scheduler
return get_scheduler(scheduler=config.get("scheduler", None))
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1131: in get_scheduler
return _ensure_not_async(client)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
client = <Client: No scheduler connected>
def _ensure_not_async(client):
if client.asynchronous:
if fallback := config.get("admin.async-client-fallback", None):
warnings.warn(
"Distributed Client detected but Client instance is "
f"asynchronous. Falling back to `{fallback}` scheduler. "
"To use an asynchronous Client, please use "
"``Client.compute`` and ``Client.gather`` "
"instead of the top level ``dask.compute``",
UserWarning,
)
return get_scheduler(scheduler=fallback)
else:
> raise RuntimeError(
"Attempting to use an asynchronous "
"Client in a synchronous context of `dask.compute`"
)
E RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1080: RuntimeError
Check warning on line 0 in distributed.tests.test_client
github-actions / Unit Test Results
11 out of 13 runs failed: test_get_mix_futures_and_SubgraphCallable_dask_dataframe (distributed.tests.test_client)
artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.13-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.13-default-ci1/pytest.xml [took 0s]
Raw output
RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:41827', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:39971', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:42603', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
@gen_cluster(client=True)
async def test_get_mix_futures_and_SubgraphCallable_dask_dataframe(c, s, a, b):
pd = pytest.importorskip("pandas")
dd = pytest.importorskip("dask.dataframe")
df = pd.DataFrame({"x": range(1, 11)})
> ddf = dd.from_pandas(df, npartitions=2).persist()
distributed/tests/test_client.py:6182:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/dataframe/dask_expr/_collection.py:458: in persist
return DaskMethodsMixin.persist(out, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:343: in persist
(result,) = persist(self, traverse=False, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:967: in persist
schedule = get_scheduler(scheduler=scheduler, collections=collections)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1151: in get_scheduler
return get_scheduler(scheduler=config.get("scheduler", None))
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1131: in get_scheduler
return _ensure_not_async(client)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
client = <Client: No scheduler connected>
def _ensure_not_async(client):
if client.asynchronous:
if fallback := config.get("admin.async-client-fallback", None):
warnings.warn(
"Distributed Client detected but Client instance is "
f"asynchronous. Falling back to `{fallback}` scheduler. "
"To use an asynchronous Client, please use "
"``Client.compute`` and ``Client.gather`` "
"instead of the top level ``dask.compute``",
UserWarning,
)
return get_scheduler(scheduler=fallback)
else:
> raise RuntimeError(
"Attempting to use an asynchronous "
"Client in a synchronous context of `dask.compute`"
)
E RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1080: RuntimeError
Check warning on line 0 in distributed.tests.test_client
github-actions / Unit Test Results
6 out of 13 runs failed: test_file_descriptors_dont_leak[Worker] (distributed.tests.test_client)
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.13-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
Raw output
RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
Worker = <class 'distributed.worker.Worker'>
@pytest.mark.skipif(WINDOWS, reason="num_fds not supported on windows")
@pytest.mark.skipif(MACOS, reason="dask/distributed#8075")
@pytest.mark.parametrize(
"Worker", [Worker, pytest.param(Nanny, marks=[pytest.mark.slow])]
)
@gen_test()
async def test_file_descriptors_dont_leak(Worker):
pytest.importorskip("pandas")
df = dask.datasets.timeseries(freq="10s", dtypes={"x": int, "y": float})
proc = psutil.Process()
before = proc.num_fds()
async with Scheduler(dashboard_address=":0") as s:
async with (
Worker(s.address),
Worker(s.address),
Client(s.address, asynchronous=True),
):
assert proc.num_fds() > before
> await df.sum().persist()
distributed/tests/test_client.py:6245:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/dataframe/dask_expr/_collection.py:458: in persist
return DaskMethodsMixin.persist(out, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:343: in persist
(result,) = persist(self, traverse=False, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:967: in persist
schedule = get_scheduler(scheduler=scheduler, collections=collections)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1151: in get_scheduler
return get_scheduler(scheduler=config.get("scheduler", None))
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1131: in get_scheduler
return _ensure_not_async(client)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
client = <Client: No scheduler connected>
def _ensure_not_async(client):
if client.asynchronous:
if fallback := config.get("admin.async-client-fallback", None):
warnings.warn(
"Distributed Client detected but Client instance is "
f"asynchronous. Falling back to `{fallback}` scheduler. "
"To use an asynchronous Client, please use "
"``Client.compute`` and ``Client.gather`` "
"instead of the top level ``dask.compute``",
UserWarning,
)
return get_scheduler(scheduler=fallback)
else:
> raise RuntimeError(
"Attempting to use an asynchronous "
"Client in a synchronous context of `dask.compute`"
)
E RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1080: RuntimeError
Check warning on line 0 in distributed.tests.test_client
github-actions / Unit Test Results
6 out of 13 runs failed: test_file_descriptors_dont_leak[Nanny] (distributed.tests.test_client)
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 2s]
artifacts/ubuntu-latest-3.10-no_queue-ci1/pytest.xml [took 2s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 2s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.13-default-ci1/pytest.xml [took 2s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 2s]
Raw output
RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
Worker = <class 'distributed.nanny.Nanny'>
@pytest.mark.skipif(WINDOWS, reason="num_fds not supported on windows")
@pytest.mark.skipif(MACOS, reason="dask/distributed#8075")
@pytest.mark.parametrize(
"Worker", [Worker, pytest.param(Nanny, marks=[pytest.mark.slow])]
)
@gen_test()
async def test_file_descriptors_dont_leak(Worker):
pytest.importorskip("pandas")
df = dask.datasets.timeseries(freq="10s", dtypes={"x": int, "y": float})
proc = psutil.Process()
before = proc.num_fds()
async with Scheduler(dashboard_address=":0") as s:
async with (
Worker(s.address),
Worker(s.address),
Client(s.address, asynchronous=True),
):
assert proc.num_fds() > before
> await df.sum().persist()
distributed/tests/test_client.py:6245:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/dataframe/dask_expr/_collection.py:458: in persist
return DaskMethodsMixin.persist(out, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:343: in persist
(result,) = persist(self, traverse=False, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:967: in persist
schedule = get_scheduler(scheduler=scheduler, collections=collections)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1151: in get_scheduler
return get_scheduler(scheduler=config.get("scheduler", None))
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1131: in get_scheduler
return _ensure_not_async(client)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
client = <Client: No scheduler connected>
def _ensure_not_async(client):
if client.asynchronous:
if fallback := config.get("admin.async-client-fallback", None):
warnings.warn(
"Distributed Client detected but Client instance is "
f"asynchronous. Falling back to `{fallback}` scheduler. "
"To use an asynchronous Client, please use "
"``Client.compute`` and ``Client.gather`` "
"instead of the top level ``dask.compute``",
UserWarning,
)
return get_scheduler(scheduler=fallback)
else:
> raise RuntimeError(
"Attempting to use an asynchronous "
"Client in a synchronous context of `dask.compute`"
)
E RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1080: RuntimeError
Check warning on line 0 in distributed.tests.test_client
github-actions / Unit Test Results
All 13 runs failed: test_futures_of_sorted (distributed.tests.test_client)
artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.13-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.13-default-ci1/pytest.xml [took 0s]
Raw output
RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:44377', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:46463', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:45919', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
@gen_cluster(client=True)
async def test_futures_of_sorted(c, s, a, b):
> b = dask.bag.from_sequence(range(10), npartitions=5).persist()
distributed/tests/test_client.py:6343:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:343: in persist
(result,) = persist(self, traverse=False, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:967: in persist
schedule = get_scheduler(scheduler=scheduler, collections=collections)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1151: in get_scheduler
return get_scheduler(scheduler=config.get("scheduler", None))
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1131: in get_scheduler
return _ensure_not_async(client)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
client = <Client: No scheduler connected>
def _ensure_not_async(client):
if client.asynchronous:
if fallback := config.get("admin.async-client-fallback", None):
warnings.warn(
"Distributed Client detected but Client instance is "
f"asynchronous. Falling back to `{fallback}` scheduler. "
"To use an asynchronous Client, please use "
"``Client.compute`` and ``Client.gather`` "
"instead of the top level ``dask.compute``",
UserWarning,
)
return get_scheduler(scheduler=fallback)
else:
> raise RuntimeError(
"Attempting to use an asynchronous "
"Client in a synchronous context of `dask.compute`"
)
E RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1080: RuntimeError
Check warning on line 0 in distributed.tests.test_client
github-actions / Unit Test Results
12 out of 13 runs failed: test_annotations_task_state (distributed.tests.test_client)
artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.13-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.13-default-ci1/pytest.xml [took 0s]
Raw output
RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:34143', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:35193', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:34133', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
@gen_cluster(client=True)
async def test_annotations_task_state(c, s, a, b):
pytest.importorskip("numpy")
da = pytest.importorskip("dask.array")
with dask.annotate(qux="bar", priority=100):
x = da.ones(10, chunks=(5,))
with dask.config.set(optimization__fuse__active=False):
> x = await x.persist()
distributed/tests/test_client.py:6770:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:343: in persist
(result,) = persist(self, traverse=False, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:967: in persist
schedule = get_scheduler(scheduler=scheduler, collections=collections)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1151: in get_scheduler
return get_scheduler(scheduler=config.get("scheduler", None))
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1131: in get_scheduler
return _ensure_not_async(client)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
client = <Client: No scheduler connected>
def _ensure_not_async(client):
if client.asynchronous:
if fallback := config.get("admin.async-client-fallback", None):
warnings.warn(
"Distributed Client detected but Client instance is "
f"asynchronous. Falling back to `{fallback}` scheduler. "
"To use an asynchronous Client, please use "
"``Client.compute`` and ``Client.gather`` "
"instead of the top level ``dask.compute``",
UserWarning,
)
return get_scheduler(scheduler=fallback)
else:
> raise RuntimeError(
"Attempting to use an asynchronous "
"Client in a synchronous context of `dask.compute`"
)
E RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1080: RuntimeError
Check warning on line 0 in distributed.tests.test_client
github-actions / Unit Test Results
12 out of 13 runs failed: test_annotations_priorities (distributed.tests.test_client)
artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.13-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.13-default-ci1/pytest.xml [took 0s]
Raw output
RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:41941', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:36065', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:43263', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
@gen_cluster(client=True)
async def test_annotations_priorities(c, s, a, b):
pytest.importorskip("numpy")
da = pytest.importorskip("dask.array")
with dask.annotate(priority=15):
x = da.ones(10, chunks=(5,))
with dask.config.set(optimization__fuse__active=False):
> x = await x.persist()
distributed/tests/test_client.py:6825:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:343: in persist
(result,) = persist(self, traverse=False, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:967: in persist
schedule = get_scheduler(scheduler=scheduler, collections=collections)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1151: in get_scheduler
return get_scheduler(scheduler=config.get("scheduler", None))
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1131: in get_scheduler
return _ensure_not_async(client)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
client = <Client: No scheduler connected>
def _ensure_not_async(client):
if client.asynchronous:
if fallback := config.get("admin.async-client-fallback", None):
warnings.warn(
"Distributed Client detected but Client instance is "
f"asynchronous. Falling back to `{fallback}` scheduler. "
"To use an asynchronous Client, please use "
"``Client.compute`` and ``Client.gather`` "
"instead of the top level ``dask.compute``",
UserWarning,
)
return get_scheduler(scheduler=fallback)
else:
> raise RuntimeError(
"Attempting to use an asynchronous "
"Client in a synchronous context of `dask.compute`"
)
E RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1080: RuntimeError
Check warning on line 0 in distributed.tests.test_client
github-actions / Unit Test Results
12 out of 13 runs failed: test_annotations_workers (distributed.tests.test_client)
artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.13-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.13-default-ci1/pytest.xml [took 0s]
Raw output
RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:38537', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:46693', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:35629', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
@gen_cluster(client=True)
async def test_annotations_workers(c, s, a, b):
pytest.importorskip("numpy")
da = pytest.importorskip("dask.array")
with dask.annotate(workers=[a.address]):
x = da.ones(10, chunks=(5,))
with dask.config.set(optimization__fuse__active=False):
> x = await x.persist()
distributed/tests/test_client.py:6841:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:343: in persist
(result,) = persist(self, traverse=False, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:967: in persist
schedule = get_scheduler(scheduler=scheduler, collections=collections)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1151: in get_scheduler
return get_scheduler(scheduler=config.get("scheduler", None))
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1131: in get_scheduler
return _ensure_not_async(client)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
client = <Client: No scheduler connected>
def _ensure_not_async(client):
if client.asynchronous:
if fallback := config.get("admin.async-client-fallback", None):
warnings.warn(
"Distributed Client detected but Client instance is "
f"asynchronous. Falling back to `{fallback}` scheduler. "
"To use an asynchronous Client, please use "
"``Client.compute`` and ``Client.gather`` "
"instead of the top level ``dask.compute``",
UserWarning,
)
return get_scheduler(scheduler=fallback)
else:
> raise RuntimeError(
"Attempting to use an asynchronous "
"Client in a synchronous context of `dask.compute`"
)
E RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1080: RuntimeError
Check warning on line 0 in distributed.tests.test_client
github-actions / Unit Test Results
12 out of 13 runs failed: test_annotations_retries (distributed.tests.test_client)
artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.13-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.13-default-ci1/pytest.xml [took 0s]
Raw output
RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:38167', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:37775', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:37209', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
@gen_cluster(client=True)
async def test_annotations_retries(c, s, a, b):
pytest.importorskip("numpy")
da = pytest.importorskip("dask.array")
with dask.annotate(retries=2):
x = da.ones(10, chunks=(5,))
with dask.config.set(optimization__fuse__active=False):
> x = await x.persist()
distributed/tests/test_client.py:6860:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:343: in persist
(result,) = persist(self, traverse=False, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:967: in persist
schedule = get_scheduler(scheduler=scheduler, collections=collections)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1151: in get_scheduler
return get_scheduler(scheduler=config.get("scheduler", None))
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1131: in get_scheduler
return _ensure_not_async(client)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
client = <Client: No scheduler connected>
def _ensure_not_async(client):
if client.asynchronous:
if fallback := config.get("admin.async-client-fallback", None):
warnings.warn(
"Distributed Client detected but Client instance is "
f"asynchronous. Falling back to `{fallback}` scheduler. "
"To use an asynchronous Client, please use "
"``Client.compute`` and ``Client.gather`` "
"instead of the top level ``dask.compute``",
UserWarning,
)
return get_scheduler(scheduler=fallback)
else:
> raise RuntimeError(
"Attempting to use an asynchronous "
"Client in a synchronous context of `dask.compute`"
)
E RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1080: RuntimeError
Check warning on line 0 in distributed.tests.test_client
github-actions / Unit Test Results
12 out of 13 runs failed: test_annotations_resources (distributed.tests.test_client)
artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.13-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.13-default-ci1/pytest.xml [took 0s]
Raw output
RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:37975', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:44197', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:36059', name: 1, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
@gen_cluster(
client=True,
nthreads=[
("127.0.0.1", 1),
("127.0.0.1", 1, {"resources": {"GPU": 1}}),
],
)
async def test_annotations_resources(c, s, a, b):
pytest.importorskip("numpy")
da = pytest.importorskip("dask.array")
with dask.annotate(resources={"GPU": 1}):
x = da.ones(10, chunks=(5,))
with dask.config.set(optimization__fuse__active=False):
> x = await x.persist()
distributed/tests/test_client.py:6913:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:343: in persist
(result,) = persist(self, traverse=False, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:967: in persist
schedule = get_scheduler(scheduler=scheduler, collections=collections)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1151: in get_scheduler
return get_scheduler(scheduler=config.get("scheduler", None))
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1131: in get_scheduler
return _ensure_not_async(client)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
client = <Client: No scheduler connected>
def _ensure_not_async(client):
if client.asynchronous:
if fallback := config.get("admin.async-client-fallback", None):
warnings.warn(
"Distributed Client detected but Client instance is "
f"asynchronous. Falling back to `{fallback}` scheduler. "
"To use an asynchronous Client, please use "
"``Client.compute`` and ``Client.gather`` "
"instead of the top level ``dask.compute``",
UserWarning,
)
return get_scheduler(scheduler=fallback)
else:
> raise RuntimeError(
"Attempting to use an asynchronous "
"Client in a synchronous context of `dask.compute`"
)
E RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1080: RuntimeError
Check warning on line 0 in distributed.tests.test_client
github-actions / Unit Test Results
12 out of 13 runs failed: test_annotations_loose_restrictions (distributed.tests.test_client)
artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.13-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.13-default-ci1/pytest.xml [took 0s]
Raw output
RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:40425', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:40795', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:40119', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
@gen_cluster(client=True)
async def test_annotations_loose_restrictions(c, s, a, b):
pytest.importorskip("numpy")
da = pytest.importorskip("dask.array")
# Eventually fails if allow_other_workers=False
with dask.annotate(workers=["fake"], allow_other_workers=True):
x = da.ones(10, chunks=(5,))
with dask.config.set(optimization__fuse__active=False):
> x = await x.persist()
distributed/tests/test_client.py:6952:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:343: in persist
(result,) = persist(self, traverse=False, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:967: in persist
schedule = get_scheduler(scheduler=scheduler, collections=collections)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1151: in get_scheduler
return get_scheduler(scheduler=config.get("scheduler", None))
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1131: in get_scheduler
return _ensure_not_async(client)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
client = <Client: No scheduler connected>
def _ensure_not_async(client):
if client.asynchronous:
if fallback := config.get("admin.async-client-fallback", None):
warnings.warn(
"Distributed Client detected but Client instance is "
f"asynchronous. Falling back to `{fallback}` scheduler. "
"To use an asynchronous Client, please use "
"``Client.compute`` and ``Client.gather`` "
"instead of the top level ``dask.compute``",
UserWarning,
)
return get_scheduler(scheduler=fallback)
else:
> raise RuntimeError(
"Attempting to use an asynchronous "
"Client in a synchronous context of `dask.compute`"
)
E RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1080: RuntimeError
Check warning on line 0 in distributed.tests.test_client
github-actions / Unit Test Results
12 out of 13 runs failed: test_computation_object_code_dask_persist (distributed.tests.test_client)
artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.13-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.13-default-ci1/pytest.xml [took 0s]
Raw output
RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:37679', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:38683', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:34971', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
@gen_cluster(client=True, config={"distributed.diagnostics.computations.nframes": 2})
async def test_computation_object_code_dask_persist(c, s, a, b):
    pytest.importorskip("numpy")
    da = pytest.importorskip("dask.array")

    x = da.ones((10, 10), chunks=(3, 3))
>   future = x.sum().persist()
distributed/tests/test_client.py:7232:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:343: in persist
(result,) = persist(self, traverse=False, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:967: in persist
schedule = get_scheduler(scheduler=scheduler, collections=collections)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1151: in get_scheduler
return get_scheduler(scheduler=config.get("scheduler", None))
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1131: in get_scheduler
return _ensure_not_async(client)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
client = <Client: No scheduler connected>
def _ensure_not_async(client):
    if client.asynchronous:
        if fallback := config.get("admin.async-client-fallback", None):
            warnings.warn(
                "Distributed Client detected but Client instance is "
                f"asynchronous. Falling back to `{fallback}` scheduler. "
                "To use an asynchronous Client, please use "
                "``Client.compute`` and ``Client.gather`` "
                "instead of the top level ``dask.compute``",
                UserWarning,
            )
            return get_scheduler(scheduler=fallback)
        else:
>           raise RuntimeError(
                "Attempting to use an asynchronous "
                "Client in a synchronous context of `dask.compute`"
            )
E RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1080: RuntimeError
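The warning branch visible in `_ensure_not_async` shows an escape hatch: when the `admin.async-client-fallback` config key is set, dask falls back to a local scheduler with a UserWarning instead of raising. A hedged sketch, assuming an asynchronous Client is already the current client; the value "synchronous" is an illustrative scheduler name, not something taken from this report:

    import dask

    # With this key set, collection.persist()/compute() under an async Client
    # emits a UserWarning and runs on the named local scheduler instead of
    # raising RuntimeError (per the code path shown in the traceback above).
    dask.config.set({"admin.async-client-fallback": "synchronous"})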
Check warning on line 0 in distributed.tests.test_nanny
github-actions / Unit Test Results
7 out of 13 runs failed: test_malloc_trim_threshold (distributed.tests.test_nanny)
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.10-no_queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.13-default-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 1s]
Raw output
RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:43655', workers: 0, cores: 0, tasks: 0>
a = <Nanny: None, threads: 2>
@pytest.mark.slow
@pytest.mark.skipif(not LINUX, reason="Requires GNU libc")
@gen_cluster(
    client=True,
    Worker=Nanny,
    nthreads=[("", 2)],
    worker_kwargs={"memory_limit": "2GiB"},
)
async def test_malloc_trim_threshold(c, s, a):
    """Test that the nanny sets the MALLOC_TRIM_THRESHOLD_ environment variable before
    starting the worker process.

    This test relies on these settings to work:
    distributed.nanny.pre-spawn-environ.MALLOC_TRIM_THRESHOLD_: 65536
    distributed.worker.multiprocessing-method: spawn

    We're deliberately not setting them explicitly in @gen_cluster above, as we want
    this test to trip if somebody changes distributed.yaml.

    Note
    ----
    This test may start failing in a future Python version if CPython switches to
    using mimalloc by default. If it does, a thorough benchmarking exercise is needed.
    """
    pytest.importorskip("numpy")
    da = pytest.importorskip("dask.array")

    arr = da.random.random(2**29 // 8, chunks="512 kiB")  # 0.5 GiB
>   arr = arr.persist()
distributed/tests/test_nanny.py:806:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:343: in persist
(result,) = persist(self, traverse=False, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:967: in persist
schedule = get_scheduler(scheduler=scheduler, collections=collections)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1151: in get_scheduler
return get_scheduler(scheduler=config.get("scheduler", None))
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1131: in get_scheduler
return _ensure_not_async(client)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
client = <Client: No scheduler connected>
def _ensure_not_async(client):
    if client.asynchronous:
        if fallback := config.get("admin.async-client-fallback", None):
            warnings.warn(
                "Distributed Client detected but Client instance is "
                f"asynchronous. Falling back to `{fallback}` scheduler. "
                "To use an asynchronous Client, please use "
                "``Client.compute`` and ``Client.gather`` "
                "instead of the top level ``dask.compute``",
                UserWarning,
            )
            return get_scheduler(scheduler=fallback)
        else:
>           raise RuntimeError(
                "Attempting to use an asynchronous "
                "Client in a synchronous context of `dask.compute`"
            )
E RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1080: RuntimeError
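The error message itself points at the workaround: go through `Client.compute` and `Client.gather` (or await the returned futures) rather than calling `.persist()` or top-level `dask.compute` while the client is asynchronous. A sketch of that pattern, not the project's actual patch; the local in-process cluster is an assumption for the example:

    import asyncio

    import dask.array as da
    from distributed import Client

    async def main():
        async with Client(asynchronous=True, processes=False) as c:
            arr = da.random.random(1_000, chunks=100)
            fut = c.compute(arr.sum())   # Client.compute returns a Future immediately
            print(await fut)             # awaiting the Future yields the result
            # equivalently: (result,) = await c.gather([fut])

    asyncio.run(main())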
Check warning on line 0 in distributed.tests.test_scheduler
github-actions / Unit Test Results
12 out of 13 runs failed: test_decide_worker_coschedule_order_neighbors[nthreads0-0] (distributed.tests.test_scheduler)
artifacts/macos-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.13-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.13-default-ci1/pytest.xml [took 0s]
Raw output
RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
ndeps = 0
nthreads = [('127.0.0.1', 1), ('127.0.0.1', 1), ('127.0.0.1', 1), ('127.0.0.1', 1), ('127.0.0.1', 1)]
@pytest.mark.parametrize("ndeps", [0, 1, 4])
@pytest.mark.parametrize(
"nthreads",
[
[("127.0.0.1", 1)] * 5,
[("127.0.0.1", 3), ("127.0.0.1", 2), ("127.0.0.1", 1)],
],
)
def test_decide_worker_coschedule_order_neighbors(ndeps, nthreads):
@gen_cluster(
client=True,
nthreads=nthreads,
config={
"distributed.scheduler.work-stealing": False,
"distributed.scheduler.worker-saturation": float("inf"),
},
)
async def test_decide_worker_coschedule_order_neighbors_(c, s, *workers):
r"""
Ensure that sibling root tasks are scheduled to the same node, reducing future
data transfer.
We generate a wide layer of "root" tasks (random NumPy arrays). All of those
tasks share 0-5 trivial dependencies. The ``ndeps=0`` and ``ndeps=1`` cases are
most common in real-world use (``ndeps=1`` is basically ``da.from_array(...,
inline_array=False)`` or ``da.from_zarr``). The graph is structured like this
(though the number of tasks and workers is different):
|-W1-| |-W2-| |-W3-| |-W4-| < ---- ideal task scheduling
q r s t < --- `sum-aggregate-`
/ \ / \ / \ / \
i j k l m n o p < --- `sum-`
| | | | | | | |
a b c d e f g h < --- `random-`
\ \ \ | | / / /
TRIVIAL * 0..5
Neighboring `random-` tasks should be scheduled on the same worker. We test that
generally, only one worker holds each row of the array, that the `random-` tasks
are never transferred, and that there are few transfers overall.
"""
np = pytest.importorskip("numpy")
da = pytest.importorskip("dask.array")
if ndeps == 0:
x = da.random.random((100, 100), chunks=(10, 10))
random_keys = set(flatten(x.__dask_keys__()))
trivial_deps = {}
else:
def random(**kwargs):
assert len(kwargs) == ndeps
return np.random.random((10, 10))
trivial_deps = {
f"k{i}": delayed(object(), name=f"object-{i}") for i in range(ndeps)
}
# TODO is there a simpler (non-blockwise) way to make this sort of graph?
x = da.blockwise(
random,
"yx",
new_axes={"y": (10,) * 10, "x": (10,) * 10},
dtype=float,
**trivial_deps,
)
random_keys = set(flatten(x.__dask_keys__()))
xx, xsum = dask.persist(x, x.sum(axis=1, split_every=20))
await xsum
# Check that each chunk-row of the array is (mostly) stored on the same worker
primary_worker_key_fractions = []
secondary_worker_key_fractions = []
for keys in x.__dask_keys__():
# Iterate along rows of the array.
keys = set(keys)
# No more than 2 workers should have any keys
assert sum(any(k in w.data for k in keys) for w in workers) <= 2
# What fraction of the keys for this row does each worker hold?
key_fractions = [
len(set(w.data).intersection(keys)) / len(keys) for w in workers
]
key_fractions.sort()
# Primary worker: holds the highest percentage of keys
# Secondary worker: holds the second highest percentage of keys
primary_worker_key_fractions.append(key_fractions[-1])
secondary_worker_key_fractions.append(key_fractions[-2])
# There may be one or two rows that were poorly split across workers,
# but the vast majority of rows should only be on one worker.
assert np.mean(primary_worker_key_fractions) >= 0.9
assert np.median(primary_worker_key_fractions) == 1.0
assert np.mean(secondary_worker_key_fractions) <= 0.1
assert np.median(secondary_worker_key_fractions) == 0.0
# Check that there were few transfers
unexpected_transfers = []
for worker in workers:
for log in worker.transfer_incoming_log:
keys = log["keys"]
# The root-ish tasks should never be transferred
assert not random_keys.intersection(keys)
# `object-` keys (the trivial deps of the root random tasks) should be
# transferred
if any(
(not isinstance(k, str) or not k.startswith("object")) for k in keys
):
# But not many other things should be
unexpected_transfers.append(list(keys))
# A transfer at the very end to move aggregated results is fine (necessary with
# unbalanced workers in fact), but generally there should be very very few
# transfers.
assert len(unexpected_transfers) <= 3, unexpected_transfers
> test_decide_worker_coschedule_order_neighbors_()
distributed/tests/test_scheduler.py:284:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../miniconda3/envs/dask-distributed/lib/python3.10/contextlib.py:79: in inner
return func(*args, **kwds)
../../../miniconda3/envs/dask-distributed/lib/python3.10/contextlib.py:79: in inner
return func(*args, **kwds)
distributed/utils_test.py:1090: in test_func
return _run_and_close_tornado(async_fn_outer)
distributed/utils_test.py:380: in _run_and_close_tornado
return asyncio_run(inner_fn(), loop_factory=get_loop_factory())
distributed/compatibility.py:236: in asyncio_run
return loop.run_until_complete(main)
../../../miniconda3/envs/dask-distributed/lib/python3.10/asyncio/base_events.py:649: in run_until_complete
return future.result()
distributed/utils_test.py:377: in inner_fn
return await async_fn(*args, **kwargs)
distributed/utils_test.py:1087: in async_fn_outer
return await utils_wait_for(async_fn(), timeout=timeout * 2)
distributed/utils.py:1915: in wait_for
return await asyncio.wait_for(fut, timeout)
../../../miniconda3/envs/dask-distributed/lib/python3.10/asyncio/tasks.py:445: in wait_for
return fut.result()
distributed/utils_test.py:1019: in async_fn
result = await coro2
distributed/utils.py:1915: in wait_for
return await asyncio.wait_for(fut, timeout)
../../../miniconda3/envs/dask-distributed/lib/python3.10/asyncio/tasks.py:445: in wait_for
return fut.result()
distributed/tests/test_scheduler.py:234: in test_decide_worker_coschedule_order_neighbors_
xx, xsum = dask.persist(x, x.sum(axis=1, split_every=20))
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:967: in persist
schedule = get_scheduler(scheduler=scheduler, collections=collections)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1151: in get_scheduler
return get_scheduler(scheduler=config.get("scheduler", None))
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1131: in get_scheduler
return _ensure_not_async(client)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
client = <Client: No scheduler connected>
def _ensure_not_async(client):
    if client.asynchronous:
        if fallback := config.get("admin.async-client-fallback", None):
            warnings.warn(
                "Distributed Client detected but Client instance is "
                f"asynchronous. Falling back to `{fallback}` scheduler. "
                "To use an asynchronous Client, please use "
                "``Client.compute`` and ``Client.gather`` "
                "instead of the top level ``dask.compute``",
                UserWarning,
            )
            return get_scheduler(scheduler=fallback)
        else:
>           raise RuntimeError(
                "Attempting to use an asynchronous "
                "Client in a synchronous context of `dask.compute`"
            )
E RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/base.py:1080: RuntimeError
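The coscheduling test persists two collections at once with `dask.persist(x, x.sum(...))` and then awaits one of them; with an asynchronous client, the equivalent client-side call would be `Client.persist` on a list of collections. A hedged sketch only; whether the test suite is ultimately adapted this way (or the scheduler-detection behaviour is changed instead) is not something this report shows:

    import dask.array as da
    from distributed import Client

    async def persist_and_reduce(c: Client):
        # `c` is assumed to be an asynchronous distributed.Client
        x = da.random.random((100, 100), chunks=(10, 10))
        # Client.persist accepts a list of collections and returns
        # futures-backed collections without blocking the event loop
        xx, xsum = c.persist([x, x.sum(axis=1, split_every=20)])
        # Client.compute returns a Future; awaiting it yields the result
        return await c.compute(xsum)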
Check warning on line 0 in distributed.tests.test_scheduler
github-actions / Unit Test Results
12 out of 13 runs failed: test_decide_worker_coschedule_order_neighbors[nthreads0-1] (distributed.tests.test_scheduler)
Failing artifacts: the same 12 pytest.xml files as in the [nthreads0-0] case above.
Raw output
RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
ndeps = 1
nthreads = [('127.0.0.1', 1), ('127.0.0.1', 1), ('127.0.0.1', 1), ('127.0.0.1', 1), ('127.0.0.1', 1)]
Test source and traceback are identical to the [nthreads0-0] case above; only the parametrization differs.
Check warning on line 0 in distributed.tests.test_scheduler
github-actions / Unit Test Results
12 out of 13 runs failed: test_decide_worker_coschedule_order_neighbors[nthreads0-4] (distributed.tests.test_scheduler)
Failing artifacts: the same 12 pytest.xml files as in the [nthreads0-0] case above.
Raw output
RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
ndeps = 4
nthreads = [('127.0.0.1', 1), ('127.0.0.1', 1), ('127.0.0.1', 1), ('127.0.0.1', 1), ('127.0.0.1', 1)]
Test source and traceback are identical to the [nthreads0-0] case above; only the parametrization differs.
Check warning on line 0 in distributed.tests.test_scheduler
github-actions / Unit Test Results
12 out of 13 runs failed: test_decide_worker_coschedule_order_neighbors[nthreads1-0] (distributed.tests.test_scheduler)
Failing artifacts: the same 12 pytest.xml files as in the [nthreads0-0] case above.
Raw output
RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
ndeps = 0, nthreads = [('127.0.0.1', 3), ('127.0.0.1', 2), ('127.0.0.1', 1)]
Test source and traceback are identical to the [nthreads0-0] case above; only the parametrization differs.
Check warning on line 0 in distributed.tests.test_scheduler
github-actions / Unit Test Results
12 out of 13 runs failed: test_decide_worker_coschedule_order_neighbors[nthreads1-1] (distributed.tests.test_scheduler)
Failing artifacts: the same 12 pytest.xml files as in the [nthreads0-0] case above.
Raw output
RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
ndeps = 1, nthreads = [('127.0.0.1', 3), ('127.0.0.1', 2), ('127.0.0.1', 1)]
Test source and traceback are identical to the [nthreads0-0] case above; only the parametrization differs.
Check warning on line 0 in distributed.tests.test_scheduler
github-actions / Unit Test Results
12 out of 13 runs failed: test_decide_worker_coschedule_order_neighbors[nthreads1-4] (distributed.tests.test_scheduler)
Failing artifacts: the same 12 pytest.xml files as in the [nthreads0-0] case above.
Raw output
RuntimeError: Attempting to use an asynchronous Client in a synchronous context of `dask.compute`
ndeps = 4, nthreads = [('127.0.0.1', 3), ('127.0.0.1', 2), ('127.0.0.1', 1)]
Test source and traceback are identical to the [nthreads0-0] case above; only the parametrization differs.