Skip to content

Interrupt heartbeating activity on pause #854

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Open
wants to merge 16 commits into
base: main
Choose a base branch
from

Conversation

THardy98
Copy link
Contributor

@THardy98 THardy98 commented May 1, 2025

What was changed

Adds support to interrupt heartbeating activities from pause requests.

Why?

Functionally support users pausing their activities via the CLI

  1. Closes Heartbeating activities should be interrupted when the activities are paused. #812

  2. How was this tested:
    Simple integration test

  3. Any docs updates needed?
    Unsure

@THardy98 THardy98 requested a review from a team as a code owner May 1, 2025 07:38
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove this new top-level file/folder?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes - didn't realize I had it commit

@@ -6250,6 +6258,9 @@ async def heartbeat_async_activity(
)
if resp_by_id.cancel_requested:
raise AsyncActivityCancelledError()
if resp_by_id.activity_paused:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hrmm, looks like we never added "raises" docs to AsyncActivityHandle.heartbeat, wonder if we should now

@@ -136,6 +136,20 @@ def details(self) -> Sequence[Any]:
return self._details


class ActivityPausedError(FailureError):
Copy link
Member

@cretz cretz May 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not really a failure error. A failure error is an error modeled in proto in https://github.com/temporalio/api/blob/master/temporal/api/failure/v1/message.proto and catchable on the workflow side. I think this may need to be an error in the activity module. I also don't think it has details. (but if you read later, you'll see where I actually advocate for abolishing this error)

self.cancel_thread_raiser.raise_in_thread(
temporalio.exceptions.CancelledError
)
if self.cancelled_by_pause:
Copy link
Member

@cretz cretz May 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If an activity is both canceled and paused, cancel needs to take precedent (but you'll see later where I actually advocate for abolishing the separation)

self.cancel_thread_raiser.raise_in_thread(
temporalio.exceptions.CancelledError
)
if self.cancelled_by_pause:
Copy link
Member

@cretz cretz May 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to have design discussions on this project...

So this raising is just for threaded activities. It doesn't help async or multiprocess activities know it is paused. So there are three things that exist on activity for knowing whether activity is canceled or worker is shutting down:

  • is_cancelled/is_worker_shutdown
  • wait_for_cancelled/wait_for_worker_shutdown
  • wait_for_cancelled_sync/wait_for_worker_shutdown

Now, the concept of "cancelled" (two-L is the Python naming) is meant to be any cancellation, not just server-cancel-requested, so it includes worker shutdown, timeout, etc. So IMO we may want:

  • is_cancel_requested - when cancel is requested server side (so not true when worker it shutting down like is_cancelled is)
  • is_pause_requested - when a pause is requested server side

And maybe more like is_timed_out? Or maybe we just need a cancel_details that has the bools from Core (even though is_worker_shutdown already exists and would be kinda duplicated).

I don't think we need a wait_for equivalent because wait_for_cancelled already covers it (remember, that's wait for activity to be canceled, not wait for cancel requested).

However, this brings up the question - does the exception need to unique for pause vs all other reasons for cancel (timeout, cancel requested, worker shutdown, etc). In Java at temporalio/sdk-java#2476 we did make them different, but not sure it makes sense here. First, asyncio.CancelledError can't be different, so it only applies to threaded activities. Second, people want to catch canceled error for all reasons, they don't want paused for one reason and everything else (shutdown, timeout, requested) to be another. So I think we should probably just get rid of ActivityPausedError.

Thoughts? Can have internal discussions on this if easier.

Copy link
Contributor Author

@THardy98 THardy98 May 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I looked at the Java pattern and followed suite.
I had thought the idea behind it was to provide more specificity with the error. To me, I think this boils down to whether users will want to adopt different behavior on pause vs cancel. If this is likely, then I think the specificity with a pause error makes that convenient. But if what you say is true and most people just want a catch-all cancel error, then it seems less relevant.

In either case, I wonder if we would want ways to distinguish this in our code. I suppose this would only be relevant for when we catch this error (given we already have the cancellation from core to work with in other areas).

Do you think it's necessary at all to distinguish between different types of canceled errors? I could see this being useful/asked for.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it should be a different type purely for the "it can't be in async" reason. I think you want the experience of "catch cancelled error, then check flags" to be the same regardless of if you're in sync/async.

Even if they could be different types in async, even then I'm not sure it makes sense, because catching the different types (or doing is type checks) feels like more cognitive load because you need to know when certain types are going to be the "primary" cause. With one cancelled type, you just always catch that, then check flags. Simple.

Copy link
Member

@cretz cretz May 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had thought the idea behind it was to provide more specificity with the error

Not in this case or it wouldn't be a "cancel reason" from core it'd be a completely different activity task. Basically there is cancel, which can happen for many reasons, and we do not give each of those reasons its own type. Java does though (it even has one for ActivityNotExistsException in addition to ActivityPausedException and ActivityCanceledException...and it throws them out of heartbeat which is different than Core-based langs).

Do you think it's necessary at all to distinguish between different types of canceled errors?

