Technical Deep Dive

A comprehensive guide to the architecture, patterns, and Python techniques used in Bedsheet Agents.

Architecture Overview

flowchart TB
    subgraph User["User Application"]
        Input["User Input"]
    end
    subgraph Agent["Supervisor / Agent"]
        Instruction["Instruction<br/>(prompt)"]
        Memory["Memory<br/>(history)"]
        Actions["ActionGroups<br/>(tools/functions)"]
    end
    subgraph External["External Services"]
        LLM["LLMClient<br/>(Anthropic API)"]
        Collaborators["Collaborators<br/>(other Agents)"]
    end
    Output["Event Stream<br/>(AsyncIterator)"]
    Input --> Agent
    Agent --> LLM
    Agent --> Collaborators
    LLM --> Output
    Collaborators --> Output
    style User fill:#dbeafe,stroke:#0969da,color:#1f2328
    style Agent fill:#dcfce7,stroke:#1a7f37,color:#1f2328
    style External fill:#f3e8ff,stroke:#8250df,color:#1f2328
    style Output fill:#fef3c7,stroke:#bf8700,color:#1f2328

Key Components

| Component   | Purpose                       | File            |
|-------------|-------------------------------|-----------------|
| Agent       | Single agent with ReAct loop  | agent.py        |
| Supervisor  | Multi-agent coordinator       | supervisor.py   |
| ActionGroup | Tool/function container       | action_group.py |
| LLMClient   | Protocol for LLM providers    | llm/base.py     |
| Memory      | Conversation history storage  | memory/base.py  |
| Event       | Streaming event types         | events.py       |
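
To see how these pieces fit together, here is a minimal wiring sketch based on the usage shown later in this guide. The import paths and the AnthropicClient constructor arguments are assumptions, not the exact API:

# Minimal wiring sketch (import paths and constructor args are assumptions)
import asyncio
from bedsheet import Agent, ActionGroup, AnthropicClient

tools = ActionGroup(name="DemoTools")

@tools.action(name="echo", description="Echo the text back")
async def echo(text: str) -> str:
    return text

async def main():
    agent = Agent(
        name="DemoAgent",
        instruction="You are a helpful assistant.",
        model_client=AnthropicClient(),  # any object satisfying the LLMClient protocol works here
    )
    agent.add_action_group(tools)

    # invoke() is an async generator: events stream out while the agent works
    async for event in agent.invoke(session_id="demo", input_text="Say hello", stream=True):
        print(type(event).__name__)

asyncio.run(main())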

Protocols (Structural Typing)

What it is

Protocols define interfaces without inheritance. Any class that has the right methods/attributes satisfies the protocol - this is called "structural typing" or "duck typing with type hints."

Why use it

Loose coupling. You can swap implementations without changing code that uses the protocol.

The Problem Protocols Solve

Imagine you want to support multiple LLM providers (Anthropic, OpenAI, local models). Without protocols, you'd typically use inheritance:

# Traditional approach with inheritance (more rigid)
from abc import ABC, abstractmethod

class BaseLLMClient(ABC):
    @abstractmethod
    async def chat(self, messages, system): ...

# Every implementation MUST inherit from BaseLLMClient
class AnthropicClient(BaseLLMClient):  # Must explicitly inherit
    async def chat(self, messages, system):
        ...  # implementation

# Problem: What if you want to use a third-party class that
# has the right methods but doesn't inherit from your base class?
# You'd have to wrap it or modify it.

With Protocols, any class that has the right methods works automatically - no inheritance required:

# bedsheet/llm/base.py
from typing import Protocol, runtime_checkable

@runtime_checkable  # Allows isinstance() checks
class LLMClient(Protocol):
    """Protocol defining what an LLM client must implement."""

    async def chat(
        self,
        messages: list[Message],
        system: str,
        tools: list[ToolDefinition] | None = None,
    ) -> LLMResponse:
        """Send messages and get a response."""
        ...

    async def chat_stream(
        self,
        messages: list[Message],
        system: str,
        tools: list[ToolDefinition] | None = None,
    ) -> AsyncIterator[str | LLMResponse]:
        """Stream response token by token."""
        ...

Usage - Any class with these methods satisfies the protocol:

# This class doesn't inherit from LLMClient, but satisfies the protocol
class AnthropicClient:
    async def chat(self, messages, system, tools=None) -> LLMResponse:
        ...  # real implementation calls the Anthropic API

    async def chat_stream(self, messages, system, tools=None):
        ...  # real implementation streams tokens from the API

# Type checking works:
client: LLMClient = AnthropicClient()  # ✓ Valid

# Runtime checking works (because of @runtime_checkable):
assert isinstance(client, LLMClient)  # ✓ True

Comparison with Abstract Base Classes

| Approach          | Coupling             | isinstance()                | Flexibility                     |
|-------------------|----------------------|-----------------------------|---------------------------------|
| ABC (inheritance) | Tight - must inherit | Built-in                    | Less - locked to hierarchy      |
| Protocol          | Loose - duck typing  | Requires @runtime_checkable | More - any matching class works |
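
To make the loose-coupling point concrete, here is a small sketch. FakeClient is hypothetical, and the LLMResponse fields follow the parsing code shown later in this guide:

# Code that depends only on the protocol, never on a concrete class
async def ask(client: LLMClient, messages: list[Message], system: str) -> str:
    response = await client.chat(messages, system)
    return response.text or ""

# A hypothetical stand-in that satisfies the protocol structurally
class FakeClient:
    async def chat(self, messages, system, tools=None) -> LLMResponse:
        return LLMResponse(text="canned reply", tool_calls=[], stop_reason="end_turn")

    async def chat_stream(self, messages, system, tools=None):
        yield "canned reply"

# Both type-check, even though FakeClient shares no base class with AnthropicClient:
#   await ask(AnthropicClient(), messages, "You are helpful.")
#   await ask(FakeClient(), messages, "You are helpful.")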

Dataclasses

What it is

A decorator that auto-generates __init__, __repr__, __eq__, and more from class attributes. It's Python's built-in way to create classes that are primarily containers for data.

Why use it

Less boilerplate. Instead of writing 20+ lines of repetitive code (__init__, __repr__, __eq__), you write 5 lines and get all of it for free.

The Problem: Boilerplate Code

Without dataclasses, creating a simple data container requires writing lots of repetitive code:

# Without dataclass - lots of boilerplate!
class ToolCallEvent:
    def __init__(self, tool_name: str, tool_input: dict, call_id: str):
        self.tool_name = tool_name      # Repeat each field 3 times
        self.tool_input = tool_input    # in the signature, assignment,
        self.call_id = call_id          # and as self.field
        self.type = "tool_call"

    def __repr__(self):
        return f"ToolCallEvent(tool_name={self.tool_name!r}, tool_input={self.tool_input!r}, call_id={self.call_id!r})"

    def __eq__(self, other):
        if not isinstance(other, ToolCallEvent):
            return NotImplemented
        return (self.tool_name == other.tool_name and
                self.tool_input == other.tool_input and
                self.call_id == other.call_id)

The Solution: @dataclass Decorator

With @dataclass, you just declare the fields and Python generates everything else:

# bedsheet/events.py
from dataclasses import dataclass, field
from typing import Literal, Any

@dataclass
class ToolCallEvent:
    """Emitted when the LLM requests a tool call."""
    tool_name: str                # These are the fields
    tool_input: dict[str, Any]    # Type hints define what each field holds
    call_id: str
    type: Literal["tool_call"] = field(default="tool_call", init=False)

# That's it! Python auto-generates __init__, __repr__, __eq__ for you.
# You can now do:
event = ToolCallEvent(tool_name="get_weather", tool_input={"city": "NYC"}, call_id="123")
print(event)  # ToolCallEvent(tool_name='get_weather', tool_input={'city': 'NYC'}, ...)

Understanding the field() Function

The field() function gives you fine-grained control over how each field behaves:

from dataclasses import dataclass, field

@dataclass
class Example:
    # Regular field - REQUIRED when creating an instance
    name: str

    # Field with default value - OPTIONAL when creating an instance
    count: int = 0

    # field(default=...) - same as above but more explicit
    status: str = field(default="pending")

    # field(init=False) - NOT included in __init__, set automatically
    # Useful for fields that should always have a fixed value
    type: str = field(default="example", init=False)

    # field(default_factory=...) - for MUTABLE defaults like lists/dicts
    # ⚠️ NEVER do this: items: list = []  (all instances would share the same list!)
    # ✓ DO this instead:
    items: list = field(default_factory=list)  # Each instance gets its own list

    # field(repr=False) - hide from string representation (good for large data)
    raw_data: bytes = field(default=b"", repr=False)

Common Mistake: Mutable Default Values

Never use items: list = [] in a dataclass! @dataclass actually raises a ValueError for list, dict, and set defaults, precisely because every instance would otherwise share the same object. Always use field(default_factory=list) for lists, dicts, or any other mutable type.
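
A quick standalone demonstration: the mutable default is rejected outright, while default_factory gives each instance its own list:

from dataclasses import dataclass, field

# Uncommenting this raises at class-definition time:
#   ValueError: mutable default <class 'list'> for field items is not allowed: use default_factory
# @dataclass
# class Broken:
#     items: list = []

@dataclass
class Safe:
    items: list = field(default_factory=list)

a, b = Safe(), Safe()
a.items.append("x")
print(a.items, b.items)  # ['x'] [] - each instance has its own list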

The __post_init__ Hook

Sometimes you need to compute a value based on other fields. Use __post_init__ - it runs right after __init__:

@dataclass
class Message:
    role: str
    content: str
    timestamp: float = field(init=False)  # Not passed in, computed automatically

    def __post_init__(self):
        # This runs after __init__, so self.role and self.content are already set
        import time
        self.timestamp = time.time()

# Usage:
msg = Message(role="user", content="Hello")
print(msg.timestamp)  # 1701234567.89 (automatically set)

Literal Type for Fixed Values

Literal["tool_call"] means "this field can ONLY be the string 'tool_call'". It's used for type discrimination:

from typing import Literal

@dataclass
class ToolCallEvent:
    type: Literal["tool_call"] = field(default="tool_call", init=False)

@dataclass
class CompletionEvent:
    type: Literal["completion"] = field(default="completion", init=False)

# Now type checkers know: if event.type == "tool_call", it's a ToolCallEvent
# This pattern is called a "discriminated union" or "tagged union"

Type Hints & Union Types

What it is

Type hints are annotations that tell Python (and developers) what types of values are expected. They don't affect runtime behavior but enable IDE autocompletion, catch bugs before running, and serve as documentation.

Basic Type Hints

Type hints go after a colon for variables and parameters, and after -> for return types:

# Variable with type hint
name: str = "Alice"
count: int = 42
prices: list[float] = [9.99, 19.99, 29.99]

# Function with type hints
def greet(name: str, times: int = 1) -> str:
    return f"Hello, {name}! " * times

# The -> str means "this function returns a string"

The | Operator: Union Types (Python 3.10+)

When a value can be one of several types, use | (pipe) to combine them:

# This function accepts either a string OR an integer
def process(value: str | int) -> str:
    if isinstance(value, str):
        return value.upper()
    else:
        return str(value * 2)

process("hello")  # "HELLO"
process(21)       # "42"

# Optional values: can be the type OR None
def find_user(id: int) -> User | None:
    # Returns a User if found, None if not
    ...

# Before Python 3.10, you had to use Union:
from typing import Union, Optional
def old_style(value: Union[str, int]) -> Optional[str]:  # Same as str | None
    ...

Generic Types: list[...], dict[...], etc.

Container types can specify what they contain using square brackets:

# A list that contains strings
names: list[str] = ["Alice", "Bob"]

# A dictionary with string keys and integer values
scores: dict[str, int] = {"Alice": 95, "Bob": 87}

# A list of dictionaries (common in API responses)
users: list[dict[str, Any]] = [
    {"name": "Alice", "age": 30},
    {"name": "Bob", "age": 25},
]

# Any means "any type" - use when type varies or is unknown
from typing import Any
data: Any = get_unknown_data()

Literal Types: Exact Values Only

Literal restricts a value to specific exact values - not just a type:

from typing import Literal

# This can ONLY be "supervisor" or "router", nothing else
mode: Literal["supervisor", "router"] = "supervisor"

mode = "supervisor"  # ✓ OK
mode = "router"      # ✓ OK
mode = "other"       # ✗ Type error! "other" is not allowed

# Useful for configuration options, states, modes
def set_log_level(level: Literal["debug", "info", "warn", "error"]) -> None:
    ...

Union Types for Events

In Bedsheet, Event is a Union of all possible event types. This lets you handle different events with type safety:

# bedsheet/events.py
from typing import Union

# Event can be ANY of these types
Event = Union[
    ThinkingEvent,
    TextTokenEvent,
    ToolCallEvent,
    ToolResultEvent,
    CompletionEvent,
    ErrorEvent,
    DelegationEvent,
    CollaboratorStartEvent,
    CollaboratorEvent,
    CollaboratorCompleteEvent,
]

# When you receive an Event, use isinstance to check which type it is
async for event in agent.invoke(...):
    if isinstance(event, ToolCallEvent):
        # Inside this block, Python KNOWS event is a ToolCallEvent
        # So event.tool_name and event.tool_input are available
        print(f"Tool: {event.tool_name}")

    elif isinstance(event, TextTokenEvent):
        # Here, event is definitely a TextTokenEvent
        print(event.token, end="")  # Print streaming tokens

    elif isinstance(event, CompletionEvent):
        # Here, event is definitely a CompletionEvent
        print(f"Final: {event.response}")

Pattern Matching (Python 3.10+)

An elegant alternative to isinstance chains. The match statement can destructure dataclasses:

# Instead of multiple if/elif/isinstance checks:
match event:
    case ToolCallEvent(tool_name=name, tool_input=args):
        # Extracts tool_name into 'name', tool_input into 'args'
        print(f"Calling {name} with {args}")

    case TextTokenEvent(token=t):
        print(t, end="")

    case CompletionEvent(response=text):
        print(f"Done: {text}")

    case _:
        pass  # The underscore matches anything else

Async/Await Basics

What it is

A way to write code that can pause while waiting for slow operations (like API calls) and do other work in the meantime. It's built into Python and doesn't require threads.

Key insight

Async is about concurrency (interleaving tasks), not parallelism (simultaneous execution). Think of it like a chef who starts boiling water, then preps vegetables while waiting, rather than staring at the pot.

The Problem: Waiting is Wasteful

Without async, when you make an API call, your program just waits:

# Synchronous (blocking) code - wastes time waiting
import requests

def get_weather():
    response = requests.get("https://api.weather.com")  # Program WAITS here
    return response.json()                              # Does nothing else

def get_news():
    response = requests.get("https://api.news.com")     # Program WAITS here too
    return response.json()

# These run one after another - if each takes 2 seconds, total = 4 seconds
weather = get_weather()  # Wait 2 seconds...
news = get_news()        # Wait 2 more seconds...

The Solution: async/await

With async, the program can do other things while waiting:

import asyncio
import aiohttp  # Async HTTP library

# 'async def' makes this a coroutine (an async function)
async def get_weather():
    async with aiohttp.ClientSession() as session:
        # 'await' says "pause here, let other code run while waiting"
        response = await session.get("https://api.weather.com")
        return await response.json()

async def get_news():
    async with aiohttp.ClientSession() as session:
        response = await session.get("https://api.news.com")
        return await response.json()

async def main():
    # asyncio.gather runs both at the "same time"
    # If each takes 2 seconds, total = ~2 seconds (not 4!)
    weather, news = await asyncio.gather(
        get_weather(),
        get_news(),
    )
    print(weather, news)

# This is how you run async code from regular Python
asyncio.run(main())

Key Concepts Explained

# 1. async def - defines a COROUTINE (async function)
async def my_function():
    ...

# 2. await - PAUSES the coroutine until the operation completes
#    While paused, other coroutines can run
result = await some_async_operation()

# 3. You can ONLY use 'await' inside an 'async def' function
def regular_function():
    await something()  # ✗ SyntaxError!

async def async_function():
    await something()  # ✓ OK

# 4. asyncio.run() - starts the async event loop from regular code
asyncio.run(main())  # Entry point for async code

# 5. asyncio.gather() - runs multiple coroutines concurrently
results = await asyncio.gather(task1(), task2(), task3())

Common Mistake

Calling an async function without await doesn't run it - it just creates a coroutine object. Always use await or asyncio.gather().

# Wrong - this does nothing!
get_weather()  # Returns coroutine object, doesn't execute

# Right
await get_weather()  # Actually runs the function

Visual: Sequential vs Concurrent

gantt
    title Sequential vs Concurrent Execution
    dateFormat X
    axisFormat %s
    section Sequential
    Task1 :a1, 0, 10
    Task2 :a2, 10, 20
    section Concurrent
    Task1 :b1, 0, 10
    Task2 :b2, 0, 10

Sequential: 20 units total | Concurrent: 10 units total (half the time!)
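
You can reproduce those numbers with a toy script that stands in for the API calls with asyncio.sleep (a sketch, not Bedsheet code):

import asyncio
import time

async def fake_api_call(name: str, seconds: float) -> str:
    await asyncio.sleep(seconds)  # stands in for a slow network request
    return name

async def main():
    start = time.perf_counter()
    await fake_api_call("weather", 2)       # runs first...
    await fake_api_call("news", 2)          # ...then this one
    print(f"sequential: {time.perf_counter() - start:.1f}s")  # ~4.0s

    start = time.perf_counter()
    await asyncio.gather(fake_api_call("weather", 2), fake_api_call("news", 2))
    print(f"concurrent: {time.perf_counter() - start:.1f}s")  # ~2.0s

asyncio.run(main())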

AsyncIterator & Streaming

What it is

A way to produce values one at a time, where each value might require waiting (like getting data from an API). Instead of returning all results at once, you "yield" them as they become available.

Why use it

Perfect for streaming data - like showing LLM responses word-by-word as they arrive instead of waiting for the complete response.

Regular vs Async Iteration

First, let's understand regular iteration:

# Regular iterator - uses 'for'
for item in [1, 2, 3]:
    print(item)

# Regular generator - uses 'yield' to produce values one at a time
def count_up(n):
    for i in range(n):
        yield i  # Produces values one at a time

for num in count_up(5):
    print(num)  # 0, 1, 2, 3, 4

Async iteration is the same concept, but each step can involve waiting:

# Async generator - uses 'async def' + 'yield'
async def fetch_pages(urls):
    for url in urls:
        response = await http.get(url)  # Wait for each page
        yield response.text             # Then yield it

# Async iteration - uses 'async for'
async for page in fetch_pages(["url1", "url2", "url3"]):
    print(page)  # Processes each page as it arrives

How Bedsheet Uses AsyncIterator

The agent's invoke() method is an async generator that yields events as they happen:

# bedsheet/agent.py
from typing import AsyncIterator

class Agent:
    async def invoke(
        self,
        session_id: str,
        input_text: str,
        stream: bool = False,
    ) -> AsyncIterator[Event]:  # Return type: yields Event objects one at a time
        """Invoke agent, yielding events as they occur."""

        # When streaming, yield each token as it arrives
        if stream:
            async for token in self.model_client.chat_stream(...):
                if isinstance(token, str):
                    yield TextTokenEvent(token=token)  # Yield immediately!

        # Yield tool-related events
        yield ToolCallEvent(...)   # "I'm about to call a tool"
        result = await self.execute_tool(...)  # Actually call the tool
        yield ToolResultEvent(...) # "Here's the result"

        # Final response
        yield CompletionEvent(...)

# The caller processes events AS THEY ARRIVE - no waiting for everything
async for event in agent.invoke("session", "hello", stream=True):
    if isinstance(event, TextTokenEvent):
        print(event.token, end="")  # Print each word as it arrives!

Streaming from Claude API

Here's how we stream tokens from Claude's API:

# bedsheet/llm/anthropic.py
async def chat_stream(self, messages, system, tools=None) -> AsyncIterator[str | LLMResponse]:
    """Stream tokens from Claude, yielding each word/character as it arrives."""

    # Anthropic SDK provides 'messages.stream()' for streaming responses
    async with self._client.messages.stream(**kwargs) as stream:

        # stream.text_stream yields each token (word or part of word) as it arrives
        async for text in stream.text_stream:
            yield text  # Immediately yield to caller - don't wait!
            # Example yields: "Hello", " ", "world", "!", " ", "How", " ", "can", ...

        # After all tokens are streamed, get the complete message
        # (needed for tool calls which aren't streamed)
        final = await stream.get_final_message()
        yield self._parse_response(final)  # Yield the final structured response

Real-World Effect

This is why ChatGPT and Claude show responses word-by-word instead of making you wait 5 seconds for the complete answer. Each token is displayed the moment it arrives from the API.

Parallel Execution with asyncio.gather

What it is

Run multiple coroutines concurrently and wait for all to complete.

Key pattern in Bedsheet

Parallel tool execution and parallel agent delegation.

# bedsheet/supervisor.py - Parallel delegation
async def _handle_parallel_delegation(self, delegations, session_id, stream):
    """Execute multiple delegations in parallel."""

    async def run_delegation(d):
        """Wrapper to run one delegation and collect its events."""
        agent_name = d["agent_name"]
        task = d["task"]
        events = []
        async for event in self._execute_single_delegation(agent_name, task, session_id, stream=stream):
            events.append(event)
        return agent_name, events

    # Create tasks for all delegations
    tasks = [run_delegation(d) for d in delegations]

    # Run ALL tasks concurrently, wait for ALL to complete
    results = await asyncio.gather(*tasks)

    # results = [(agent1, events1), (agent2, events2), ...]
    return results

Visual: Parallel Delegation

gantt
    title Parallel Delegation Comparison
    dateFormat X
    axisFormat %s
    section Sequential
    MarketAnalyst :a1, 0, 10
    NewsResearcher :a2, 10, 20
    section Parallel (asyncio.gather)
    MarketAnalyst :b1, 0, 10
    NewsResearcher :b2, 0, 10

Sequential: 20s | Parallel: 10s (half the time!)

Parallel tool execution:

# bedsheet/agent.py
async def _execute_tools_parallel(self, tool_calls: list[ToolCall]) -> list[ToolResult]:
    """Execute multiple tool calls concurrently."""

    async def execute_one(tc: ToolCall) -> ToolResult:
        try:
            result = await self._call_tool(tc.name, tc.input)
            return ToolResult(call_id=tc.id, result=result)
        except Exception as e:
            return ToolResult(call_id=tc.id, error=str(e))

    # All tools run at the same time
    results = await asyncio.gather(*[execute_one(tc) for tc in tool_calls])
    return results

The @action Decorator

What it is

A decorator is a function that wraps another function to add behavior or register it somewhere. The @action decorator registers functions as "tools" that the LLM can call.

Why use it

Clean, declarative way to define tools. Just add @action above any function and it becomes available to the AI agent.

Understanding Decorators Step by Step

A decorator is just a function that takes a function and returns a function. The @ syntax is shorthand:

# This decorator syntax...
@my_decorator
def my_function():
    pass

# ...is exactly equivalent to this:
def my_function():
    pass
my_function = my_decorator(my_function)

# The decorator receives the function and can:
# 1. Modify it
# 2. Wrap it with extra behavior
# 3. Register it somewhere
# 4. Replace it entirely

A Simple Decorator Example

# This decorator logs when a function is called
def log_calls(fn):
    def wrapper(*args, **kwargs):
        print(f"Calling {fn.__name__}...")
        result = fn(*args, **kwargs)
        print(f"{fn.__name__} returned {result}")
        return result
    return wrapper

@log_calls
def add(a, b):
    return a + b

add(2, 3)
# Output:
# Calling add...
# add returned 5

Decorators WITH Arguments (Two Levels)

When a decorator takes arguments like @action(name="...", description="..."), there's an extra level of nesting:

# @decorator_with_args("hello") is evaluated FIRST
# It returns the actual decorator function

def decorator_with_args(message):
    # This outer function receives the decorator arguments

    def actual_decorator(fn):
        # This inner function receives the function to decorate

        def wrapper(*args, **kwargs):
            print(message)  # Uses the argument from outer function
            return fn(*args, **kwargs)
        return wrapper

    return actual_decorator  # Return the decorator

# Usage:
@decorator_with_args("Hello!")
def greet(name):
    return f"Hi, {name}"

# What happens:
# 1. decorator_with_args("Hello!") is called -> returns actual_decorator
# 2. actual_decorator(greet) is called -> returns wrapper
# 3. greet now refers to wrapper

How @action Works in Bedsheet

# bedsheet/action_group.py
class ActionGroup:
    def __init__(self, name: str):
        self.name = name
        self.actions: dict[str, Action] = {}  # Store registered actions

    def action(self, name: str, description: str, parameters: dict | None = None):
        """Decorator factory - returns the actual decorator."""

        def decorator(fn: Callable) -> Callable:
            # Infer JSON schema from type hints if not provided
            schema = parameters if parameters is not None else generate_schema(fn)

            # Register this function in our actions dictionary
            self.actions[name] = Action(
                name=name,
                description=description,
                parameters=schema,
                handler=fn,  # Store reference to the actual function
            )

            return fn  # Return the original function unchanged

        return decorator

Using @action to Define Tools

# Create a group to hold related tools
tools = ActionGroup(name="MarketTools")

# Register a function as a tool the LLM can call
@tools.action(name="get_stock_data", description="Get stock price and metrics")
async def get_stock_data(symbol: str) -> dict:
    """Fetch stock data from API."""
    return {"symbol": symbol, "price": 100.0, "change": "+2.5%"}

# What happened step by step:
# 1. tools.action(name="get_stock_data", description="...") is called
#    -> Returns the 'decorator' function
# 2. decorator(get_stock_data) is called
#    -> Extracts type hints from the function (symbol: str, returns dict)
#    -> Creates an Action object with name, description, schema, and handler
#    -> Stores it in tools.actions["get_stock_data"]
#    -> Returns the original function (unchanged)
# 3. get_stock_data can still be called normally

# Now the agent can use this tool:
agent.add_action_group(tools)
# The LLM sees: "get_stock_data: Get stock price and metrics. Args: symbol (string)"

The Magic

By just adding @tools.action(...), your function automatically becomes a tool the AI can use. The decorator extracts the parameter types from your type hints, so you don't have to write JSON schemas manually.

Schema Inference from Type Hints

What it is

A technique that reads your Python function's parameter types and automatically generates a JSON Schema that tells the LLM what arguments the function accepts.

Why it matters

LLMs need to know what parameters a tool accepts. Instead of manually writing JSON schemas, Bedsheet extracts this information from your type hints automatically.

The Problem: LLMs Need Schemas

When you give an LLM access to tools, it needs to know exactly what arguments each tool accepts. This is typically done with JSON Schema:

# Without schema inference, you'd write this manually:
tool_schema = {
    "name": "search_news",
    "description": "Search for news articles",
    "input_schema": {
        "type": "object",
        "properties": {
            "query": {"type": "string", "description": "Search query"},
            "limit": {"type": "integer", "description": "Max results"}
        },
        "required": ["query"]
    }
}
# This is tedious and error-prone - you repeat information already in your function!

The Solution: Use inspect and get_type_hints

Python's standard library lets you examine functions at runtime:

import inspect
from typing import get_type_hints

# Your function with type hints
async def search_news(query: str, limit: int = 10) -> dict:
    """Search for news articles."""
    pass

# inspect.signature() gives you parameter information
sig = inspect.signature(search_news)
for name, param in sig.parameters.items():
    print(f"{name}: default={param.default}")
# Output:
# query: default=<class 'inspect._empty'>  (no default = required)
# limit: default=10       (has default = optional)

# get_type_hints() gives you the type annotations
hints = get_type_hints(search_news)
print(hints)
# Output: {'query': <class 'str'>, 'limit': <class 'int'>, 'return': <class 'dict'>}

How Bedsheet Generates Schemas

# bedsheet/action_group.py
import inspect
from typing import get_type_hints

def generate_schema(fn: Callable) -> dict:
    """Generate JSON Schema from function type hints."""

    hints = get_type_hints(fn)      # Get {'param_name': type, ...}
    sig = inspect.signature(fn)     # Get signature with defaults

    properties = {}
    required = []

    for param_name, param in sig.parameters.items():
        if param_name == "return":
            continue  # Skip return type

        param_type = hints.get(param_name, str)  # Default to string

        # Map Python types to JSON Schema types
        type_mapping = {
            str: "string",
            int: "integer",
            float: "number",
            bool: "boolean",
            list: "array",
            dict: "object",
        }

        json_type = type_mapping.get(param_type, "string")
        properties[param_name] = {"type": json_type}

        # If parameter has no default value, it's required
        if param.default is inspect.Parameter.empty:
            required.append(param_name)

    return {
        "type": "object",
        "properties": properties,
        "required": required,
    }

Example: From Function to Schema

# Your function
async def search_news(query: str, limit: int = 10) -> dict:
    """Search news articles by query."""
    pass

# Bedsheet automatically generates this schema:
schema = generate_schema(search_news)

# Result - this is sent to the LLM:
{
    "type": "object",
    "properties": {
        "query": {"type": "string"},
        "limit": {"type": "integer"}
    },
    "required": ["query"]  # limit has a default, so it's optional
}

# The LLM now knows:
# - "query" is required and must be a string
# - "limit" is optional and must be an integer
Best Practice

Always add type hints to your tool functions. Not only does it enable schema inference, it also makes your code self-documenting and helps catch bugs with type checkers like mypy.
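
As a concrete illustration, using the generate_schema() sketch above, a parameter left without a hint silently falls back to "string", which can mislead the LLM:

# 'query' has no type hint, so the inferred schema calls it a string
async def search_news(query, limit: int = 10) -> dict:
    ...

print(generate_schema(search_news))
# Roughly:
# {'type': 'object',
#  'properties': {'query': {'type': 'string'}, 'limit': {'type': 'integer'}},
#  'required': ['query']}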

Event Types

All event types in the system:

# bedsheet/events.py

@dataclass
class ThinkingEvent:
    """LLM is thinking (extended thinking mode)."""
    content: str
    type: Literal["thinking"] = field(default="thinking", init=False)

@dataclass
class TextTokenEvent:
    """A token arrived from streaming LLM response."""
    token: str
    type: Literal["text_token"] = field(default="text_token", init=False)

@dataclass
class ToolCallEvent:
    """LLM wants to call a tool."""
    tool_name: str
    tool_input: dict[str, Any]
    call_id: str
    type: Literal["tool_call"] = field(default="tool_call", init=False)

@dataclass
class ToolResultEvent:
    """Tool execution completed."""
    call_id: str
    result: Any
    error: str | None = None
    type: Literal["tool_result"] = field(default="tool_result", init=False)

@dataclass
class CompletionEvent:
    """Agent produced final response."""
    response: str
    type: Literal["completion"] = field(default="completion", init=False)

@dataclass
class ErrorEvent:
    """An error occurred."""
    error: str
    recoverable: bool = False
    type: Literal["error"] = field(default="error", init=False)

@dataclass
class DelegationEvent:
    """Supervisor is delegating to agent(s)."""
    delegations: list[dict]  # [{"agent_name": "X", "task": "Y"}, ...]
    type: Literal["delegation"] = field(default="delegation", init=False)

@dataclass
class CollaboratorStartEvent:
    """A collaborator agent is starting."""
    agent_name: str
    task: str
    type: Literal["collaborator_start"] = field(default="collaborator_start", init=False)

@dataclass
class CollaboratorEvent:
    """Wraps any event from a collaborator."""
    agent_name: str
    inner_event: Event  # The wrapped event
    type: Literal["collaborator"] = field(default="collaborator", init=False)

@dataclass
class CollaboratorCompleteEvent:
    """A collaborator agent finished."""
    agent_name: str
    response: str
    type: Literal["collaborator_complete"] = field(default="collaborator_complete", init=False)

@dataclass
class RoutingEvent:
    """Router mode: supervisor picked an agent."""
    agent_name: str
    task: str
    type: Literal["routing"] = field(default="routing", init=False)

Event Flow

Single Agent Flow:

flowchart TD
    subgraph Agent["Agent.invoke()"]
        direction TB
        UI[User Input] --> LLM[LLM Call]
        LLM --> |streaming| STREAM["yield TextTokenEvent('Hello')<br/>yield TextTokenEvent(' world')<br/>..."]
        LLM --> |tool use| TOOLS[Tool calls requested]
        STREAM --> COMPLETE
        TOOLS --> TC1["Tool Call 1<br/>(async)"]
        TOOLS --> TC2["Tool Call 2<br/>(async)"]
        TC1 --> TCE1[yield ToolCallEvent]
        TC2 --> TCE2[yield ToolCallEvent]
        TCE1 --> TRE1[yield ToolResultEvent]
        TCE2 --> TRE2[yield ToolResultEvent]
        TRE1 --> NEXT["Next LLM Call<br/>loop back"]
        TRE2 --> NEXT
        NEXT --> LLM
        NEXT --> COMPLETE[yield CompletionEvent]
    end
    style Agent fill:#e0f2fe,stroke:#0284c7,color:#1f2328
    style LLM fill:#dbeafe,stroke:#0969da,color:#1f2328
    style STREAM fill:#dcfce7,stroke:#1a7f37,color:#1f2328
    style TOOLS fill:#fef3c7,stroke:#bf8700,color:#1f2328
    style TC1 fill:#fef3c7,stroke:#bf8700,color:#1f2328
    style TC2 fill:#fef3c7,stroke:#bf8700,color:#1f2328
    style COMPLETE fill:#dcfce7,stroke:#1a7f37,color:#1f2328

Supervisor Flow with Parallel Delegation:

flowchart TD
    subgraph Supervisor["Supervisor.invoke()"]
        direction TB
        UI[User Input] --> LLM1["LLM Call<br/>'Delegate to MarketAnalyst AND NewsResearcher'"]
        LLM1 --> DEL[yield DelegationEvent]
        DEL --> MA["MarketAnalyst<br/>(parallel)"]
        DEL --> NR["NewsResearcher<br/>(parallel)"]
        MA --> CSE1[yield CollaboratorStartEvent]
        NR --> CSE2[yield CollaboratorStartEvent]
        subgraph AGENT1["Agent.invoke()"]
            direction TB
            A1_TTE[TextTokenEvent]
            A1_TCE[ToolCallEvent]
            A1_TRE[ToolResultEvent]
            A1_CE[CompletionEvent]
        end
        subgraph AGENT2["Agent.invoke()"]
            direction TB
            A2_TTE[TextTokenEvent]
            A2_TCE[ToolCallEvent]
            A2_TRE[ToolResultEvent]
            A2_CE[CompletionEvent]
        end
        CSE1 --> AGENT1
        CSE2 --> AGENT2
        AGENT1 --> CCE["yield CollaboratorCompleteEvent<br/>(for each)"]
        AGENT2 --> CCE
        CCE --> LLM2["Supervisor LLM Call<br/>(synthesize collaborator results)"]
        LLM2 --> STREAM["yield TextTokenEvent<br/>streaming"]
        STREAM --> FINAL["yield CompletionEvent<br/>final"]
    end
    style Supervisor fill:#e0f2fe,stroke:#0284c7,color:#1f2328
    style LLM1 fill:#dbeafe,stroke:#0969da,color:#1f2328
    style LLM2 fill:#dbeafe,stroke:#0969da,color:#1f2328
    style DEL fill:#fef3c7,stroke:#bf8700,color:#1f2328
    style MA fill:#dcfce7,stroke:#1a7f37,color:#1f2328
    style NR fill:#dcfce7,stroke:#1a7f37,color:#1f2328
    style AGENT1 fill:#f0fdf4,stroke:#1a7f37,color:#1f2328
    style AGENT2 fill:#f0fdf4,stroke:#1a7f37,color:#1f2328
    style CCE fill:#fef3c7,stroke:#bf8700,color:#1f2328
    style FINAL fill:#dcfce7,stroke:#1a7f37,color:#1f2328
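
Consuming that flow in code looks roughly like the sketch below, assuming a supervisor built as in the next section; the event classes and attributes are the ones defined in events.py:

async for event in supervisor.invoke(session_id="demo", input_text="Analyze NVDA", stream=True):
    if isinstance(event, DelegationEvent):
        print("Delegating:", event.delegations)
    elif isinstance(event, CollaboratorEvent):
        # Child-agent events arrive wrapped; unwrap to inspect them
        inner = event.inner_event
        if isinstance(inner, TextTokenEvent):
            print(f"[{event.agent_name}] {inner.token}", end="")
    elif isinstance(event, CollaboratorCompleteEvent):
        print(f"\n{event.agent_name} finished")
    elif isinstance(event, CompletionEvent):
        print("\nFinal answer:", event.response)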

Supervisor Pattern

Supervisor extends Agent:

# bedsheet/supervisor.py
class Supervisor(Agent):
    """An agent that can coordinate other agents."""

    def __init__(
        self,
        name: str,
        instruction: str,
        model_client: LLMClient,
        collaborators: list[Agent],  # Child agents
        collaboration_mode: Literal["supervisor", "router"] = "supervisor",
        **kwargs,
    ):
        super().__init__(name=name, instruction=instruction, model_client=model_client, **kwargs)

        # Store collaborators by name for lookup
        self.collaborators = {agent.name: agent for agent in collaborators}
        self.collaboration_mode = collaboration_mode

        # Register built-in delegate tool
        self._register_delegate_action()

Two collaboration modes:

| Mode       | Behavior                                 | Use Case                                       |
|------------|------------------------------------------|------------------------------------------------|
| supervisor | Delegates, collects results, synthesizes | Complex analysis needing multiple perspectives |
| router     | Picks one agent, hands off entirely      | Simple routing to specialists                  |
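
A minimal construction sketch; the agent instructions and the client variable are illustrative, while the keyword arguments follow the __init__ shown above:

# 'client' is any LLMClient implementation (e.g. AnthropicClient or MockLLMClient)
market_analyst = Agent(
    name="MarketAnalyst",
    instruction="Analyze market data for the given ticker.",
    model_client=client,
)
news_researcher = Agent(
    name="NewsResearcher",
    instruction="Research recent news for the given topic.",
    model_client=client,
)

supervisor = Supervisor(
    name="ResearchLead",
    instruction="Coordinate your analysts and synthesize their findings.",
    model_client=client,
    collaborators=[market_analyst, news_researcher],
    collaboration_mode="supervisor",  # or "router" to hand off to a single specialist
)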

The delegate tool:

def _register_delegate_action(self):
    """Register the built-in delegate action."""

    delegate_group = ActionGroup(name="DelegateTools")

    @delegate_group.action(
        name="delegate",
        description="Delegate tasks to collaborator agents",
        parameters={
            "type": "object",
            "properties": {
                "delegations": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "agent_name": {"type": "string"},
                            "task": {"type": "string"},
                        },
                        "required": ["agent_name", "task"],
                    },
                },
            },
            "required": ["delegations"],
        },
    )
    async def delegate(delegations: list) -> str:
        # This is a placeholder - actual delegation handled specially
        return "Delegation handled"

    self.add_action_group(delegate_group)

Parallel Delegation

How parallel delegation works:

# bedsheet/supervisor.py

async def _execute_single_delegation(
    self,
    agent_name: str,
    task: str,
    session_id: str,
    stream: bool = False,
) -> AsyncIterator[Event]:
    """Execute one delegation and yield its events."""

    collaborator = self.collaborators.get(agent_name)
    if collaborator is None:
        yield ErrorEvent(error=f"Unknown agent: {agent_name}")
        return

    yield CollaboratorStartEvent(agent_name=agent_name, task=task)

    # Invoke the collaborator, wrapping all its events
    result = ""
    async for event in collaborator.invoke(
        session_id=f"{session_id}:{agent_name}",
        input_text=task,
        stream=stream,
    ):
        # Wrap every event from the collaborator
        yield CollaboratorEvent(agent_name=agent_name, inner_event=event)

        if isinstance(event, CompletionEvent):
            result = event.response

    yield CollaboratorCompleteEvent(agent_name=agent_name, response=result)


async def _handle_parallel_delegations(
    self,
    delegations: list[dict],
    session_id: str,
    stream: bool,
) -> list[tuple[str, list[Event]]]:
    """Execute multiple delegations in parallel."""

    async def run_one(d: dict) -> tuple[str, list[Event]]:
        events = []
        async for event in self._execute_single_delegation(
            d["agent_name"], d["task"], session_id, stream
        ):
            events.append(event)
        return d["agent_name"], events

    # asyncio.gather runs all delegations concurrently
    results = await asyncio.gather(*[run_one(d) for d in delegations])
    return results

Streaming vs Non-Streaming

Non-streaming (original)
# Wait for complete response
response = await self._client.messages.create(
    model=self.model,
    max_tokens=self.max_tokens,
    system=system,
    messages=messages,
)

return self._parse_response(response)

Streaming (new)
# Stream tokens as they arrive
async with self._client.messages.stream(
    model=self.model,
    max_tokens=self.max_tokens,
    system=system,
    messages=messages,
) as stream:
    async for text in stream.text_stream:
        yield text  # Each token

    final = await stream.get_final_message()
    yield self._parse_response(final)

Consumption in Agent:

async def invoke(self, session_id, input_text, stream=False) -> AsyncIterator[Event]:
    # ... setup ...

    if stream and hasattr(self.model_client, 'chat_stream'):
        # Streaming path
        response = None
        async for chunk in self.model_client.chat_stream(messages, system, tools):
            if isinstance(chunk, str):
                yield TextTokenEvent(token=chunk)  # Emit each token
            else:
                response = chunk  # Final LLMResponse
    else:
        # Non-streaming path
        response = await self.model_client.chat(messages, system, tools)

    # Continue with tool handling using response...

Tool Calling

How Claude tool calling works
  1. You provide tool definitions in the API request
  2. Claude responds with tool_use blocks if it wants to call tools
  3. You execute the tools and send results back
  4. Claude continues with more tool calls or a text response
# Request to Claude includes tools:
{
    "tools": [
        {
            "name": "get_stock_data",
            "description": "Get stock price and metrics",
            "input_schema": {
                "type": "object",
                "properties": {
                    "symbol": {"type": "string"}
                },
                "required": ["symbol"]
            }
        }
    ]
}

# Claude's response when it wants to use tools:
{
    "content": [
        {
            "type": "tool_use",
            "id": "call_123",
            "name": "get_stock_data",
            "input": {"symbol": "NVDA"}
        }
    ],
    "stop_reason": "tool_use"
}

# You execute the tool and send result back:
{
    "role": "user",
    "content": [
        {
            "type": "tool_result",
            "tool_use_id": "call_123",
            "content": "{\"symbol\": \"NVDA\", \"price\": 875.50}"
        }
    ]
}

Bedsheet's tool execution loop:

# bedsheet/agent.py (simplified)
async def invoke(self, session_id, input_text, stream=False):
    # Add user message to memory
    await self.memory.add_message(session_id, Message(role="user", content=input_text))

    for iteration in range(self.max_iterations):
        messages = await self.memory.get_messages(session_id)
        tools = self._get_tool_definitions()

        # Call LLM
        response = await self.model_client.chat(messages, system_prompt, tools)

        if response.text and not response.tool_calls:
            # Final text response - we're done
            yield CompletionEvent(response=response.text)
            return

        if response.tool_calls:
            # Execute all tool calls in parallel
            for tc in response.tool_calls:
                yield ToolCallEvent(tool_name=tc.name, tool_input=tc.input, call_id=tc.id)

            results = await asyncio.gather(*[
                self._execute_tool(tc) for tc in response.tool_calls
            ])

            for result in results:
                yield ToolResultEvent(call_id=result.call_id, result=result.result)

            # Add results to memory and loop back for next LLM call
            await self._add_tool_results_to_memory(session_id, results)

Structured Outputs

What it is

A mechanism to guarantee that LLM responses conform to a specific JSON schema. Uses Anthropic's native constrained decoding - the model literally cannot generate tokens that would violate your schema.

Why use it

When you need machine-readable data, not prose. API responses, database records, UI components - anything that must be parsed reliably.

The OutputSchema Class

A simple dataclass that wraps a JSON schema with optional Pydantic model reference:

# bedsheet/llm/base.py
from dataclasses import dataclass, field
from typing import Any

@dataclass
class OutputSchema:
    """Schema for structured output.
    Can be initialized with a Pydantic model or a JSON schema dict.
    """
    schema: dict[str, Any]
    _pydantic_model: Any = field(default=None, repr=False)

    @classmethod
    def from_pydantic(cls, model: Any) -> "OutputSchema":
        """Create from a Pydantic BaseModel class."""
        schema = model.model_json_schema()
        return cls(schema=schema, _pydantic_model=model)

    @classmethod
    def from_dict(cls, schema: dict[str, Any]) -> "OutputSchema":
        """Create from a JSON schema dict."""
        return cls(schema=schema)

How It Works with Anthropic's API

When an output schema is provided, the client uses Anthropic's beta structured outputs API:

# bedsheet/llm/anthropic.py
STRUCTURED_OUTPUTS_BETA = "structured-outputs-2025-11-13"

async def chat(self, messages, system, tools=None, output_schema=None):
    kwargs = {
        "model": self.model,
        "max_tokens": self.max_tokens,
        "system": system,
        "messages": messages,
    }

    if tools:
        kwargs["tools"] = tools

    if output_schema:
        # Use beta endpoint with structured outputs
        kwargs["betas"] = [STRUCTURED_OUTPUTS_BETA]
        kwargs["output_format"] = {
            "type": "json_schema",
            "schema": output_schema.schema,
        }
        # Use beta client for structured outputs
        response = await self._client.beta.messages.create(**kwargs)
    else:
        # Standard API call
        response = await self._client.messages.create(**kwargs)

    return self._parse_response(response, output_schema)

Response Parsing with Structured Outputs

The response text is parsed as JSON and stored in parsed_output:

def _parse_response(self, response, output_schema=None) -> LLMResponse:
    text = None
    tool_calls = []
    parsed_output = None

    for block in response.content:
        if block.type == "text":
            text = block.text

            # Parse JSON if structured output was requested
            if output_schema and text:
                parsed_output = json.loads(text)

        elif block.type == "tool_use":
            tool_calls.append(ToolCall(
                id=block.id,
                name=block.name,
                input=block.input
            ))

    return LLMResponse(
        text=text,
        tool_calls=tool_calls,
        stop_reason=response.stop_reason,
        parsed_output=parsed_output,  # Validated JSON data
    )

Usage Patterns

Raw JSON Schema
# No external dependencies
schema = OutputSchema.from_dict({
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "score": {"type": "number"}
    },
    "required": ["name", "score"]
})

response = await client.chat(
    messages=[...],
    system="...",
    output_schema=schema,
)
print(response.parsed_output)
# {"name": "test", "score": 0.95}
Pydantic Model
# If using Pydantic in your project
from pydantic import BaseModel

class Result(BaseModel):
    name: str
    score: float

schema = OutputSchema.from_pydantic(Result)

response = await client.chat(
    messages=[...],
    system="...",
    output_schema=schema,
)
print(response.parsed_output)
# {"name": "test", "score": 0.95}

Key Advantages

| Feature                | Bedsheet                        | Other Frameworks                  |
|------------------------|---------------------------------|-----------------------------------|
| Works with tools       | Yes - tools and schema together | Often mutually exclusive          |
| Pydantic required      | No - optional                   | Often mandatory                   |
| 100% schema compliance | Yes - constrained decoding      | Varies (some use post-validation) |
| Native API integration | Yes - Anthropic beta            | Varies                            |

Testing Structured Outputs

The MockLLMClient supports parsed_output in MockResponse, making it easy to test agents that use structured outputs without calling the real API.

MockLLMClient

Purpose

Test agents without making real API calls.

# bedsheet/testing.py
@dataclass
class MockResponse:
    """A pre-programmed response from the mock LLM."""
    text: str | None = None
    tool_calls: list[ToolCall] | None = None


class MockLLMClient:
    """Mock LLM client for testing."""

    def __init__(self, responses: list[MockResponse]):
        self.responses = list(responses)
        self.call_count = 0

    def _get_next_response(self) -> MockResponse:
        """Get and remove the next response from the queue."""
        if not self.responses:
            raise RuntimeError("MockLLMClient exhausted - no more responses")
        self.call_count += 1
        return self.responses.pop(0)

    async def chat(self, messages, system, tools=None) -> LLMResponse:
        """Return the next pre-programmed response."""
        response = self._get_next_response()
        return LLMResponse(
            text=response.text,
            tool_calls=response.tool_calls or [],
            stop_reason="end_turn" if response.text else "tool_use",
        )

    async def chat_stream(self, messages, system, tools=None) -> AsyncIterator[str | LLMResponse]:
        """Stream the next pre-programmed response."""
        response = self._get_next_response()

        # Yield text word by word
        if response.text:
            words = response.text.split(' ')
            for i, word in enumerate(words):
                if i > 0:
                    yield ' '
                yield word

        # Yield final response
        yield LLMResponse(
            text=response.text,
            tool_calls=response.tool_calls or [],
            stop_reason="end_turn",
        )

Async Test Fixtures

pytest-asyncio setup:

# tests/conftest.py or in test file
import pytest

# Mark all tests in file as async
pytestmark = pytest.mark.asyncio

# Or mark individual tests
@pytest.mark.asyncio
async def test_something():
    result = await some_async_function()
    assert result == expected

Usage in tests:

# tests/test_agent.py
@pytest.mark.asyncio
async def test_agent_calls_tool_and_returns_result():
    mock = MockLLMClient(responses=[
        # First response: LLM wants to call a tool
        MockResponse(tool_calls=[
            ToolCall(id="1", name="get_weather", input={"city": "NYC"})
        ]),
        # Second response: LLM synthesizes result
        MockResponse(text="The weather in NYC is sunny."),
    ])

    tools = ActionGroup(name="Weather")

    @tools.action(name="get_weather", description="Get weather")
    async def get_weather(city: str) -> str:
        return f"Sunny in {city}"

    agent = Agent(
        name="WeatherBot",
        instruction="Help with weather",
        model_client=mock,
    )
    agent.add_action_group(tools)

    events = []
    async for event in agent.invoke("test", "What's the weather in NYC?"):
        events.append(event)

    # Verify event sequence
    assert isinstance(events[0], ToolCallEvent)
    assert events[0].tool_name == "get_weather"

    assert isinstance(events[1], ToolResultEvent)
    assert "Sunny" in events[1].result

    assert isinstance(events[2], CompletionEvent)
    assert "sunny" in events[2].response.lower()

Summary

Key Patterns Recap

| Pattern        | Where Used                          | Why                                           |
|----------------|-------------------------------------|-----------------------------------------------|
| Protocol       | LLMClient, Memory                   | Loose coupling, easy to swap implementations  |
| Dataclass      | All events, ToolCall, LLMResponse   | Clean data structures with less boilerplate   |
| AsyncIterator  | invoke() methods                    | Stream events as they happen                  |
| asyncio.gather | Tool execution, parallel delegation | Concurrent I/O operations                     |
| Decorator      | @action                             | Register functions with metadata              |
| Type hints     | Everywhere                          | Self-documenting, IDE support, type checking  |

File Reference

| File                | Purpose                                |
|---------------------|----------------------------------------|
| agent.py            | Single agent with ReAct loop           |
| supervisor.py       | Multi-agent coordinator                |
| action_group.py     | Tool definitions and @action decorator |
| events.py           | All event dataclasses                  |
| llm/base.py         | LLMClient protocol and types           |
| llm/anthropic.py    | Claude integration                     |
| memory/base.py      | Memory protocol                        |
| memory/in_memory.py | Dict-based memory                      |
| testing.py          | MockLLMClient for tests                |

Further Reading