DaskTaskRunner tasks occasionally fail with AttributeError: 'NoneType' object has no attribute 'address' #17334

Open
kzvezdarov opened this issue Mar 2, 2025 · 0 comments
Labels
bug Something isn't working

Comments

@kzvezdarov
Contributor

Bug summary

Tasks launched via a DaskTaskRunner randomly fail with the following exception:

  File "/Users/kzvezdarov/git/prefect-dask-test/attr_err_flow.py", line 11, in load_dataframe
    with get_dask_client():
      ^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.9/Frameworks/Python.framework/Versions/3.12/lib/python3.12/contextlib.py", line 137, in __enter__
    return next(self.gen)
    ^^^^^^^^^^^^^^^^^
  File "/Users/kzvezdarov/git/prefect-dask-test/.venv/lib/python3.12/site-packages/prefect_dask/utils.py", line 101, in get_dask_client
    client_kwargs = _generate_client_kwargs(
      ^^^^^^^^^^^^^^^^^
  File "/Users/kzvezdarov/git/prefect-dask-test/.venv/lib/python3.12/site-packages/prefect_dask/utils.py", line 29, in _generate_client_kwargs
    address = get_client().scheduler.address
    ^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'address'
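
Based on the traceback, distributed.get_client() is returning a Client whose scheduler comm is still None at the moment prefect_dask.utils._generate_client_kwargs reads its address. A rough diagnostic sketch of what the failing call reduces to (the check itself is illustrative, not part of prefect-dask):

from distributed import get_client

# Inside a task running on a Dask worker, this mirrors what
# _generate_client_kwargs does per the traceback above.
client = get_client()

# When the bug manifests, client.scheduler is None, so reading .address
# raises AttributeError: 'NoneType' object has no attribute 'address'.
if client.scheduler is None:
    print("worker client has no scheduler comm attached yet")
else:
    print(client.scheduler.address)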

Minimal flow to reproduce this (somewhat reliably; executed on a local process work pool):

from prefect import flow, task, serve
import dask.dataframe as dd
import numpy as np
from prefect.futures import PrefectFutureList
from prefect_dask.utils import get_dask_client
from prefect_dask.task_runners import DaskTaskRunner


@task
def load_dataframe() -> dd.DataFrame:
    with get_dask_client():
        return (
            dd.DataFrame.from_dict(
                {
                    "x": np.random.random(size=1_000),
                    "y": np.random.random(size=1_000),
                }
            )
            .mean()
            .compute()
        )


@flow(task_runner=DaskTaskRunner())
def attr_err_flow():
    tasks = PrefectFutureList()
    for _ in range(10):
        tasks.append(load_dataframe.submit())

    return tasks.result()


if __name__ == "__main__":
    attr_err_deploy = attr_err_flow.to_deployment(
        name="attr-err-deployment", work_pool_name="local"
    )

    serve(attr_err_deploy)

This seems like some kind of race condition, because increasing the amount of work each
task has to do (via size) makes it less and less likely to manifest.

This appears to happen when using both LocalCluster and DaskKubernetesOperator
ephemeral clusters.
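
For reference, a sketch of how the task runner can be pointed at each cluster type; the cluster_kwargs values here are illustrative placeholders, not the exact configuration used above:

from prefect_dask.task_runners import DaskTaskRunner

# Ephemeral LocalCluster (DaskTaskRunner's default when no address is given).
local_runner = DaskTaskRunner(
    cluster_class="distributed.LocalCluster",
    cluster_kwargs={"n_workers": 2, "threads_per_worker": 1},
)

# Ephemeral cluster created through the dask-kubernetes operator.
k8s_runner = DaskTaskRunner(
    cluster_class="dask_kubernetes.operator.KubeCluster",
    cluster_kwargs={"name": "attr-err-test", "image": "ghcr.io/dask/dask:latest"},
)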

Finally, a fairly straightforward workaround seems to be simply retrying the task when
that exception is encountered.
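
A minimal sketch of that workaround, assuming Prefect's built-in task retries (the retry count and delay are arbitrary placeholders; one could also inspect the exception before deciding to retry):

from prefect import task
from prefect_dask.utils import get_dask_client


@task(retries=3, retry_delay_seconds=1)
def load_dataframe_with_retries():
    # Same body as load_dataframe above; the retries paper over the
    # transient AttributeError raised inside get_dask_client().
    with get_dask_client():
        ...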

Version info

Version:             3.2.9
API version:         0.8.4
Python version:      3.12.9
Git commit:          27eb408c
Built:               Fri, Feb 28, 2025 8:12 PM
OS/Arch:             darwin/arm64
Profile:             local
Server type:         ephemeral
Pydantic version:    2.10.6
Server:
  Database:          sqlite
  SQLite version:    3.49.1
Integrations:
  prefect-dask:      0.3.3

Additional context

Full flow run logs:

vengeful-wildcat.csv

@kzvezdarov kzvezdarov added the bug Something isn't working label Mar 2, 2025