From 479cc038dd6fe7512a6ee68875e6f7321aadf3a0 Mon Sep 17 00:00:00 2001
From: Robert Kim
Date: Sat, 30 Nov 2024 22:57:22 -0800
Subject: [PATCH] updated actions

---
 .github/workflows/publish.yml |  5 +++
 README.md                     | 65 +++++++++++++++++++++++++++++------
 pyproject.toml                |  2 +-
 tests/test_flow.py            | 18 +++++++++-
 4 files changed, 78 insertions(+), 12 deletions(-)

diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml
index 9982c08..2791fa9 100644
--- a/.github/workflows/publish.yml
+++ b/.github/workflows/publish.yml
@@ -10,6 +10,11 @@ permissions:
 jobs:
   publish:
     runs-on: ubuntu-latest
+    environment:
+      name: pypi
+      url: https://pypi.org/p/lmnr-flow/
+    permissions:
+      id-token: write
     steps:
       - uses: actions/checkout@v4
      - name: Install uv
diff --git a/README.md b/README.md
index 8690327..b362196 100644
--- a/README.md
+++ b/README.md
@@ -15,13 +15,23 @@ Results of all tasks are stored in a thread-safe `Context`.
 This task-based architecture makes complex workflows surprisingly simple:
 
 ✨ **What's Possible:**
-- [x] Parallel task execution without explicit threading code
-- [x] Self-modifying dynamic workflows and cycles
-- [x] Conditional branching and control flow
-- [x] Streaming of tasks execution
-- [x] Automatic state management and persistence
+- [x] Parallel task execution without explicit threading code
+- [x] Self-modifying dynamic workflows and cycles
+- [x] Conditional branching and control flow
+- [x] Streaming of task execution
+- [x] Automatic state management and persistence
 
-Flow is extremely lightweight, clearly written and has not external dependencies for the engine.
+Flow is extremely lightweight, clearly written, and has no external dependencies for the engine. It is designed and maintained by the [Laminar](https://github.com/lmnr-ai) team.
+
+## Auto-instrumentation
+Flow comes with auto-instrumentation for tracing using [Laminar](https://github.com/lmnr-ai/lmnr). To enable tracing, initialize the Laminar SDK before using Flow.
+
+```python
+from lmnr import Laminar
+Laminar.initialize(project_api_key="...")
+```
+
+> Tracing is extremely useful for debugging and state reconstruction. When tracing is enabled, Flow will automatically capture the state at each step. During debugging, you can load the captured state and inspect the context. To learn more about tracing, see the [Laminar docs](https://docs.lmnr.ai).
 
 ## Getting started
 
@@ -30,6 +40,7 @@ Flow is extremely lightweight, clearly written and has not external dependencies
 from concurrent.futures import ThreadPoolExecutor
 from lmnr_flow import Flow, TaskOutput
 
+# thread pool executor is optional, defaults to 4 workers
 flow = Flow(thread_pool_executor=ThreadPoolExecutor(max_workers=4))
 
 # Simple task that returns a result
@@ -48,7 +59,7 @@ def task1(context: Context) -> TaskOutput:
 
 def task2(context: Context) -> TaskOutput:
     # Access results from previous tasks
-    t1_result = context.get("task1")  # Gets "result1"
+    t1_result = context.get("task1")  # waits for task1 to complete
     return TaskOutput(output="result2", next=None)
 
 flow.add_task("task1", task1)
@@ -126,6 +137,40 @@ result = flow.run("greet", inputs={"user_name": "Alice"})
 # Returns {"greet": "Hello Alice!"}
 ```
 
+### Dynamic Routing
+```python
+def router(context: Context) -> TaskOutput:
+    task_type = context.get("type")
+    routes = {
+        "process": ["process_task"],
+        "analyze": ["analyze_task"],
+        "report": ["report_task"]
+    }
+    return TaskOutput(output=f"routing to {task_type}", next=routes.get(task_type, []))
+
+def process_task(context: Context) -> TaskOutput:
+    return TaskOutput(output="processed data", next=None)
+
+flow.add_task("router", router)
+flow.add_task("process_task", process_task)
+result = flow.run("router", inputs={"type": "process"})
+# Returns {"process_task": "processed data"}
+```
+
+## State Management
+
+```python
+context = Context()
+context.from_dict({"task1": "result1"})
+
+flow = Flow(context=context)
+flow.add_task("task2", lambda ctx: TaskOutput("result2", None))
+flow.run("task2")
+
+assert flow.context.get("task1") == "result1"  # True, because it was set in the context
+assert flow.context.get("task2") == "result2"
+```
+
 ## Advanced Features
 
 - **Context Sharing**: All tasks share the same context, allowing for complex data flows
@@ -134,6 +179,6 @@ result = flow.run("greet", inputs={"user_name": "Alice"})
 - **Minimal Dependencies**: Core engine has zero external dependencies
 
 ## Roadmap
-- [ ] Add async
-- [ ] Serverless deployment
-- [ ] Timetravel UI
+- [ ] Add async support
+- [ ] Serverless deployment
+
diff --git a/pyproject.toml b/pyproject.toml
index 603fc6d..aa4f271 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
 
 [project]
 name = "lmnr-flow"
-version = "0.1.0b1"
+version = "0.1.0"
 description = "Lightweight task engine for building AI agents"
 readme = "README.md"
 requires-python = ">=3.10"
diff --git a/tests/test_flow.py b/tests/test_flow.py
index 3ad106b..329e5a0 100644
--- a/tests/test_flow.py
+++ b/tests/test_flow.py
@@ -7,7 +7,7 @@
 from opentelemetry.sdk.trace import TracerProvider
 from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
 
-from src.lmnr_flow.flow import Flow, TaskOutput
+from src.lmnr_flow.flow import Context, Flow, TaskOutput
 
 
 @pytest.fixture(autouse=True)
@@ -59,6 +59,14 @@ def thread_pool():
     return Flow(thread_pool)
 
+@pytest.fixture
+def flow_with_state(thread_pool):
+    context = Context()
+    context.from_dict({
+        "task1": "result1"
+    })
+
+    return Flow(thread_pool, context)
 
 
 def test_simple_task_execution(flow):
     # Test single task that returns no next tasks
@@ -291,3 +299,11 @@ def task3(ctx):
 
     result = flow.run("task1", inputs={"count": 0})
     assert result == {"task3": "final"}
+
+def test_state_loading(flow_with_state):
+    # Test that state is loaded correctly
+    flow_with_state.add_task("task2", lambda ctx: TaskOutput("result2", None))
+    flow_with_state.run("task2")
+
+    assert flow_with_state.context.get("task1") == "result1"
+    assert flow_with_state.context.get("task2") == "result2"
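
Below is a minimal, hand-written sketch (not part of the patch) that exercises the documented behavior end to end by combining the README's auto-instrumentation and state-management examples. It assumes `Context` is importable from `lmnr_flow` alongside `Flow` and `TaskOutput` (the tests import it from `src.lmnr_flow.flow`) and that a real Laminar project API key is substituted.

```python
# Sketch only: combines the Auto-instrumentation and State Management examples
# from the README hunks above. Assumes `Context` is exported by lmnr_flow and
# that a real project API key replaces the "..." placeholder.
from lmnr import Laminar
from lmnr_flow import Context, Flow, TaskOutput

# Optional: with tracing enabled, Flow captures the context at each step.
Laminar.initialize(project_api_key="...")

# Preload the context, e.g. with state captured from an earlier traced run.
context = Context()
context.from_dict({"task1": "result1"})

flow = Flow(context=context)
flow.add_task("task2", lambda ctx: TaskOutput(f"got {ctx.get('task1')}", None))

result = flow.run("task2")
print(result)                     # expected: {"task2": "got result1"}
print(flow.context.get("task1"))  # "result1", loaded from the dict above
```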