Skip to content

Latest commit

 

History

History
305 lines (215 loc) · 7.86 KB

emitter.md

File metadata and controls

305 lines (215 loc) · 7.86 KB

👀 Emitter (Observability)

Table of Contents


Overview

The Emitter is a powerful event management and observability tool that allows you to track, monitor, and react to events happening within your AI agents and workflows.

This flexible event-driven mechanism provides the ability to:

  • Observe system events
  • Debug agent behaviors
  • Log and track agent interactions
  • Implement custom event handling

Note

Location within the framework: beeai_framework/emitter.

Basic usage

The following example demonstrates how the Emitter feature works.

import asyncio
import json
import sys
import traceback

from beeai_framework.emitter import Emitter
from beeai_framework.errors import FrameworkError


async def main() -> None:
    """Register a printing listener on a standalone emitter, then emit two events."""

    def log_event(data, event) -> None:
        # Report the event path together with its JSON-serialized payload.
        print(f"Received event '{event.path}' with data {json.dumps(data)}")

    emitter: Emitter = Emitter(namespace=["base"])
    emitter.match("*.*", log_event)

    # Emit "start" and then "end", each with its own freshly built payload.
    for event_name in ("start", "end"):
        await emitter.emit(event_name, {"id": 123})


if __name__ == "__main__":
    # Script entry point: run the async example to completion.
    try:
        asyncio.run(main())
    except FrameworkError as err:
        # Print the full traceback, then pass the framework's explanation to sys.exit.
        traceback.print_exc()
        sys.exit(err.explain())

Source: examples/emitter/base.py

Note

You can create your own emitter by instantiating the Emitter class, but typically it's better to use or fork the root one.

Key features

Event matching

Event matching allows you to:

  • Listen to specific event types
  • Use wildcard matching
  • Handle nested events
import asyncio
import sys
import traceback

from beeai_framework.adapters.ollama.backend.chat import OllamaChatModel
from beeai_framework.backend.chat import ChatModel
from beeai_framework.emitter import Emitter
from beeai_framework.errors import FrameworkError


async def main() -> None:
    """Register one listener per matcher style, then emit an event on each emitter."""
    app_emitter = Emitter.root().child(namespace=["app"])
    chat_model = OllamaChatModel()

    def on_update(data, event) -> None:
        print(data, ": on update")

    def on_instance_event(data, event) -> None:
        print(data, ": match all instance")

    def on_any_event(data, event) -> None:
        print(data, ": match all nested")

    def on_chat_model_event(data, event) -> None:
        print(data, ": match ChatModel")

    def created_by_chat_model(event) -> bool:
        # Filter predicate: accept only events whose creator is a ChatModel.
        return isinstance(event.creator, ChatModel)

    # Match events by a concrete name (strictly typed)
    app_emitter.on("update", on_update)

    # Match all events emitted directly on the instance (not nested)
    app_emitter.match("*", on_instance_event)

    # Match all events (including nested ones)
    Emitter.root().match("*.*", on_any_event)

    # Match events by providing a filter function
    chat_model.emitter.match(created_by_chat_model, on_chat_model_event)

    await app_emitter.emit("update", "update")
    await Emitter.root().emit("root", "root")
    await chat_model.emitter.emit("model", "model")


if __name__ == "__main__":
    # Script entry point: run the async example to completion.
    try:
        asyncio.run(main())
    except FrameworkError as err:
        # Print the full traceback, then pass the framework's explanation to sys.exit.
        traceback.print_exc()
        sys.exit(err.explain())

Source: examples/emitter/matchers.py

Event piping

Event piping enables:

  • Transferring events between emitters
  • Transforming events in transit
  • Creating complex event workflows
import asyncio
import sys
import traceback

from beeai_framework.emitter import Emitter
from beeai_framework.errors import FrameworkError


