Skip to content

Commit

Permalink
Better support async flows (#2193)
Browse files Browse the repository at this point in the history
* Better support async

* Drop coroutine
  • Loading branch information
bhancockio authored Feb 24, 2025
1 parent b50772a commit 8a75847
Showing 1 changed file with 44 additions and 31 deletions.
75 changes: 44 additions & 31 deletions src/crewai/flow/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -713,41 +713,63 @@ def _restore_state(self, stored_state: Dict[str, Any]) -> None:
raise TypeError(f"State must be dict or BaseModel, got {type(self._state)}")

def kickoff(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
"""Start the flow execution.
"""
Start the flow execution in a synchronous context.
Args:
inputs: Optional dictionary containing input values and potentially a state ID to restore
This method wraps kickoff_async so that all state initialization and event
emission is handled in the asynchronous method.
"""

async def run_flow():
return await self.kickoff_async(inputs)

return asyncio.run(run_flow())

@init_flow_main_trace
async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
"""
# Handle state restoration if ID is provided in inputs
if inputs and "id" in inputs and self._persistence is not None:
restore_uuid = inputs["id"]
stored_state = self._persistence.load_state(restore_uuid)
Start the flow execution asynchronously.
This method performs state restoration (if an 'id' is provided and persistence is available)
and updates the flow state with any additional inputs. It then emits the FlowStartedEvent,
logs the flow startup, and executes all start methods. Once completed, it emits the
FlowFinishedEvent and returns the final output.
Args:
inputs: Optional dictionary containing input values and/or a state ID for restoration.
Returns:
The final output from the flow, which is the result of the last executed method.
"""
if inputs:
# Override the id in the state if it exists in inputs
if "id" in inputs:
if isinstance(self._state, dict):
self._state["id"] = inputs["id"]
elif isinstance(self._state, BaseModel):
setattr(self._state, "id", inputs["id"])

if stored_state:
self._log_flow_event(
f"Loading flow state from memory for UUID: {restore_uuid}",
color="yellow",
)
# Restore the state
self._restore_state(stored_state)
else:
self._log_flow_event(
f"No flow state found for UUID: {restore_uuid}", color="red"
)
# If persistence is enabled, attempt to restore the stored state using the provided id.
if "id" in inputs and self._persistence is not None:
restore_uuid = inputs["id"]
stored_state = self._persistence.load_state(restore_uuid)
if stored_state:
self._log_flow_event(
f"Loading flow state from memory for UUID: {restore_uuid}",
color="yellow",
)
self._restore_state(stored_state)
else:
self._log_flow_event(
f"No flow state found for UUID: {restore_uuid}", color="red"
)

# Apply any additional inputs after restoration
# Update state with any additional inputs (ignoring the 'id' key)
filtered_inputs = {k: v for k, v in inputs.items() if k != "id"}
if filtered_inputs:
self._initialize_state(filtered_inputs)

# Start flow execution
# Emit FlowStartedEvent and log the start of the flow.
crewai_event_bus.emit(
self,
FlowStartedEvent(
Expand All @@ -760,27 +782,18 @@ def kickoff(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
f"Flow started with ID: {self.flow_id}", color="bold_magenta"
)

if inputs is not None and "id" not in inputs:
self._initialize_state(inputs)

async def run_flow():
return await self.kickoff_async()

return asyncio.run(run_flow())

@init_flow_main_trace
async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
if not self._start_methods:
raise ValueError("No start method defined")

# Execute all start methods concurrently.
tasks = [
self._execute_start_method(start_method)
for start_method in self._start_methods
]
await asyncio.gather(*tasks)

final_output = self._method_outputs[-1] if self._method_outputs else None

# Emit FlowFinishedEvent after all processing is complete.
crewai_event_bus.emit(
self,
FlowFinishedEvent(
Expand Down

0 comments on commit 8a75847

Please # to comment.