Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

fix(agents-api): resolve continue as new issue and add a start_workflow interceptor #1175

Merged
merged 2 commits into from
Feb 20, 2025

Conversation

Ahmad-mtos
Copy link
Contributor

@Ahmad-mtos Ahmad-mtos commented Feb 19, 2025

User description

…ow interceptor


PR Type

Enhancement, Bug fix


Description

  • Added a CustomClientInterceptor for Temporal client workflows.

  • Enhanced workflow argument handling with offload_if_large.

  • Fixed continue as new issue in Temporal workflows.

  • Refactored and updated interceptors for better error handling.


Changes walkthrough 📝

Relevant files
Enhancement
temporal.py
Add CustomClientInterceptor to Temporal client                     

agents-api/agents_api/clients/temporal.py

  • Imported CustomClientInterceptor for client workflows.
  • Added CustomClientInterceptor to Temporal client interceptors.
  • +2/-1     
    interceptors.py
    Add and refactor Temporal workflow interceptors                   

    agents-api/agents_api/common/interceptors.py

  • Introduced CustomClientInterceptor and CustomOutboundInterceptor.
  • Enhanced workflow argument handling with offload_if_large.
  • Refactored workflow interceptors for better error handling.
  • Removed redundant @offload_to_blob_store decorators.
  • +40/-4   

    Need help?
  • Type /help how to ... in the comments thread for any questions about Qodo Merge usage.
  • Check out the documentation for more information.

  • Important

    Add CustomClientInterceptor for workflow start operations and modify interceptors for large argument handling in temporal.py and interceptors.py.

    • Interceptors:
      • Add CustomClientInterceptor in interceptors.py to handle start_workflow with error handling and argument offloading.
      • Modify CustomWorkflowOutboundInterceptor to handle large arguments in start_activity and start_child_workflow.
      • Rename CustomOutboundInterceptor to CustomWorkflowOutboundInterceptor.
    • Client:
      • Add CustomClientInterceptor to get_client() in temporal.py to intercept client operations.
    • Misc:
      • Remove offload_to_blob_store decorator from start_activity and start_child_workflow in CustomWorkflowOutboundInterceptor.

    This description was created by Ellipsis for 6151c97. It will automatically update as commits are pushed.

    Copy link
    Contributor

    PR Reviewer Guide 🔍

    Here are some key observations to aid the review process:

    ⏱️ Estimated effort to review: 2 🔵🔵⚪⚪⚪
    🧪 No relevant tests
    🔒 No security concerns identified
    ⚡ Recommended focus areas for review

    Code Duplication

    The offload_if_large logic is duplicated across multiple interceptor methods. Consider extracting this into a shared helper method to improve maintainability.

            input.args = [offload_if_large(arg) for arg in input.args]
            return handle_execution_with_errors_sync(
                super().start_activity,
                input,
            )
    
        @offload_to_blob_store
        def continue_as_new(self, input: ContinueAsNewInput) -> NoReturn:
            return handle_execution_with_errors_sync(
                super().continue_as_new,
                input,
            )
    
        @offload_to_blob_store
        def start_local_activity(self, input: StartLocalActivityInput) -> ActivityHandle:
            return handle_execution_with_errors_sync(
                super().start_local_activity,
                input,
            )
    
        # @offload_to_blob_store
        async def start_child_workflow(self, input: StartChildWorkflowInput) -> ChildWorkflowHandle:
            input.args = [offload_if_large(arg) for arg in input.args]
            return await handle_execution_with_errors(
                super().start_child_workflow,
                input,
            )
    
    
    class CustomInterceptor(Interceptor):
        """
        Main interceptor class that provides both activity and workflow interceptors.
        """
    
        def intercept_activity(
            self, next: ActivityInboundInterceptor
        ) -> ActivityInboundInterceptor:
            """
            Creates and returns a custom activity interceptor.
            """
            return CustomActivityInterceptor(super().intercept_activity(next))
    
        def workflow_interceptor_class(
            self, input: WorkflowInterceptorClassInput
        ) -> type[WorkflowInboundInterceptor] | None:
            """
            Returns the custom workflow interceptor class.
            """
            return CustomWorkflowInterceptor
    
    
    class CustomClientInterceptor(ClientInterceptor):
        """
        Custom interceptor for Temporal.
        """
    
        def intercept_client(self, next: OutboundInterceptor) -> OutboundInterceptor:
            return CustomOutboundInterceptor(super().intercept_client(next))
    
    
    class CustomOutboundInterceptor(OutboundInterceptor):
        """
        Custom outbound interceptor for Temporal workflows.
        """
    
        # @offload_to_blob_store
        async def start_workflow(self, input: StartWorkflowInput) -> WorkflowHandle[Any, Any]:
            """
            interceptor for outbound workflow calls
            """
            input.args = [offload_if_large(arg) for arg in input.args]
    Error Handling

    The handle_execution_with_errors function is used throughout but its implementation is not visible in the diff. Ensure it properly handles all potential Temporal workflow errors and edge cases.

            return await handle_execution_with_errors(
                super().execute_workflow,
                input,
            )
    
    
    class CustomWorkflowOutboundInterceptor(WorkflowOutboundInterceptor):
        """
        Custom outbound interceptor for Temporal workflows.
        """
    
        # @offload_to_blob_store
        def start_activity(self, input: StartActivityInput) -> ActivityHandle:
            input.args = [offload_if_large(arg) for arg in input.args]
            return handle_execution_with_errors_sync(
                super().start_activity,
                input,
            )
    
        @offload_to_blob_store
        def continue_as_new(self, input: ContinueAsNewInput) -> NoReturn:
            return handle_execution_with_errors_sync(
                super().continue_as_new,
                input,
            )
    
        @offload_to_blob_store
        def start_local_activity(self, input: StartLocalActivityInput) -> ActivityHandle:
            return handle_execution_with_errors_sync(
                super().start_local_activity,
                input,
            )
    
        # @offload_to_blob_store
        async def start_child_workflow(self, input: StartChildWorkflowInput) -> ChildWorkflowHandle:
            input.args = [offload_if_large(arg) for arg in input.args]
            return await handle_execution_with_errors(
                super().start_child_workflow,
                input,
            )
    
    
    class CustomInterceptor(Interceptor):
        """
        Main interceptor class that provides both activity and workflow interceptors.
        """
    
        def intercept_activity(
            self, next: ActivityInboundInterceptor
        ) -> ActivityInboundInterceptor:
            """
            Creates and returns a custom activity interceptor.
            """
            return CustomActivityInterceptor(super().intercept_activity(next))
    
        def workflow_interceptor_class(
            self, input: WorkflowInterceptorClassInput
        ) -> type[WorkflowInboundInterceptor] | None:
            """
            Returns the custom workflow interceptor class.
            """
            return CustomWorkflowInterceptor
    
    
    class CustomClientInterceptor(ClientInterceptor):
        """
        Custom interceptor for Temporal.
        """
    
        def intercept_client(self, next: OutboundInterceptor) -> OutboundInterceptor:
            return CustomOutboundInterceptor(super().intercept_client(next))
    
    
    class CustomOutboundInterceptor(OutboundInterceptor):
        """
        Custom outbound interceptor for Temporal workflows.
        """
    
        # @offload_to_blob_store
        async def start_workflow(self, input: StartWorkflowInput) -> WorkflowHandle[Any, Any]:
            """
            interceptor for outbound workflow calls
            """
            input.args = [offload_if_large(arg) for arg in input.args]
            return await handle_execution_with_errors(
                super().start_workflow,
                input,

    Copy link
    Contributor

    PR Code Suggestions ✨

    Explore these optional code suggestions:

    CategorySuggestion                                                                                                                                    Impact
    Possible issue
    Add null check for args

    The error handling in start_workflow should verify input args are not None
    before applying offload_if_large to prevent potential NullPointerException.

    agents-api/agents_api/common/interceptors.py [382-390]

     async def start_workflow(self, input: StartWorkflowInput) -> WorkflowHandle[Any, Any]:
    -    input.args = [offload_if_large(arg) for arg in input.args]
    +    if input.args is not None:
    +        input.args = [offload_if_large(arg) for arg in input.args]
         return await handle_execution_with_errors(
             super().start_workflow,
             input,
         )
    • Apply this suggestion
    Suggestion importance[1-10]: 7

    __

    Why: Adding a null check for input.args is important to prevent potential runtime errors. This defensive programming practice could prevent crashes in production.

    Medium
    General
    Remove commented-out code

    The commented-out @offload_to_blob_store decorators should be either removed or
    uncommented. Leaving commented code can lead to confusion and maintenance
    issues.

    agents-api/agents_api/common/interceptors.py [314-320]

    -# @offload_to_blob_store
     def start_activity(self, input: StartActivityInput) -> ActivityHandle:
         input.args = [offload_if_large(arg) for arg in input.args]
         return handle_execution_with_errors_sync(
             super().start_activity,
             input,
         )
    • Apply this suggestion
    Suggestion importance[1-10]: 3

    __

    Why: Removing commented-out code improves code cleanliness and readability. However, this is a minor stylistic improvement with low impact on functionality.

    Low
    • More

    Copy link
    Contributor

    @ellipsis-dev ellipsis-dev bot left a comment

    Choose a reason for hiding this comment

    The reason will be displayed to describe this comment to others. Learn more.

    👍 Looks good to me! Reviewed everything up to 6151c97 in 1 minute and 39 seconds

    More details
    • Looked at 107 lines of code in 2 files
    • Skipped 0 files when reviewing.
    • Skipped posting 9 drafted comments based on config settings.
    1. agents-api/agents_api/clients/temporal.py:52
    • Draft comment:
      Using CustomClientInterceptor here is fine, but ensure it composes well with any other interceptors that may be added later (e.g., TracingInterceptor).
    • Reason this comment was not posted:
      Confidence changes required: 0% <= threshold 50%
      None
    2. agents-api/agents_api/common/interceptors.py:296
    • Draft comment:
      Renaming to CustomWorkflowOutboundInterceptor is clearer, but ensure the distinction from the client interceptor class remains obvious.
    • Reason this comment was not posted:
      Confidence changes required: 0% <= threshold 50%
      None
    3. agents-api/agents_api/common/interceptors.py:315
    • Draft comment:
      The offload_to_blob_store decorator is commented out and replaced with inline offload logic. Confirm this change consistently applies the desired behavior.
    • Reason this comment was not posted:
      Confidence changes required: 33% <= threshold 50%
      None
    4. agents-api/agents_api/common/interceptors.py:382
    • Draft comment:
      In the new CustomOutboundInterceptor (client interceptor), the start_workflow interception applies inline offloading. Verify that handling of large payloads is consistent with workflow methods.
    • Reason this comment was not posted:
      Confidence changes required: 33% <= threshold 50%
      None
    5. agents-api/agents_api/clients/temporal.py:52
    • Draft comment:
      Ensure the interceptor list order is appropriate when mixing multiple interceptors (e.g. CustomClientInterceptor vs TracingInterceptor).
    • Reason this comment was not posted:
      Confidence changes required: 33% <= threshold 50%
      None
    6. agents-api/agents_api/common/interceptors.py:293
    • Draft comment:
      Renaming to CustomWorkflowOutboundInterceptor is good, but consider a less ambiguous naming compared to the client-side CustomOutboundInterceptor to avoid confusion.
    • Reason this comment was not posted:
      Confidence changes required: 33% <= threshold 50%
      None
    7. agents-api/agents_api/common/interceptors.py:286
    • Draft comment:
      Remove the commented-out @offload_to_blob_store decorator on start_activity; if offload logic is now handled manually, clean up the code to avoid confusion.
    • Reason this comment was not posted:
      Confidence changes required: 33% <= threshold 50%
      None
    8. agents-api/agents_api/common/interceptors.py:333
    • Draft comment:
      Similarly, remove the commented-out @offload_to_blob_store decorator on start_child_workflow and ensure offload_if_large is applied consistently.
    • Reason this comment was not posted:
      Confidence changes required: 33% <= threshold 50%
      None
    9. agents-api/agents_api/common/interceptors.py:367
    • Draft comment:
      In CustomClientInterceptor, consider renaming the encapsulated CustomOutboundInterceptor to better differentiate it from the workflow interceptor variant, for clarity in client vs workflow outbound interception.
    • Reason this comment was not posted:
      Confidence changes required: 33% <= threshold 50%
      None

    Workflow ID: wflow_uXgtIi9RsSjJit5v


    You can customize Ellipsis with 👍 / 👎 feedback, review rules, user-specific overrides, quiet mode, and more.

    @creatorrr
    Copy link
    Contributor

    @Ahmad-mtos can you explain the changes and the rationale behind each change? Also why the commented out offload decorators?

    @Ahmad-mtos
    Copy link
    Contributor Author

    @creatorrr changes:
    1- added a client interceptor, so that I could override the start_workflow function, allowing us to offload the input provided with the execution creation to s3 as well.
    2- the problem was that the input args were being loaded before calling the outbound interceptors (start_activity, start_child_workflow), thus the data was being sent to inbound functions like execute_workflow and execute_activity loaded instead of offloaded, and that's where we were getting the BlobSizeLimitError. Therefore, I made sure that the data was being offloaded before those functions
    3- since we're offloading the data in those functions, we don't need the offload_to_blob_store decorator, which loads the input args before calling the function, which is redundant.

    @Ahmad-mtos Ahmad-mtos merged commit 8146bdb into dev Feb 20, 2025
    9 checks passed
    @Ahmad-mtos Ahmad-mtos deleted the x/start-workflow-interceptor branch February 20, 2025 10:04
    # for free to join this conversation on GitHub. Already have an account? # to comment
    Projects
    None yet
    Development

    Successfully merging this pull request may close these issues.

    2 participants