TASK_SOURCE cache policy is not applied to nested tasks #16925

Closed
edalfon opened this issue Jan 31, 2025 · 3 comments
Labels
enhancement An improvement of an existing feature

Comments

@edalfon

edalfon commented Jan 31, 2025

Bug summary

When using the new feature in Prefect 3.0 that allows nesting tasks within other tasks, I expected that cache policies—specifically TASK_SOURCE—would apply recursively to all nested tasks. However, this does not seem to be the case.

Consider the following minimal example:

from prefect import flow, task
from prefect.cache_policies import INPUTS, TASK_SOURCE

@flow(log_prints=True)
def myflow():
    task_result = maintask("I am the input for the main task")
    print(task_result)

@task(cache_policy=INPUTS + TASK_SOURCE)
def maintask(input: str):
    newinput = subtask(input)
    return newinput + " - Main task completed"

@task(cache_policy=INPUTS + TASK_SOURCE)
def subtask(input: str):
    return input + " - Subtask completed"
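
For reference, a minimal way to reproduce (a sketch, assuming the snippet above is saved as a script): run it once to populate the cache, change only subtask's return string, then run it again.

if __name__ == "__main__":
    # First run populates the cache; after editing only `subtask`,
    # a second run still prints the previously cached (now stale) result.
    myflow()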

Expected Behavior (this is my expected behavior, and I am not certain it matches the intended design, so I am not necessarily claiming this is a bug, despite "bug" being the only label that seemed appropriate)

Given a flow (like the example above) where:

  • Tasks are nested within other tasks.
  • The TASK_SOURCE cache policy is used to avoid unnecessary re-execution.

I expected that changing the source code of a nested task would invalidate the cache and trigger a re-execution of the affected task(s).

Observed Behavior:

The cache policy works as expected when:

  • Running the flow for the first time.
  • Running it again without changes (it correctly retrieves cached results).
  • Changing the inputs (it detects the change and re-executes the necessary tasks).
  • Changing the source of the main task (it detects the change and re-executes the main task).

Issue: When only the source of a nested task changes, Prefect does not detect this modification. Instead, it seems to me that it checks only the inputs and source of the main task. Since those remain unchanged, Prefect retrieves cached results, even though the nested task has been modified. This can lead to outdated or incorrect results in workflows where the nested task's logic impacts the main task.

Question / Feature Request

Is this behavior intentional, or should TASK_SOURCE cache policy apply recursively to nested tasks?
If this is the expected behavior, I suggest adding a clear note in the documentation to avoid confusion.
If not, would it be possible to add recursive cache invalidation for nested tasks to the roadmap?

Version info

Version:             3.1.15
API version:         0.8.4
Python version:      3.10.11
Git commit:          3ac3d548
Built:               Thu, Jan 30, 2025 11:31 AM
OS/Arch:             win32/AMD64
Profile:             ephemeral
Server type:         ephemeral
Pydantic version:    2.10.4
Server:
  Database:          sqlite
  SQLite version:    3.40.1

Additional context

No response

@edalfon added the bug (Something isn't working) label on Jan 31, 2025
@zzstoatzz
Collaborator

hey @edalfon, thanks for the issue!

the current behavior is intentional - TASK_SOURCE works by taking the raw lines of code as a string and hashing them, rather than doing some recursive analysis of nested source code. It would likely get quite complex to hash all the way down.
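
As a quick illustration (assuming TASK_SOURCE hashes roughly the output of inspect.getsource on the task's wrapped function; example_flow is a hypothetical module holding the snippet from the issue):

import inspect

from example_flow import maintask  # hypothetical module with the example tasks

# The source of maintask contains only the *call* `subtask(input)`, not subtask's
# body, so editing subtask leaves this string (and therefore its hash) unchanged.
print(inspect.getsource(maintask.fn))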

That said, I think the use case makes sense. While you could write a CacheKeyFnPolicy for this today, there could also be an approach that lets you declare source dependencies between tasks, something like (credit for this idea to @cicdw):

@task(cache_policy=INPUTS + TASK_SOURCE.depends(subtask))
def main_task(input: str):
    ...

what do you think about something like this?
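
In the meantime, here is a rough sketch of the custom-cache-key workaround mentioned above (an assumption-laden sketch, not an official recipe: it assumes tasks expose their wrapped function as .fn, uses the long-standing cache_key_fn hook, and reuses the function names from the example):

import hashlib
import inspect

from prefect import flow, task
from prefect.context import TaskRunContext


def _combined_source(*tasks) -> str:
    # Concatenate the source of each task's wrapped function
    # (assumes Prefect tasks expose it as `.fn`).
    return "".join(inspect.getsource(t.fn) for t in tasks)


def maintask_cache_key(context: TaskRunContext, parameters: dict) -> str:
    # Key on the inputs plus the source of maintask *and* subtask,
    # so editing either function invalidates the cache.
    material = repr(sorted(parameters.items())) + _combined_source(maintask, subtask)
    return hashlib.sha256(material.encode()).hexdigest()


@task
def subtask(input: str):
    return input + " - Subtask completed"


@task(cache_key_fn=maintask_cache_key)
def maintask(input: str):
    return subtask(input) + " - Main task completed"


@flow(log_prints=True)
def myflow():
    print(maintask("I am the input for the main task"))

The same key function could presumably also be wrapped in a CacheKeyFnPolicy and passed via cache_policy instead of cache_key_fn.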

@edalfon
Author

edalfon commented Feb 1, 2025

Hi @zzstoatzz, thanks for taking a look at this and clarifying the intended behavior.

I think explicitly declaring dependencies between tasks using something like TASK_SOURCE.depends(subtask) could be a good approach. However, it might become verbose if a task depends on many others (though that could also be a sign that the task should be split further for better modularity).

I initially assumed that, since Prefect already detects dependencies between tasks within a flow, it would be relatively easy to extend that logic to caching. But I understand it could get quite complex.

So, I'll experiment with writing a custom CacheKeyFnPolicy to see if I can find a solution for my use case. If I come up with something useful, I'll report back here.

Thanks again!

@cicdw added the enhancement (An improvement of an existing feature) label and removed the bug (Something isn't working) label on Feb 5, 2025
@cicdw
Member

cicdw commented Feb 5, 2025

Going to close this with the docs update and the information above about how to achieve the desired end state; we can revisit if it becomes a popular request.

@cicdw closed this as not planned on Feb 5, 2025