Different reasons, sure, different types, no. They should have access to the cancel details to know the reason, but it doesn't change the fact that from a user POV it's a "cancel" (even if it's not a "cancel" from a server POV, because they have different definitions).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok - then I can remove the additional types and add details to the existing cancelled errors. Figure that gives us and users what they want.

Copy link
Member

@cretz cretz May 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I wouldn't add details to existing cancelled errors, rather I'd add details to the activity context. Maybe a new cancellation_details() that returns Optional[ActivityCancellationDetails] and that object has the bools in it (I wouldn't even make them functions or prefix with is_, just a @dataclass with the values). Also need to update temporalio.testing.ActivityEnvironment.cancel() to accept this as an optional (default None) kwarg.

@THardy98 THardy98 force-pushed the support_activity_pause branch from d5107b0 to 9763505 Compare May 3, 2025 06:35
@THardy98 THardy98 requested a review from cretz May 3, 2025 06:39
@THardy98 THardy98 force-pushed the support_activity_pause branch from 6e012c3 to a813652 Compare May 3, 2025 14:54
@THardy98 THardy98 force-pushed the support_activity_pause branch from 5c8299a to 4451e77 Compare May 3, 2025 18:07
@@ -135,6 +138,29 @@ def _logger_details(self) -> Mapping[str, Any]:
_current_context: contextvars.ContextVar[_Context] = contextvars.ContextVar("activity")


@dataclass
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be frozen

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We mutate the fields in this object to reflect changes across running activity & _context

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed to frozen class + holder

@@ -436,6 +438,7 @@ async def _run_activity(
runtime_metric_meter=None
if sync_non_threaded
else self._metric_meter,
cancellation_details=running_activity.cancellation_details,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The context says it is optional, but this is set with a never-None value. I think you may need some kind of "holder" wrapper.

Copy link
Contributor Author

@THardy98 THardy98 May 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be preferred over a non-optional field?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed to frozen class + holder

Comment on lines 99 to 100
if cancellation_details:
self.cancellation_details.set_details(cancellation_details)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs to always be set, even if not provided. May want to change param to be non-option w/ a default of temporalio.activity.ActivityCancellationDetails(). Usually using a default param of a full expression is bad in Python because it isn't called for each call, but since the details are immutable, sharing the default for everyone is safe.

Copy link
Contributor Author

@THardy98 THardy98 May 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs to always be set, even if not provided.

Why?

May want to change param to be non-option w/ a default of temporalio.activity.ActivityCancellationDetails()

Sort of defeats the purpose of being able to return None without checking all fields are False, in which case, maybe the holder indirection isn't so useful.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the logic needs to be "you can guarantee activity.cancellation_details() is not None when the activity is being canceled", but this breaks that guarantee.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unresolving, I think this is still an issue. We should always set an instance of this on cancel in test environment. So the parameter should default to an empty instance with no optional accepted.

Comment on lines 7438 to 7439
except (CancelledError, asyncio.CancelledError):
return activity.cancellation_details()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need some tests to confirm what happens when this is not caught

Copy link
Contributor Author

@THardy98 THardy98 May 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a test for pausing/unpausing activities, we don't catch cancel errors.

Added the unpause activity method to the bridge client - it was missing.

I made it such that a cancelled error on activity pause returns an activity task failure w/ application failure:

  • When going through the default failure converter, a canceled error populates cancel failure info, but core expects application failure info because we are populating the failed field of the completion instead of cancel.
  • A bit of a weird situation, because we can't populate cancel for the reason you mentioned above (we tell server to cancel and server complains that a cancel hasn't been requested yet - like you mention above)

I can revert this change if you prefer, I don't think it causes any notable server effect anyway (state of paused activities are suspended server-side, iiuc), but probably a good thing to have failures sent properly just as a general rule

@THardy98 THardy98 requested a review from cretz May 6, 2025 18:00
Comment on lines 83 to 85
cancellation_details: Optional[
temporalio.activity.ActivityCancellationDetails
] = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated #854 (comment) thread, I think this parameter should not be Optional

Copy link
Contributor Author

@THardy98 THardy98 May 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whoops didn't mean to resolve - param is non-optional. Updated callers.

@THardy98 THardy98 force-pushed the support_activity_pause branch from fb9d826 to c5ca36e Compare May 6, 2025 20:54
@THardy98 THardy98 force-pushed the support_activity_pause branch from c5ca36e to c3634fe Compare May 6, 2025 20:58
def cancel(self) -> None:
def cancel(
self,
cancellation_details: temporalio.activity.ActivityCancellationDetails,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To clarify, providing the parameter still needs to be optional (this is a backwards incompatible change otherwise), it just needs to not be Optional. So:

Suggested change
cancellation_details: temporalio.activity.ActivityCancellationDetails,
cancellation_details: temporalio.activity.ActivityCancellationDetails = temporalio.activity.ActivityCancellationDetails(cancel_requested=True),

I just picked cancel_requested as the default (might as well set one to True by default). Usually having an expression as a parameter default like this in Python is bad because that same instance is reused for every default so mutations are shared, but this object is immutable so it's safe.

Sorry for the confusion

# for free to join this conversation on GitHub. Already have an account? # to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Heartbeating activities should be interrupted when the activities are paused.
3 participants