async def main() -> None:
    """Pipe one emitter into another, emit on both, unpipe, and emit on both again."""
    first: Emitter = Emitter(namespace=["app"])

    def report_first(data, event) -> None:
        # isDirect compares the event's source with the 'first' emitter itself.
        print(f"'first' has retrieved the following event '{event.path}', isDirect: {event.source == first}")

    first.match("*.*", report_first)

    second: Emitter = Emitter(namespace=["app", "llm"])

    def report_second(data, event) -> None:
        # isDirect compares the event's source with the 'second' emitter itself.
        print(f"'second' has retrieved the following event '{event.path}', isDirect: {event.source == second}")

    second.match("*.*", report_second)

    # Propagate all events from the 'second' emitter to the 'first' emitter;
    # the returned callable removes the pipe again.
    disconnect = second.pipe(first)

    await first.emit("a", {})
    await second.emit("b", {})

    print("Unpipe")
    disconnect()

    await first.emit("c", {})
    await second.emit("d", {})


if __name__ == "__main__":
    # Script entry point: run the async example to completion.
    try:
        asyncio.run(main())
    except FrameworkError as err:
        # Print the full traceback, then pass the framework's explanation to sys.exit.
        traceback.print_exc()
        sys.exit(err.explain())

Source: examples/emitter/piping.py


Framework usage

Typically, you consume out-of-the-box modules that use the Emitter concept on your behalf.

Agent usage

Integrate emitters with agents to:

  • Track agent decision-making
  • Log agent interactions
  • Debug agent behaviors
import asyncio
import sys
import traceback

from beeai_framework import ReActAgent, UnconstrainedMemory
from beeai_framework.adapters.ollama.backend.chat import OllamaChatModel
from beeai_framework.errors import FrameworkError


async def main() -> None:
    """Attach listeners to an agent at the instance level and at the run level."""

    def ignore_event(data, event) -> None:
        # Intentionally a no-op listener.
        return None

    def log_run_event(data, event) -> None:
        print(f"RUN LOG: received event '{event.path}'")

    def attach_run_logger(emitter):
        # Called by observe() with the emitter to attach run-level listeners to.
        return emitter.match("*.*", log_run_event)

    agent = ReActAgent(
        llm=OllamaChatModel("llama3.1"),
        memory=UnconstrainedMemory(),
        tools=[],
    )

    # Matching events on the instance level
    agent.emitter.match("*.*", ignore_event)

    # Matching events on the execution (run) level
    run = agent.run("Hello agent!")
    await run.observe(attach_run_logger)


if __name__ == "__main__":
    # Script entry point: run the async example to completion.
    try:
        asyncio.run(main())
    except FrameworkError as err:
        # Print the full traceback, then pass the framework's explanation to sys.exit.
        traceback.print_exc()
        sys.exit(err.explain())

Source: examples/emitter/agentMatchers.py

Note

The observe method is also supported on Tools and Backend.


Advanced usage

Advanced techniques include:

  • Custom event handlers
  • Complex event filtering
  • Performance optimization
import asyncio
import sys
import traceback

from beeai_framework.emitter import Emitter
from beeai_framework.errors import FrameworkError


async def main() -> None:
    """Create a child emitter with every optional argument spelled out, then emit to it."""

    def handle_start(data, event) -> None:
        print(f"Received '{event.name}' event with id '{data['id']}'")

    def handle_update(data, event) -> None:
        print(f"Received '{event.name}' with id '{data['id']}' and data '{data['data']}'")

    # Derive a child of the root emitter, passing each supported option explicitly
    emitter = Emitter.root().child(
        namespace=["bee", "demo"],
        creator={},  # typically a class
        context={},  # custom data (propagates to the event's context property)
        group_id=None,  # optional id for grouping common events
        trace=None,  # data to identify what emitted what and in which context (internally used by framework components)
    )

    # Listen for "start" event
    emitter.on("start", handle_start)

    # Listen for "update" event
    emitter.on("update", handle_update)

    await emitter.emit("start", {"id": 123})
    await emitter.emit("update", {"id": 123, "data": "Hello Bee!"})


if __name__ == "__main__":
    # Script entry point: run the async example to completion.
    try:
        asyncio.run(main())
    except FrameworkError as err:
        # Print the full traceback, then pass the framework's explanation to sys.exit.
        traceback.print_exc()
        sys.exit(err.explain())

Source: examples/emitter/advanced.py


Examples