Multi-Agent Collaboration Guide
Build a multi-agent assistant using Bedsheet's Supervisor pattern with parallel delegation and rich event streaming.
Introduction
This guide walks through building a multi-agent assistant using Bedsheet's Supervisor pattern. We'll create an Investment Research Assistant that demonstrates Bedsheet's key features:
Parallel Delegation
Multiple agents working simultaneously to reduce response time
Rich Event Streaming
Full visibility into every step of execution
Supervisor Synthesis
Intelligent combination of multiple agent outputs
Error Recovery
Graceful handling of agent failures
The Example: Investment Research Assistant
We'll build an assistant that helps with investment research by coordinating three specialized agents:
- EthicsChecker - Validates requests before processing
- MarketAnalyst - Analyzes market data and trends
- NewsResearcher - Gathers recent news and sentiment
The supervisor delegates to Market and News agents in parallel, dramatically reducing response time.
Architecture
Supervisor] Supervisor --> Ethics[EthicsChecker] Ethics --> |approved| Parallel{Parallel Delegation} Parallel --> Market[MarketAnalyst] Parallel --> News[NewsResearcher] Market --> Synthesis[Synthesize] News --> Synthesis Synthesis --> Response[Final Response] style Supervisor fill:#dbeafe,stroke:#0969da,color:#1f2328 style Ethics fill:#fef3c7,stroke:#d97706,color:#1f2328 style Market fill:#dcfce7,stroke:#1a7f37,color:#1f2328 style News fill:#dcfce7,stroke:#1a7f37,color:#1f2328
pip install bedsheet[demo]
(adds yfinance for Yahoo Finance stock data and ddgs for DuckDuckGo news search)
1Define the Collaborator Agents
Ethics Checker Agent
import asyncio
from bedsheet import Agent, ActionGroup, Supervisor
from bedsheet.llm import AnthropicClient
from bedsheet.memory import InMemory
from bedsheet.events import (
ToolCallEvent, ToolResultEvent, CompletionEvent, ErrorEvent,
DelegationEvent, CollaboratorStartEvent, CollaboratorEvent,
CollaboratorCompleteEvent, RoutingEvent
)
# ============================================================
# Ethics Checker Agent
# ============================================================
ethics_tools = ActionGroup(name="EthicsTools")
@ethics_tools.action(
name="check_investment_request",
description="Check if an investment request is appropriate"
)
async def check_investment_request(request: str) -> dict:
"""Check for inappropriate investment advice requests."""
red_flags = ["insider", "manipulation", "pump and dump", "illegal"]
request_lower = request.lower()
for flag in red_flags:
if flag in request_lower:
return {
"approved": False,
"reason": f"Request may involve {flag}",
"recommendation": "We cannot provide advice on potentially illegal activities."
}
return {
"approved": True,
"reason": "Request is appropriate for analysis",
"recommendation": None
}
ethics_agent = Agent(
name="EthicsChecker",
instruction="""You review investment-related requests for ethical concerns.
Use the check_investment_request tool and provide a clear verdict.
Be strict about insider trading, market manipulation, and illegal schemes.""",
model_client=AnthropicClient(),
)
ethics_agent.add_action_group(ethics_tools)
Market Analyst Agent
# ============================================================
# Market Analyst Agent
# ============================================================
market_tools = ActionGroup(name="MarketTools")
@market_tools.action(
name="get_stock_data",
description="Get REAL current stock price and key metrics from Yahoo Finance"
)
async def get_stock_data(symbol: str) -> dict:
"""Fetch REAL stock data from Yahoo Finance."""
import yfinance as yf
ticker = yf.Ticker(symbol.upper())
info = ticker.info
fast = ticker.fast_info
current_price = fast.last_price
prev_close = fast.previous_close
change = ((current_price - prev_close) / prev_close) * 100
return {
"symbol": symbol.upper(),
"price": round(current_price, 2),
"change": f"{change:+.2f}%",
"pe_ratio": round(info.get("trailingPE", 0), 2) if info.get("trailingPE") else "N/A",
"market_cap": f"${fast.market_cap/1e12:.2f}T" if fast.market_cap >= 1e12 else f"${fast.market_cap/1e9:.2f}B",
"data_source": "Yahoo Finance (REAL DATA)",
}
@market_tools.action(
name="get_technical_analysis",
description="Get REAL technical indicators calculated from historical price data"
)
async def get_technical_analysis(symbol: str) -> dict:
"""Calculate REAL technical indicators from Yahoo Finance historical data."""
import yfinance as yf
ticker = yf.Ticker(symbol.upper())
hist = ticker.history(period="6mo")
close = hist["Close"]
# RSI (14-day)
delta = close.diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rsi = 100 - (100 / (1 + gain / loss))
# Moving averages and trend
sma_20 = close.rolling(window=20).mean().iloc[-1]
sma_50 = close.rolling(window=50).mean().iloc[-1]
current_price = close.iloc[-1]
if current_price > sma_20 > sma_50:
trend = "strong uptrend"
elif current_price > sma_20:
trend = "uptrend"
elif current_price < sma_20 < sma_50:
trend = "strong downtrend"
else:
trend = "downtrend"
return {
"symbol": symbol.upper(),
"rsi_14": round(rsi.iloc[-1], 2),
"trend": trend,
"sma_20": round(sma_20, 2),
"sma_50": round(sma_50, 2),
"support_30d": round(hist.tail(30)["Low"].min(), 2),
"resistance_30d": round(hist.tail(30)["High"].max(), 2),
"data_source": "Calculated from Yahoo Finance historical data (REAL DATA)",
}
market_agent = Agent(
name="MarketAnalyst",
instruction="""You are a market analyst specializing in stock analysis.
Use get_stock_data to fetch REAL current stock data from Yahoo Finance.
Use get_technical_analysis to get REAL calculated technical indicators.
Provide clear, data-driven analysis with specific numbers.""",
model_client=AnthropicClient(),
)
market_agent.add_action_group(market_tools)
News Researcher Agent
# ============================================================
# News Researcher Agent
# ============================================================
news_tools = ActionGroup(name="NewsTools")
@news_tools.action(
name="search_news",
description="Search for REAL recent news about a company using DuckDuckGo"
)
async def search_news(query: str) -> dict:
"""Search REAL recent news using DuckDuckGo."""
from ddgs import DDGS
with DDGS() as ddgs:
results = list(ddgs.news(query, max_results=5))
articles = []
for r in results:
articles.append({
"headline": r.get("title", ""),
"source": r.get("source", ""),
"date": r.get("date", ""),
"body": r.get("body", "")[:150] + "..." if r.get("body") else "",
})
return {
"query": query,
"articles": articles,
"count": len(articles),
"data_source": "DuckDuckGo News (REAL DATA)",
}
@news_tools.action(
name="analyze_sentiment",
description="Analyze overall sentiment from news articles"
)
async def analyze_sentiment(articles: list[dict]) -> dict:
"""Analyze sentiment from news articles."""
if not articles:
return {"sentiment": "neutral", "confidence": 0.0}
sentiment_scores = {"positive": 1, "neutral": 0, "negative": -1}
total = sum(sentiment_scores.get(a.get("sentiment", "neutral"), 0) for a in articles)
avg = total / len(articles)
if avg > 0.3:
sentiment = "bullish"
elif avg < -0.3:
sentiment = "bearish"
else:
sentiment = "neutral"
return {
"sentiment": sentiment,
"score": avg,
"articles_analyzed": len(articles),
"confidence": min(abs(avg) + 0.5, 1.0)
}
news_agent = Agent(
name="NewsResearcher",
instruction="""You are a news researcher focused on financial news.
Use search_news to find REAL recent news articles via DuckDuckGo.
Use analyze_sentiment to analyze the overall sentiment.
Report key headlines with their sources.""",
model_client=AnthropicClient(),
)
news_agent.add_action_group(news_tools)
2Create the Supervisor
The supervisor coordinates all three agents:
# ============================================================
# Investment Advisor Supervisor
# ============================================================
advisor = Supervisor(
name="InvestmentAdvisor",
instruction="""You are an investment research advisor that coordinates
specialized analysts to provide comprehensive stock analysis.
## Your Workflow
1. **Ethics Check First**: ALWAYS delegate to EthicsChecker first
- If ethics check fails, stop and explain to the user
- If it passes, proceed to research
2. **Parallel Research**: Delegate to BOTH analysts simultaneously:
- MarketAnalyst: For price data and technical analysis
- NewsResearcher: For recent news and sentiment
Use parallel delegation like this:
delegate(delegations=[
{"agent_name": "MarketAnalyst", "task": "Analyze [SYMBOL] stock"},
{"agent_name": "NewsResearcher", "task": "Find news about [COMPANY]"}
])
3. **Synthesize**: Combine findings into a comprehensive analysis:
- Current price and key metrics
- Technical outlook
- News sentiment
- Overall assessment (without specific buy/sell advice)
## Important
- Always check ethics BEFORE doing research
- Use parallel delegation for Market and News to save time
- Provide balanced analysis, not financial advice""",
model_client=AnthropicClient(),
memory=InMemory(),
collaborators=[ethics_agent, market_agent, news_agent],
collaboration_mode="supervisor", # We want synthesis, not just routing
max_iterations=15,
)
collaboration_mode="supervisor" means the supervisor will synthesize responses from multiple agents. Use "router" if you just want to pick one agent and pass through its response directly.
3Rich Event Handling
This is where Bedsheet shines - full visibility into every step:
async def run_with_rich_events():
"""Demonstrate rich event streaming."""
session_id = "demo-session"
user_input = "Can you analyze NVIDIA stock for me?"
print("\n" + "=" * 70)
print("BEDSHEET MULTI-AGENT DEMO")
print("=" * 70)
print(f"\nUser: {user_input}\n")
print("-" * 70)
# Track timing for parallel execution demo
import time
start_time = time.time()
parallel_agents = []
async for event in advisor.invoke(session_id=session_id, input_text=user_input):
# --------------------------------------------------------
# Delegation Events - Shows coordination decisions
# --------------------------------------------------------
if isinstance(event, DelegationEvent):
agents = [d["agent_name"] for d in event.delegations]
if len(agents) > 1:
print(f"\n[PARALLEL DELEGATION] Dispatching {len(agents)} agents simultaneously:")
for d in event.delegations:
print(f" -> {d['agent_name']}: {d['task'][:60]}...")
parallel_agents = agents
else:
print(f"\n[DELEGATION] -> {agents[0]}")
# --------------------------------------------------------
# Collaborator Lifecycle Events
# --------------------------------------------------------
elif isinstance(event, CollaboratorStartEvent):
indicator = "!" if event.agent_name in parallel_agents else "->"
print(f"\n{indicator} [{event.agent_name}] Starting...")
elif isinstance(event, CollaboratorCompleteEvent):
elapsed = time.time() - start_time
print(f"[ok] [{event.agent_name}] Complete ({elapsed:.1f}s)")
# --------------------------------------------------------
# Inner Events - See inside each collaborator
# --------------------------------------------------------
elif isinstance(event, CollaboratorEvent):
inner = event.inner_event
agent = event.agent_name
indent = " "
if isinstance(inner, ToolCallEvent):
print(f"{indent}[tool] [{agent}] Calling: {inner.tool_name}({inner.tool_input})")
elif isinstance(inner, ToolResultEvent):
if inner.error:
print(f"{indent}[error] [{agent}] Error: {inner.error}")
else:
# Show truncated result
result_str = str(inner.result)
if len(result_str) > 80:
result_str = result_str[:80] + "..."
print(f"{indent}[ok] [{agent}] Result: {result_str}")
elif isinstance(inner, CompletionEvent):
# Collaborator's response (before supervisor synthesis)
preview = inner.response[:100] + "..." if len(inner.response) > 100 else inner.response
print(f"{indent}[response] [{agent}] Response: {preview}")
# --------------------------------------------------------
# Final Completion
# --------------------------------------------------------
elif isinstance(event, CompletionEvent):
total_time = time.time() - start_time
print("\n" + "-" * 70)
print(f"FINAL RESPONSE ({total_time:.1f}s total)")
print("-" * 70)
print(f"\n{event.response}")
# --------------------------------------------------------
# Error Handling
# --------------------------------------------------------
elif isinstance(event, ErrorEvent):
print(f"\n[WARNING] ERROR: {event.error}")
if event.recoverable:
print(" (Attempting recovery...)")
print("\n" + "=" * 70)
if __name__ == "__main__":
asyncio.run(run_with_rich_events())
Example Output
4Parallel Delegation
Notice how MarketAnalyst and NewsResearcher run simultaneously:
Both agents make tool calls at the same time, reducing total response time.
If each agent takes 2 seconds, sequential execution would take 4 seconds. Parallel execution completes in just 2 seconds - a 50% reduction in response time!
5Event Streaming
Every step is visible through Bedsheet's event system:
DelegationEvent
See when and why agents are dispatched
CollaboratorStartEvent
Know when each agent begins work
CollaboratorEvent
See every tool call and result inside agents
CollaboratorCompleteEvent
Know when each agent finishes
Events enable debugging ("why did it call that tool?"), building UIs with progress indicators, logging for observability, and understanding complex agent interactions.
6Error Handling
Bedsheet gracefully handles agent failures:
@market_tools.action(name="get_stock_data")
async def get_stock_data(symbol: str) -> dict:
if symbol.upper() == "INVALID":
raise ValueError("Invalid stock symbol")
# ...
What happens when an error occurs
- The collaborator's error becomes an
ErrorEvent - The supervisor receives the error as a tool result
- The supervisor can decide how to proceed (retry, use different agent, or explain to user)
7Structured Outputs for Multi-Agent
When building multi-agent systems, structured outputs become especially powerful. Instead of parsing free-form text from collaborators, you can guarantee each agent returns data in a predictable format.
Use Case: Portfolio Analysis Dashboard
Imagine a dashboard that displays analysis from multiple agents. Each agent returns structured JSON that your UI can render directly:
from bedsheet.llm import AnthropicClient, OutputSchema
# Define schemas for each agent's output
market_schema = OutputSchema.from_dict({
"type": "object",
"properties": {
"symbol": {"type": "string"},
"price": {"type": "number"},
"trend": {"type": "string", "enum": ["bullish", "bearish", "neutral"]},
"confidence": {"type": "number", "minimum": 0, "maximum": 1},
"key_levels": {
"type": "object",
"properties": {
"support": {"type": "number"},
"resistance": {"type": "number"}
}
}
},
"required": ["symbol", "price", "trend", "confidence"]
})
news_schema = OutputSchema.from_dict({
"type": "object",
"properties": {
"sentiment": {"type": "string", "enum": ["bullish", "bearish", "neutral"]},
"headline_count": {"type": "integer"},
"top_headlines": {
"type": "array",
"items": {"type": "string"}
},
"risk_factors": {
"type": "array",
"items": {"type": "string"}
}
},
"required": ["sentiment", "headline_count"]
})
Why This Matters for Multi-Agent
Reliable Synthesis
The supervisor can programmatically combine structured data from multiple agents
UI-Ready Output
Frontend can render agent results without parsing markdown
Type Safety
Your code knows exactly what shape the data will be
Error Prevention
No more "agent returned unexpected format" bugs
# Supervisor can now work with predictable data
market_data = market_response.parsed_output # Guaranteed structure
news_data = news_response.parsed_output # Guaranteed structure
# Combine programmatically
combined_score = (
market_data["confidence"] * 0.6 +
(1.0 if news_data["sentiment"] == "bullish" else 0.5) * 0.4
)
Define a schema that matches each agent's specialty: MarketAnalyst returns price/technicals/trend, NewsResearcher returns sentiment/headlines/risk factors. This turns your multi-agent system from a "chat with experts" into a structured data pipeline that happens to use LLMs.
8Best Practices
- Always check prerequisites first (ethics, auth, etc.) before expensive operations
- Use parallel delegation for independent tasks to reduce latency
- Handle events appropriately for your UI (CLI, web, etc.)
- Keep collaborator instructions focused - each agent should have a clear specialty
- Let the supervisor synthesize - don't just concatenate agent outputs
Next Steps
- Add more specialized agents (RiskAnalyst, CompetitorTracker, etc.)
- Implement real tool integrations (market data APIs, news APIs)
- Add memory persistence with RedisMemory for conversation continuity
- Build a web UI that streams events in real-time
- Use structured outputs for dashboard-ready agent responses