Technical Deep Dive
A comprehensive guide to the architecture, patterns, and Python techniques used in Bedsheet Agents.
Architecture Overview
(prompt)"] Memory["Memory
(history)"] Actions["ActionGroups
(tools/functions)"] end subgraph External["External Services"] LLM["LLMClient
(Anthropic API)"] Collaborators["Collaborators
(other Agents)"] end Output["Event Stream
(AsyncIterator)"] Input --> Agent Agent --> LLM Agent --> Collaborators LLM --> Output Collaborators --> Output style User fill:#dbeafe,stroke:#0969da,color:#1f2328 style Agent fill:#dcfce7,stroke:#1a7f37,color:#1f2328 style External fill:#f3e8ff,stroke:#8250df,color:#1f2328 style Output fill:#fef3c7,stroke:#bf8700,color:#1f2328
Key Components
| Component | Purpose | File |
|---|---|---|
| Agent | Single agent with ReAct loop | agent.py |
| Supervisor | Multi-agent coordinator | supervisor.py |
| ActionGroup | Tool/function container | action_group.py |
| LLMClient | Protocol for LLM providers | llm/base.py |
| Memory | Conversation history storage | memory/base.py |
| Event | Streaming event types | events.py |
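To see how these pieces fit together before diving into each pattern, here is a minimal end-to-end sketch. It is illustrative only: the import paths are assumed from the file layout above, and it uses the MockLLMClient (covered at the end of this guide) so it runs without an API key.
import asyncio
from bedsheet.agent import Agent                      # import paths assumed from the table above
from bedsheet.action_group import ActionGroup
from bedsheet.events import CompletionEvent
from bedsheet.testing import MockLLMClient, MockResponse

# A tool the agent could call
tools = ActionGroup(name="Demo")

@tools.action(name="echo", description="Echo a message back")
async def echo(message: str) -> str:
    return f"You said: {message}"

async def main():
    # Any LLMClient implementation works here; the mock avoids real API calls
    client = MockLLMClient(responses=[MockResponse(text="Hello from the agent!")])
    agent = Agent(name="DemoBot", instruction="Be helpful", model_client=client)
    agent.add_action_group(tools)

    # invoke() is an async generator: events arrive as they happen
    async for event in agent.invoke(session_id="s1", input_text="Hi"):
        if isinstance(event, CompletionEvent):
            print(event.response)  # "Hello from the agent!"

asyncio.run(main())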
Protocols (Structural Typing)
Protocols define interfaces without inheritance. Any class that has the right methods/attributes satisfies the protocol - this is called "structural typing" or "duck typing with type hints."
Loose coupling. You can swap implementations without changing code that uses the protocol.
The Problem Protocols Solve
Imagine you want to support multiple LLM providers (Anthropic, OpenAI, local models). Without protocols, you'd typically use inheritance:
# Traditional approach with inheritance (more rigid)
from abc import ABC, abstractmethod
class BaseLLMClient(ABC):
@abstractmethod
async def chat(self, messages, system): ...
# Every implementation MUST inherit from BaseLLMClient
class AnthropicClient(BaseLLMClient): # Must explicitly inherit
async def chat(self, messages, system):
# ...
# Problem: What if you want to use a third-party class that
# has the right methods but doesn't inherit from your base class?
# You'd have to wrap it or modify it.
With Protocols, any class that has the right methods works automatically - no inheritance required:
# bedsheet/llm/base.py
from typing import Protocol, runtime_checkable
@runtime_checkable # Allows isinstance() checks
class LLMClient(Protocol):
"""Protocol defining what an LLM client must implement."""
async def chat(
self,
messages: list[Message],
system: str,
tools: list[ToolDefinition] | None = None,
) -> LLMResponse:
"""Send messages and get a response."""
...
async def chat_stream(
self,
messages: list[Message],
system: str,
tools: list[ToolDefinition] | None = None,
) -> AsyncIterator[str | LLMResponse]:
"""Stream response token by token."""
...
Usage - Any class with these methods satisfies the protocol:
# This class doesn't inherit from LLMClient, but satisfies the protocol
class AnthropicClient:
async def chat(self, messages, system, tools=None) -> LLMResponse:
# Implementation...
async def chat_stream(self, messages, system, tools=None):
# Implementation...
# Type checking works:
client: LLMClient = AnthropicClient() # ✓ Valid
# Runtime checking works (because of @runtime_checkable):
assert isinstance(client, LLMClient) # ✓ True
Comparison with Abstract Base Classes
| Approach | Coupling | isinstance() | Flexibility |
|---|---|---|---|
| ABC (inheritance) | Tight - must inherit | Built-in | Less - locked to hierarchy |
| Protocol | Loose - duck typing | Requires @runtime_checkable | More - any matching class works |
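Here is a short sketch of what that loose coupling buys you in practice: code typed against the protocol accepts any conforming implementation, so swapping the real client for a test double needs no changes (the client names below are the ones used elsewhere in this guide).
# Assumes LLMClient, Message, and the client classes from this guide are in scope
async def summarize(client: LLMClient, text: str) -> str:
    response = await client.chat(
        messages=[Message(role="user", content=f"Summarize: {text}")],
        system="You are a concise summarizer.",
    )
    return response.text

# Both satisfy LLMClient structurally -- no shared base class required:
# await summarize(AnthropicClient(...), article)              # real provider
# await summarize(MockLLMClient(responses=[...]), article)    # test double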
Dataclasses
A decorator that auto-generates __init__, __repr__, __eq__, and more from class attributes. It's Python's built-in way to create classes that are primarily containers for data.
Less boilerplate. Instead of writing 20+ lines of repetitive code (__init__, __repr__, __eq__), you write 5 lines and get all of it for free.
The Problem: Boilerplate Code
Without dataclasses, creating a simple data container requires writing lots of repetitive code:
# Without dataclass - lots of boilerplate!
class ToolCallEvent:
def __init__(self, tool_name: str, tool_input: dict, call_id: str):
self.tool_name = tool_name # Repeat each field 3 times
self.tool_input = tool_input # in the signature, assignment,
self.call_id = call_id # and as self.field
self.type = "tool_call"
def __repr__(self):
return f"ToolCallEvent(tool_name={self.tool_name!r}, tool_input={self.tool_input!r}, call_id={self.call_id!r})"
def __eq__(self, other):
if not isinstance(other, ToolCallEvent):
return NotImplemented
return (self.tool_name == other.tool_name and
self.tool_input == other.tool_input and
self.call_id == other.call_id)
The Solution: @dataclass Decorator
With @dataclass, you just declare the fields and Python generates everything else:
# bedsheet/events.py
from dataclasses import dataclass, field
from typing import Literal, Any
@dataclass
class ToolCallEvent:
"""Emitted when the LLM requests a tool call."""
tool_name: str # These are the fields
tool_input: dict[str, Any] # Type hints define what each field holds
call_id: str
type: Literal["tool_call"] = field(default="tool_call", init=False)
# That's it! Python auto-generates __init__, __repr__, __eq__ for you.
# You can now do:
event = ToolCallEvent(tool_name="get_weather", tool_input={"city": "NYC"}, call_id="123")
print(event) # ToolCallEvent(tool_name='get_weather', tool_input={'city': 'NYC'}, ...)
Understanding the field() Function
The field() function gives you fine-grained control over how each field behaves:
from dataclasses import dataclass, field
@dataclass
class Example:
# Regular field - REQUIRED when creating an instance
name: str
# Field with default value - OPTIONAL when creating an instance
count: int = 0
# field(default=...) - same as above but more explicit
status: str = field(default="pending")
# field(init=False) - NOT included in __init__, set automatically
# Useful for fields that should always have a fixed value
type: str = field(default="example", init=False)
# field(default_factory=...) - for MUTABLE defaults like lists/dicts
# ⚠️ NEVER do this: items: list = [] (all instances would share the same list!)
# ✓ DO this instead:
items: list = field(default_factory=list) # Each instance gets its own list
# field(repr=False) - hide from string representation (good for large data)
raw_data: bytes = field(default=b"", repr=False)
Never use items: list = [] in a dataclass! All instances would share the same list object. Always use field(default_factory=list) for lists, dicts, or any mutable type.
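To make the warning concrete, here is a small standalone demonstration (plain Python, not Bedsheet code) of the shared-default bug and the default_factory fix.
from dataclasses import dataclass, field

# The broken version has to use a plain class: @dataclass refuses a bare
# mutable default and raises ValueError at class-definition time.
class Broken:
    def __init__(self, items=[]):   # one list object shared by every instance!
        self.items = items

a, b = Broken(), Broken()
a.items.append("x")
print(b.items)   # ['x'] -- b was never touched, yet it sees a's change

@dataclass
class Fixed:
    items: list = field(default_factory=list)   # a fresh list per instance

c, d = Fixed(), Fixed()
c.items.append("x")
print(d.items)   # [] -- each instance owns its own list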
The __post_init__ Hook
Sometimes you need to compute a value based on other fields. Use __post_init__ - it runs right after __init__:
@dataclass
class Message:
role: str
content: str
timestamp: float = field(init=False) # Not passed in, computed automatically
def __post_init__(self):
# This runs after __init__, so self.role and self.content are already set
import time
self.timestamp = time.time()
# Usage:
msg = Message(role="user", content="Hello")
print(msg.timestamp) # 1701234567.89 (automatically set)
Literal Type for Fixed Values
Literal["tool_call"] means "this field can ONLY be the string 'tool_call'". It's used for type discrimination:
from typing import Literal
@dataclass
class ToolCallEvent:
type: Literal["tool_call"] = field(default="tool_call", init=False)
@dataclass
class CompletionEvent:
type: Literal["completion"] = field(default="completion", init=False)
# Now type checkers know: if event.type == "tool_call", it's a ToolCallEvent
# This pattern is called a "discriminated union" or "tagged union"
Type Hints & Union Types
Type hints are annotations that tell Python (and developers) what types of values are expected. They don't affect runtime behavior but enable IDE autocompletion, catch bugs before running, and serve as documentation.
Basic Type Hints
Type hints go after a colon for variables and parameters, and after -> for return types:
# Variable with type hint
name: str = "Alice"
count: int = 42
prices: list[float] = [9.99, 19.99, 29.99]
# Function with type hints
def greet(name: str, times: int = 1) -> str:
return f"Hello, {name}! " * times
# The -> str means "this function returns a string"
The | Operator: Union Types (Python 3.10+)
When a value can be one of several types, use | (pipe) to combine them:
# This function accepts either a string OR an integer
def process(value: str | int) -> str:
if isinstance(value, str):
return value.upper()
else:
return str(value * 2)
process("hello") # "HELLO"
process(21) # "42"
# Optional values: can be the type OR None
def find_user(id: int) -> User | None:
# Returns a User if found, None if not
...
# Before Python 3.10, you had to use Union:
from typing import Union, Optional
def old_style(value: Union[str, int]) -> Optional[str]: # Same as str | None
...
Generic Types: list[...], dict[...], etc.
Container types can specify what they contain using square brackets:
# A list that contains strings
names: list[str] = ["Alice", "Bob"]
# A dictionary with string keys and integer values
scores: dict[str, int] = {"Alice": 95, "Bob": 87}
# A list of dictionaries (common in API responses)
users: list[dict[str, Any]] = [
{"name": "Alice", "age": 30},
{"name": "Bob", "age": 25},
]
# Any means "any type" - use when type varies or is unknown
from typing import Any
data: Any = get_unknown_data()
Literal Types: Exact Values Only
Literal restricts a value to specific exact values - not just a type:
from typing import Literal
# This can ONLY be "supervisor" or "router", nothing else
mode: Literal["supervisor", "router"] = "supervisor"
mode = "supervisor" # ✓ OK
mode = "router" # ✓ OK
mode = "other" # ✗ Type error! "other" is not allowed
# Useful for configuration options, states, modes
def set_log_level(level: Literal["debug", "info", "warn", "error"]) -> None:
...
Union Types for Events
In Bedsheet, Event is a Union of all possible event types. This lets you handle different events with type safety:
# bedsheet/events.py
from typing import Union
# Event can be ANY of these types
Event = Union[
ThinkingEvent,
TextTokenEvent,
ToolCallEvent,
ToolResultEvent,
CompletionEvent,
ErrorEvent,
DelegationEvent,
CollaboratorStartEvent,
CollaboratorEvent,
CollaboratorCompleteEvent,
]
# When you receive an Event, use isinstance to check which type it is
async for event in agent.invoke(...):
if isinstance(event, ToolCallEvent):
# Inside this block, Python KNOWS event is a ToolCallEvent
# So event.tool_name and event.tool_input are available
print(f"Tool: {event.tool_name}")
elif isinstance(event, TextTokenEvent):
# Here, event is definitely a TextTokenEvent
print(event.token, end="") # Print streaming tokens
elif isinstance(event, CompletionEvent):
# Here, event is definitely a CompletionEvent
print(f"Final: {event.response}")
Pattern Matching (Python 3.10+)
An elegant alternative to isinstance chains. The match statement can destructure dataclasses:
# Instead of multiple if/elif/isinstance checks:
match event:
case ToolCallEvent(tool_name=name, tool_input=args):
# Extracts tool_name into 'name', tool_input into 'args'
print(f"Calling {name} with {args}")
case TextTokenEvent(token=t):
print(t, end="")
case CompletionEvent(response=text):
print(f"Done: {text}")
case _:
pass # The underscore matches anything else
Async/Await Basics
A way to write code that can pause while waiting for slow operations (like API calls) and do other work in the meantime. It's built into Python and doesn't require threads.
Async is about concurrency (interleaving tasks), not parallelism (simultaneous execution). Think of it like a chef who starts boiling water, then preps vegetables while waiting, rather than staring at the pot.
The Problem: Waiting is Wasteful
Without async, when you make an API call, your program just waits:
# Synchronous (blocking) code - wastes time waiting
import requests
def get_weather():
response = requests.get("https://api.weather.com") # Program WAITS here
return response.json() # Does nothing else
def get_news():
response = requests.get("https://api.news.com") # Program WAITS here too
return response.json()
# These run one after another - if each takes 2 seconds, total = 4 seconds
weather = get_weather() # Wait 2 seconds...
news = get_news() # Wait 2 more seconds...
The Solution: async/await
With async, the program can do other things while waiting:
import asyncio
import aiohttp # Async HTTP library
# 'async def' makes this a coroutine (an async function)
async def get_weather():
async with aiohttp.ClientSession() as session:
# 'await' says "pause here, let other code run while waiting"
response = await session.get("https://api.weather.com")
return await response.json()
async def get_news():
async with aiohttp.ClientSession() as session:
response = await session.get("https://api.news.com")
return await response.json()
async def main():
# asyncio.gather runs both at the "same time"
# If each takes 2 seconds, total = ~2 seconds (not 4!)
weather, news = await asyncio.gather(
get_weather(),
get_news(),
)
print(weather, news)
# This is how you run async code from regular Python
asyncio.run(main())
Key Concepts Explained
# 1. async def - defines a COROUTINE (async function)
async def my_function():
...
# 2. await - PAUSES the coroutine until the operation completes
# While paused, other coroutines can run
result = await some_async_operation()
# 3. You can ONLY use 'await' inside an 'async def' function
def regular_function():
await something() # ✗ SyntaxError!
async def async_function():
await something() # ✓ OK
# 4. asyncio.run() - starts the async event loop from regular code
asyncio.run(main()) # Entry point for async code
# 5. asyncio.gather() - runs multiple coroutines concurrently
results = await asyncio.gather(task1(), task2(), task3())
Calling an async function without await doesn't run it - it just creates a coroutine object. Always use await or asyncio.gather().
# Wrong - this does nothing!
get_weather() # Returns coroutine object, doesn't execute
# Right
await get_weather() # Actually runs the function
Visual: Sequential vs Concurrent
Sequential: 20 units total | Concurrent: 10 units total (half the time!)
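Here is a self-contained sketch you can run to see the difference yourself; asyncio.sleep stands in for a slow network call, and the timings are approximate.
import asyncio
import time

async def fake_api_call(name: str) -> str:
    await asyncio.sleep(1)   # pretend this is a 1-second network request
    return f"{name} done"

async def sequential():
    start = time.perf_counter()
    await fake_api_call("weather")
    await fake_api_call("news")
    print(f"sequential: {time.perf_counter() - start:.1f}s")   # ~2.0s

async def concurrent():
    start = time.perf_counter()
    await asyncio.gather(fake_api_call("weather"), fake_api_call("news"))
    print(f"concurrent: {time.perf_counter() - start:.1f}s")   # ~1.0s

asyncio.run(sequential())
asyncio.run(concurrent())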
AsyncIterator & Streaming
A way to produce values one at a time, where each value might require waiting (like getting data from an API). Instead of returning all results at once, you "yield" them as they become available.
Perfect for streaming data - like showing LLM responses word-by-word as they arrive instead of waiting for the complete response.
Regular vs Async Iteration
First, let's understand regular iteration:
# Regular iterator - uses 'for'
for item in [1, 2, 3]:
print(item)
# Regular generator - uses 'yield' to produce values one at a time
def count_up(n):
for i in range(n):
yield i # Produces values one at a time
for num in count_up(5):
print(num) # 0, 1, 2, 3, 4
Async iteration is the same concept, but each step can involve waiting:
# Async generator - uses 'async def' + 'yield'
async def fetch_pages(urls):
for url in urls:
response = await http.get(url) # Wait for each page
yield response.text # Then yield it
# Async iteration - uses 'async for'
async for page in fetch_pages(["url1", "url2", "url3"]):
print(page) # Processes each page as it arrives
How Bedsheet Uses AsyncIterator
The agent's invoke() method is an async generator that yields events as they happen:
# bedsheet/agent.py
from typing import AsyncIterator
class Agent:
async def invoke(
self,
session_id: str,
input_text: str,
stream: bool = False,
) -> AsyncIterator[Event]: # Return type: yields Event objects one at a time
"""Invoke agent, yielding events as they occur."""
# When streaming, yield each token as it arrives
if stream:
async for token in self.model_client.chat_stream(...):
if isinstance(token, str):
yield TextTokenEvent(token=token) # Yield immediately!
# Yield tool-related events
yield ToolCallEvent(...) # "I'm about to call a tool"
result = await self.execute_tool(...) # Actually call the tool
yield ToolResultEvent(...) # "Here's the result"
# Final response
yield CompletionEvent(...)
# The caller processes events AS THEY ARRIVE - no waiting for everything
async for event in agent.invoke("session", "hello", stream=True):
if isinstance(event, TextTokenEvent):
print(event.token, end="") # Print each word as it arrives!
Streaming from Claude API
Here's how we stream tokens from Claude's API:
# bedsheet/llm/anthropic.py
async def chat_stream(self, messages, system, tools=None) -> AsyncIterator[str | LLMResponse]:
"""Stream tokens from Claude, yielding each word/character as it arrives."""
# Anthropic SDK provides 'messages.stream()' for streaming responses
async with self._client.messages.stream(**kwargs) as stream:
# stream.text_stream yields each token (word or part of word) as it arrives
async for text in stream.text_stream:
yield text # Immediately yield to caller - don't wait!
# Example yields: "Hello", " ", "world", "!", " ", "How", " ", "can", ...
# After all tokens are streamed, get the complete message
# (needed for tool calls which aren't streamed)
final = await stream.get_final_message()
yield self._parse_response(final) # Yield the final structured response
This is why ChatGPT and Claude show responses word-by-word instead of making you wait 5 seconds for the complete answer. Each token is displayed the moment it arrives from the API.
Parallel Execution with asyncio.gather
Run multiple coroutines concurrently and wait for all to complete.
Bedsheet uses it for parallel tool execution and parallel agent delegation.
# bedsheet/supervisor.py - Parallel delegation
async def _handle_parallel_delegation(self, delegations, session_id, stream):
"""Execute multiple delegations in parallel."""
async def run_delegation(d):
"""Wrapper to run one delegation and collect its events."""
agent_name = d["agent_name"]
task = d["task"]
events = []
async for event in self._execute_single_delegation(agent_name, task, session_id, stream=stream):
events.append(event)
return agent_name, events
# Create tasks for all delegations
tasks = [run_delegation(d) for d in delegations]
# Run ALL tasks concurrently, wait for ALL to complete
results = await asyncio.gather(*tasks)
# results = [(agent1, events1), (agent2, events2), ...]
return results
Visual: Parallel Delegation
Sequential: 20s | Parallel: 10s (half the time!)
Parallel tool execution:
# bedsheet/agent.py
async def _execute_tools_parallel(self, tool_calls: list[ToolCall]) -> list[ToolResult]:
"""Execute multiple tool calls concurrently."""
async def execute_one(tc: ToolCall) -> ToolResult:
try:
result = await self._call_tool(tc.name, tc.input)
return ToolResult(call_id=tc.id, result=result)
except Exception as e:
return ToolResult(call_id=tc.id, error=str(e))
# All tools run at the same time
results = await asyncio.gather(*[execute_one(tc) for tc in tool_calls])
return results
The @action Decorator
A decorator is a function that wraps another function to add behavior or register it somewhere. The @action decorator registers functions as "tools" that the LLM can call.
Clean, declarative way to define tools. Just add @action above any function and it becomes available to the AI agent.
Understanding Decorators Step by Step
A decorator is just a function that takes a function and returns a function. The @ syntax is shorthand:
# This decorator syntax...
@my_decorator
def my_function():
pass
# ...is exactly equivalent to this:
def my_function():
pass
my_function = my_decorator(my_function)
# The decorator receives the function and can:
# 1. Modify it
# 2. Wrap it with extra behavior
# 3. Register it somewhere
# 4. Replace it entirely
A Simple Decorator Example
# This decorator logs when a function is called
def log_calls(fn):
def wrapper(*args, **kwargs):
print(f"Calling {fn.__name__}...")
result = fn(*args, **kwargs)
print(f"{fn.__name__} returned {result}")
return result
return wrapper
@log_calls
def add(a, b):
return a + b
add(2, 3)
# Output:
# Calling add...
# add returned 5
Decorators WITH Arguments (Two Levels)
When a decorator takes arguments like @action(name="...", description="..."), there's an extra level of nesting:
# @decorator_with_args("hello") is evaluated FIRST
# It returns the actual decorator function
def decorator_with_args(message):
# This outer function receives the decorator arguments
def actual_decorator(fn):
# This inner function receives the function to decorate
def wrapper(*args, **kwargs):
print(message) # Uses the argument from outer function
return fn(*args, **kwargs)
return wrapper
return actual_decorator # Return the decorator
# Usage:
@decorator_with_args("Hello!")
def greet(name):
return f"Hi, {name}"
# What happens:
# 1. decorator_with_args("Hello!") is called -> returns actual_decorator
# 2. actual_decorator(greet) is called -> returns wrapper
# 3. greet now refers to wrapper
How @action Works in Bedsheet
# bedsheet/action_group.py
class ActionGroup:
def __init__(self, name: str):
self.name = name
self.actions: dict[str, Action] = {} # Store registered actions
def action(self, name: str, description: str, parameters: dict | None = None):
"""Decorator factory - returns the actual decorator."""
def decorator(fn: Callable) -> Callable:
# Infer JSON schema from type hints if not provided
schema = parameters if parameters is not None else generate_schema(fn)
# Register this function in our actions dictionary
self.actions[name] = Action(
name=name,
description=description,
parameters=schema,
handler=fn, # Store reference to the actual function
)
return fn # Return the original function unchanged
return decorator
Using @action to Define Tools
# Create a group to hold related tools
tools = ActionGroup(name="MarketTools")
# Register a function as a tool the LLM can call
@tools.action(name="get_stock_data", description="Get stock price and metrics")
async def get_stock_data(symbol: str) -> dict:
"""Fetch stock data from API."""
return {"symbol": symbol, "price": 100.0, "change": "+2.5%"}
# What happened step by step:
# 1. tools.action(name="get_stock_data", description="...") is called
# -> Returns the 'decorator' function
# 2. decorator(get_stock_data) is called
# -> Extracts type hints from the function (symbol: str, returns dict)
# -> Creates an Action object with name, description, schema, and handler
# -> Stores it in tools.actions["get_stock_data"]
# -> Returns the original function (unchanged)
# 3. get_stock_data can still be called normally
# Now the agent can use this tool:
agent.add_action_group(tools)
# The LLM sees: "get_stock_data: Get stock price and metrics. Args: symbol (string)"
By just adding @tools.action(...), your function automatically becomes a tool the AI can use. The decorator extracts the parameter types from your type hints, so you don't have to write JSON schemas manually.
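Because the decorator returns your function unchanged and stores an Action object in tools.actions, you can inspect the registration directly. A quick sketch, continuing the example above (attribute names follow the ActionGroup code shown earlier):
# The original function is untouched and can still be called directly:
# await get_stock_data("NVDA")   -> {"symbol": "NVDA", "price": 100.0, "change": "+2.5%"}

# The registration lives on the ActionGroup:
action = tools.actions["get_stock_data"]
print(action.description)                # "Get stock price and metrics"
print(action.parameters)                 # {"type": "object", "properties": {"symbol": {"type": "string"}}, "required": ["symbol"]}
print(action.handler is get_stock_data)  # True -- the handler is your function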
Schema Inference from Type Hints
A technique that reads your Python function's parameter types and automatically generates a JSON Schema that tells the LLM what arguments the function accepts.
LLMs need to know what parameters a tool accepts. Instead of manually writing JSON schemas, Bedsheet extracts this information from your type hints automatically.
The Problem: LLMs Need Schemas
When you give an LLM access to tools, it needs to know exactly what arguments each tool accepts. This is typically done with JSON Schema:
# Without schema inference, you'd write this manually:
tool_schema = {
"name": "search_news",
"description": "Search for news articles",
"input_schema": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"},
"limit": {"type": "integer", "description": "Max results"}
},
"required": ["query"]
}
}
# This is tedious and error-prone - you repeat information already in your function!
The Solution: Use inspect and get_type_hints
Python's standard library lets you examine functions at runtime:
import inspect
from typing import get_type_hints
# Your function with type hints
async def search_news(query: str, limit: int = 10) -> dict:
"""Search for news articles."""
pass
# inspect.signature() gives you parameter information
sig = inspect.signature(search_news)
for name, param in sig.parameters.items():
print(f"{name}: default={param.default}")
# Output:
# query: default=<class 'inspect._empty'> (no default = required)
# limit: default=10 (has default = optional)
# get_type_hints() gives you the type annotations
hints = get_type_hints(search_news)
print(hints)
# Output: {'query': <class 'str'>, 'limit': <class 'int'>, 'return': <class 'dict'>}
How Bedsheet Generates Schemas
# bedsheet/action_group.py
import inspect
from typing import get_type_hints
def generate_schema(fn: Callable) -> dict:
"""Generate JSON Schema from function type hints."""
hints = get_type_hints(fn) # Get {'param_name': type, ...}
sig = inspect.signature(fn) # Get signature with defaults
properties = {}
required = []
for param_name, param in sig.parameters.items():
if param_name == "return":
continue # Skip return type
param_type = hints.get(param_name, str) # Default to string
# Map Python types to JSON Schema types
type_mapping = {
str: "string",
int: "integer",
float: "number",
bool: "boolean",
list: "array",
dict: "object",
}
json_type = type_mapping.get(param_type, "string")
properties[param_name] = {"type": json_type}
# If parameter has no default value, it's required
if param.default is inspect.Parameter.empty:
required.append(param_name)
return {
"type": "object",
"properties": properties,
"required": required,
}
Example: From Function to Schema
# Your function
async def search_news(query: str, limit: int = 10) -> dict:
"""Search news articles by query."""
pass
# Bedsheet automatically generates this schema:
schema = generate_schema(search_news)
# Result - this is sent to the LLM:
{
"type": "object",
"properties": {
"query": {"type": "string"},
"limit": {"type": "integer"}
},
"required": ["query"] # limit has a default, so it's optional
}
# The LLM now knows:
# - "query" is required and must be a string
# - "limit" is optional and must be an integer
Always add type hints to your tool functions. Not only does it enable schema inference, it also makes your code self-documenting and helps catch bugs with type checkers like mypy.
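Following the type_mapping in generate_schema above, bool and list parameters map to "boolean" and "array", and anything without a usable hint falls back to "string". A quick sketch of what that produces:
async def configure(enabled: bool, tags: list, note="", retries: int = 3) -> dict:
    ...

print(generate_schema(configure))
# {
#   "type": "object",
#   "properties": {
#     "enabled": {"type": "boolean"},
#     "tags":    {"type": "array"},
#     "note":    {"type": "string"},    # no type hint -> falls back to string
#     "retries": {"type": "integer"}
#   },
#   "required": ["enabled", "tags"]     # note and retries have defaults, so they're optional
# }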
Event Types
All event types in the system:
# bedsheet/events.py
@dataclass
class ThinkingEvent:
"""LLM is thinking (extended thinking mode)."""
content: str
type: Literal["thinking"] = field(default="thinking", init=False)
@dataclass
class TextTokenEvent:
"""A token arrived from streaming LLM response."""
token: str
type: Literal["text_token"] = field(default="text_token", init=False)
@dataclass
class ToolCallEvent:
"""LLM wants to call a tool."""
tool_name: str
tool_input: dict[str, Any]
call_id: str
type: Literal["tool_call"] = field(default="tool_call", init=False)
@dataclass
class ToolResultEvent:
"""Tool execution completed."""
call_id: str
result: Any
error: str | None = None
type: Literal["tool_result"] = field(default="tool_result", init=False)
@dataclass
class CompletionEvent:
"""Agent produced final response."""
response: str
type: Literal["completion"] = field(default="completion", init=False)
@dataclass
class ErrorEvent:
"""An error occurred."""
error: str
recoverable: bool = False
type: Literal["error"] = field(default="error", init=False)
@dataclass
class DelegationEvent:
"""Supervisor is delegating to agent(s)."""
delegations: list[dict] # [{"agent_name": "X", "task": "Y"}, ...]
type: Literal["delegation"] = field(default="delegation", init=False)
@dataclass
class CollaboratorStartEvent:
"""A collaborator agent is starting."""
agent_name: str
task: str
type: Literal["collaborator_start"] = field(default="collaborator_start", init=False)
@dataclass
class CollaboratorEvent:
"""Wraps any event from a collaborator."""
agent_name: str
inner_event: Event # The wrapped event
type: Literal["collaborator"] = field(default="collaborator", init=False)
@dataclass
class CollaboratorCompleteEvent:
"""A collaborator agent finished."""
agent_name: str
response: str
type: Literal["collaborator_complete"] = field(default="collaborator_complete", init=False)
@dataclass
class RoutingEvent:
"""Router mode: supervisor picked an agent."""
agent_name: str
task: str
type: Literal["routing"] = field(default="routing", init=False)
Event Flow
Single Agent Flow:
Diagram summary: inside Agent.invoke(), each LLM call either streams text (yield TextTokenEvent('Hello'), yield TextTokenEvent(' world'), ...) and then completes, or requests tool use. Each requested tool call runs asynchronously and produces a ToolCallEvent followed by a ToolResultEvent; the results feed the next LLM call, and the loop repeats until the agent yields a final CompletionEvent.
Supervisor Flow with Parallel Delegation:
Diagram summary: the supervisor's first LLM call decides to delegate ("Delegate to MarketAnalyst AND NewsResearcher") and yields a DelegationEvent. Both collaborators then run in parallel: each emits a CollaboratorStartEvent, runs its own Agent.invoke() loop (TextTokenEvent, ToolCallEvent, ToolResultEvent, CompletionEvent), and finishes with a CollaboratorCompleteEvent. A second supervisor LLM call synthesizes the collaborator results, streaming TextTokenEvents and ending with the final CompletionEvent.
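From the caller's side, consuming that stream might look like the following sketch (supervisor construction is elided; the event fields match the dataclasses listed above):
async for event in supervisor.invoke(session_id="s1", input_text="Analyze NVDA", stream=True):
    if isinstance(event, DelegationEvent):
        names = [d["agent_name"] for d in event.delegations]
        print(f"Delegating to: {', '.join(names)}")
    elif isinstance(event, CollaboratorEvent):
        # Each sub-agent's events arrive wrapped with that agent's name
        inner = event.inner_event
        if isinstance(inner, ToolCallEvent):
            print(f"[{event.agent_name}] calling {inner.tool_name}")
    elif isinstance(event, CollaboratorCompleteEvent):
        print(f"[{event.agent_name}] finished")
    elif isinstance(event, TextTokenEvent):
        print(event.token, end="")        # the supervisor's own streamed synthesis
    elif isinstance(event, CompletionEvent):
        print(f"\nFinal answer: {event.response}")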
Supervisor Pattern
Supervisor extends Agent:
# bedsheet/supervisor.py
class Supervisor(Agent):
"""An agent that can coordinate other agents."""
def __init__(
self,
name: str,
instruction: str,
model_client: LLMClient,
collaborators: list[Agent], # Child agents
collaboration_mode: Literal["supervisor", "router"] = "supervisor",
**kwargs,
):
super().__init__(name=name, instruction=instruction, model_client=model_client, **kwargs)
# Store collaborators by name for lookup
self.collaborators = {agent.name: agent for agent in collaborators}
self.collaboration_mode = collaboration_mode
# Register built-in delegate tool
self._register_delegate_action()
Two collaboration modes:
| Mode | Behavior | Use Case |
|---|---|---|
| supervisor | Delegates, collects results, synthesizes | Complex analysis needing multiple perspectives |
| router | Picks one agent, hands off entirely | Simple routing to specialists |
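Switching modes is just the collaboration_mode argument. A sketch using the constructor shown above (market_agent, news_agent, and client are assumed to already exist):
# Sketch: same collaborators, two coordination styles
analyst = Supervisor(
    name="Analyst",
    instruction="Combine market data and news into one report.",
    model_client=client,
    collaborators=[market_agent, news_agent],
    collaboration_mode="supervisor",   # delegate, collect, synthesize
)

front_desk = Supervisor(
    name="FrontDesk",
    instruction="Send each request to the right specialist.",
    model_client=client,
    collaborators=[market_agent, news_agent],
    collaboration_mode="router",       # pick one agent and hand off entirely
)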
The delegate tool:
def _register_delegate_action(self):
"""Register the built-in delegate action."""
delegate_group = ActionGroup(name="DelegateTools")
@delegate_group.action(
name="delegate",
description="Delegate tasks to collaborator agents",
parameters={
"type": "object",
"properties": {
"delegations": {
"type": "array",
"items": {
"type": "object",
"properties": {
"agent_name": {"type": "string"},
"task": {"type": "string"},
},
"required": ["agent_name", "task"],
},
},
},
"required": ["delegations"],
},
)
async def delegate(delegations: list) -> str:
# This is a placeholder - actual delegation handled specially
return "Delegation handled"
self.add_action_group(delegate_group)
Parallel Delegation
How parallel delegation works:
# bedsheet/supervisor.py
async def _execute_single_delegation(
self,
agent_name: str,
task: str,
session_id: str,
stream: bool = False,
) -> AsyncIterator[Event]:
"""Execute one delegation and yield its events."""
collaborator = self.collaborators.get(agent_name)
if collaborator is None:
yield ErrorEvent(error=f"Unknown agent: {agent_name}")
return
yield CollaboratorStartEvent(agent_name=agent_name, task=task)
# Invoke the collaborator, wrapping all its events
result = ""
async for event in collaborator.invoke(
session_id=f"{session_id}:{agent_name}",
input_text=task,
stream=stream,
):
# Wrap every event from the collaborator
yield CollaboratorEvent(agent_name=agent_name, inner_event=event)
if isinstance(event, CompletionEvent):
result = event.response
yield CollaboratorCompleteEvent(agent_name=agent_name, response=result)
async def _handle_parallel_delegations(
self,
delegations: list[dict],
session_id: str,
stream: bool,
) -> list[tuple[str, list[Event]]]:
"""Execute multiple delegations in parallel."""
async def run_one(d: dict) -> tuple[str, list[Event]]:
events = []
async for event in self._execute_single_delegation(
d["agent_name"], d["task"], session_id, stream
):
events.append(event)
return d["agent_name"], events
# asyncio.gather runs all delegations concurrently
results = await asyncio.gather(*[run_one(d) for d in delegations])
return results
Streaming vs Non-Streaming
Non-streaming (original)
# Wait for complete response
response = await self._client.messages.create(
model=self.model,
max_tokens=self.max_tokens,
system=system,
messages=messages,
)
return self._parse_response(response)
Streaming (new)
# Stream tokens as they arrive
async with self._client.messages.stream(
model=self.model,
max_tokens=self.max_tokens,
system=system,
messages=messages,
) as stream:
async for text in stream.text_stream:
yield text # Each token
final = await stream.get_final_message()
yield self._parse_response(final)
Consumption in Agent:
async def invoke(self, session_id, input_text, stream=False) -> AsyncIterator[Event]:
# ... setup ...
if stream and hasattr(self.model_client, 'chat_stream'):
# Streaming path
response = None
async for chunk in self.model_client.chat_stream(messages, system, tools):
if isinstance(chunk, str):
yield TextTokenEvent(token=chunk) # Emit each token
else:
response = chunk # Final LLMResponse
else:
# Non-streaming path
response = await self.model_client.chat(messages, system, tools)
# Continue with tool handling using response...
Tool Calling
- You provide tool definitions in the API request
- Claude responds with tool_use blocks if it wants to call tools
- You execute the tools and send results back
- Claude continues with more tool calls or a text response
# Request to Claude includes tools:
{
"tools": [
{
"name": "get_stock_data",
"description": "Get stock price and metrics",
"input_schema": {
"type": "object",
"properties": {
"symbol": {"type": "string"}
},
"required": ["symbol"]
}
}
]
}
# Claude's response when it wants to use tools:
{
"content": [
{
"type": "tool_use",
"id": "call_123",
"name": "get_stock_data",
"input": {"symbol": "NVDA"}
}
],
"stop_reason": "tool_use"
}
# You execute the tool and send result back:
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": "call_123",
"content": "{\"symbol\": \"NVDA\", \"price\": 875.50}"
}
]
}
Bedsheet's tool execution loop:
# bedsheet/agent.py (simplified)
async def invoke(self, session_id, input_text, stream=False):
# Add user message to memory
await self.memory.add_message(session_id, Message(role="user", content=input_text))
for iteration in range(self.max_iterations):
messages = await self.memory.get_messages(session_id)
tools = self._get_tool_definitions()
# Call LLM
response = await self.model_client.chat(messages, system_prompt, tools)
if response.text and not response.tool_calls:
# Final text response - we're done
yield CompletionEvent(response=response.text)
return
if response.tool_calls:
# Execute all tool calls in parallel
for tc in response.tool_calls:
yield ToolCallEvent(tool_name=tc.name, tool_input=tc.input, call_id=tc.id)
results = await asyncio.gather(*[
self._execute_tool(tc) for tc in response.tool_calls
])
for result in results:
yield ToolResultEvent(call_id=result.call_id, result=result.result)
# Add results to memory and loop back for next LLM call
await self._add_tool_results_to_memory(session_id, results)
Structured Outputs
A mechanism to guarantee that LLM responses conform to a specific JSON schema. Uses Anthropic's native constrained decoding - the model literally cannot generate tokens that would violate your schema.
When you need machine-readable data, not prose. API responses, database records, UI components - anything that must be parsed reliably.
The OutputSchema Class
A simple dataclass that wraps a JSON schema with optional Pydantic model reference:
# bedsheet/llm/base.py
from dataclasses import dataclass, field
from typing import Any
@dataclass
class OutputSchema:
"""Schema for structured output.
Can be initialized with a Pydantic model or a JSON schema dict.
"""
schema: dict[str, Any]
_pydantic_model: Any = field(default=None, repr=False)
@classmethod
def from_pydantic(cls, model: Any) -> "OutputSchema":
"""Create from a Pydantic BaseModel class."""
schema = model.model_json_schema()
return cls(schema=schema, _pydantic_model=model)
@classmethod
def from_dict(cls, schema: dict[str, Any]) -> "OutputSchema":
"""Create from a JSON schema dict."""
return cls(schema=schema)
How It Works with Anthropic's API
When an output schema is provided, the client uses Anthropic's beta structured outputs API:
# bedsheet/llm/anthropic.py
STRUCTURED_OUTPUTS_BETA = "structured-outputs-2025-11-13"
async def chat(self, messages, system, tools=None, output_schema=None):
kwargs = {
"model": self.model,
"max_tokens": self.max_tokens,
"system": system,
"messages": messages,
}
if tools:
kwargs["tools"] = tools
if output_schema:
# Use beta endpoint with structured outputs
kwargs["betas"] = [STRUCTURED_OUTPUTS_BETA]
kwargs["output_format"] = {
"type": "json_schema",
"schema": output_schema.schema,
}
# Use beta client for structured outputs
response = await self._client.beta.messages.create(**kwargs)
else:
# Standard API call
response = await self._client.messages.create(**kwargs)
return self._parse_response(response, output_schema)
Response Parsing with Structured Outputs
The response text is parsed as JSON and stored in parsed_output:
def _parse_response(self, response, output_schema=None) -> LLMResponse:
text = None
tool_calls = []
parsed_output = None
for block in response.content:
if block.type == "text":
text = block.text
# Parse JSON if structured output was requested
if output_schema and text:
parsed_output = json.loads(text)
elif block.type == "tool_use":
tool_calls.append(ToolCall(
id=block.id,
name=block.name,
input=block.input
))
return LLMResponse(
text=text,
tool_calls=tool_calls,
stop_reason=response.stop_reason,
parsed_output=parsed_output, # Validated JSON data
)
Usage Patterns
Raw JSON Schema
# No external dependencies
schema = OutputSchema.from_dict({
"type": "object",
"properties": {
"name": {"type": "string"},
"score": {"type": "number"}
},
"required": ["name", "score"]
})
response = await client.chat(
messages=[...],
system="...",
output_schema=schema,
)
print(response.parsed_output)
# {"name": "test", "score": 0.95}
Pydantic Model
# If using Pydantic in your project
from pydantic import BaseModel
class Result(BaseModel):
name: str
score: float
schema = OutputSchema.from_pydantic(Result)
response = await client.chat(
messages=[...],
system="...",
output_schema=schema,
)
print(response.parsed_output)
# {"name": "test", "score": 0.95}
Key Advantages
| Feature | Bedsheet | Other Frameworks |
|---|---|---|
| Works with tools | Yes - tools and schema together | Often mutually exclusive |
| Pydantic required | No - optional | Often mandatory |
| 100% schema compliance | Yes - constrained decoding | Varies (some use post-validation) |
| Native API integration | Yes - Anthropic beta | Varies |
The MockLLMClient supports parsed_output in MockResponse, making it easy to test agents that use structured outputs without calling the real API.
MockLLMClient
Test agents without making real API calls.
# bedsheet/testing.py
@dataclass
class MockResponse:
"""A pre-programmed response from the mock LLM."""
text: str | None = None
tool_calls: list[ToolCall] | None = None
class MockLLMClient:
"""Mock LLM client for testing."""
def __init__(self, responses: list[MockResponse]):
self.responses = list(responses)
self.call_count = 0
def _get_next_response(self) -> MockResponse:
"""Get and remove the next response from the queue."""
if not self.responses:
raise RuntimeError("MockLLMClient exhausted - no more responses")
self.call_count += 1
return self.responses.pop(0)
async def chat(self, messages, system, tools=None) -> LLMResponse:
"""Return the next pre-programmed response."""
response = self._get_next_response()
return LLMResponse(
text=response.text,
tool_calls=response.tool_calls or [],
stop_reason="end_turn" if response.text else "tool_use",
)
async def chat_stream(self, messages, system, tools=None) -> AsyncIterator[str | LLMResponse]:
"""Stream the next pre-programmed response."""
response = self._get_next_response()
# Yield text word by word
if response.text:
words = response.text.split(' ')
for i, word in enumerate(words):
if i > 0:
yield ' '
yield word
# Yield final response
yield LLMResponse(
text=response.text,
tool_calls=response.tool_calls or [],
stop_reason="end_turn",
)
Async Test Fixtures
pytest-asyncio setup:
# tests/conftest.py or in test file
import pytest
# Mark all tests in file as async
pytestmark = pytest.mark.asyncio
# Or mark individual tests
@pytest.mark.asyncio
async def test_something():
result = await some_async_function()
assert result == expected
Usage in tests:
# tests/test_agent.py
@pytest.mark.asyncio
async def test_agent_calls_tool_and_returns_result():
mock = MockLLMClient(responses=[
# First response: LLM wants to call a tool
MockResponse(tool_calls=[
ToolCall(id="1", name="get_weather", input={"city": "NYC"})
]),
# Second response: LLM synthesizes result
MockResponse(text="The weather in NYC is sunny."),
])
tools = ActionGroup(name="Weather")
@tools.action(name="get_weather", description="Get weather")
async def get_weather(city: str) -> str:
return f"Sunny in {city}"
agent = Agent(
name="WeatherBot",
instruction="Help with weather",
model_client=mock,
)
agent.add_action_group(tools)
events = []
async for event in agent.invoke("test", "What's the weather in NYC?"):
events.append(event)
# Verify event sequence
assert isinstance(events[0], ToolCallEvent)
assert events[0].tool_name == "get_weather"
assert isinstance(events[1], ToolResultEvent)
assert "Sunny" in events[1].result
assert isinstance(events[2], CompletionEvent)
assert "sunny" in events[2].response.lower()
Summary
Key Patterns Recap
| Pattern | Where Used | Why |
|---|---|---|
| Protocol | LLMClient, Memory | Loose coupling, easy to swap implementations |
| Dataclass | All events, ToolCall, LLMResponse | Clean data structures with less boilerplate |
| AsyncIterator | invoke() methods | Stream events as they happen |
| asyncio.gather | Tool execution, parallel delegation | Concurrent I/O operations |
| Decorator | @action | Register functions with metadata |
| Type hints | Everywhere | Self-documenting, IDE support, type checking |
File Reference
| File | Purpose |
|---|---|
| agent.py | Single agent with ReAct loop |
| supervisor.py | Multi-agent coordinator |
| action_group.py | Tool definitions and @action decorator |
| events.py | All event dataclasses |
| llm/base.py | LLMClient protocol and types |
| llm/anthropic.py | Claude integration |
| memory/base.py | Memory protocol |
| memory/in_memory.py | Dict-based memory |
| testing.py | MockLLMClient for tests |