The Sixth Sense: Design & Implementation
A deep dive into the architecture, design choices, and PubNub integration behind Bedsheet's distributed agent communication module.
1 Motivation
Before the Sixth Sense, Bedsheet agents were limited to single-process execution. A Supervisor could coordinate multiple agents, but they all lived in the same Python process. This created three problems:
- Scalability: A single process can only run so many agents before resource contention kills performance. CPU-intensive agents block the event loop for others.
- Isolation: One crashing agent can take down the whole system. No way to restart a single agent without restarting everything.
- Distribution: No way to run agents across machines or cloud providers. A monitoring agent on AWS can't talk to an analyzer on GCP.
The Sixth Sense solves this by giving agents distributed communication over PubNub. Each agent runs in its own process (or container, or cloud function) and communicates through publish/subscribe messaging. They find each other by subscribing to shared channels.
Design Goal
Add distributed communication without changing how single agents work. A regular Agent stays exactly the same. You opt in to networking by adding SenseMixin to your class hierarchy — zero changes to existing code.
2 Architecture Overview
The Sixth Sense is organized as a layered module inside bedsheet/sense/:
Each layer has a clear responsibility:
signals.py
The Signal dataclass — the atomic unit of inter-agent communication. Seven signal kinds cover all interaction patterns.
serialization.py
Compact JSON encoding with short keys (k, s, p) to stay under PubNub's 32KB message limit. Auto-truncates large payloads.
protocol.py
The SenseTransport Protocol defines the transport contract. Any class implementing these 7 methods can serve as a transport.
mixin.py
SenseMixin adds network capabilities to any Agent via Python's multiple inheritance. This is the primary integration point.
Dependencies flow inward: mixin.py depends on protocol.py (not on pubnub_transport.py). The PubNub transport is the outermost layer and is completely swappable. You could write a Redis, NATS, or MQTT transport without touching any other file.
3 Signal Design
The Signal Dataclass
```python
@dataclass
class Signal:
    kind: SignalKind             # What type of signal
    sender: str                  # Who sent it (agent name)
    payload: dict[str, Any]      # Arbitrary data
    correlation_id: str          # Links requests to responses
    target: str | None           # Intended recipient (None = broadcast)
    timestamp: float             # Unix timestamp (auto-set)
    source_channel: str | None   # Which channel it arrived on
```
Why a Dataclass, Not a Pydantic Model?
Bedsheet uses @dataclass for all data structures (events, signals, messages). Pydantic adds validation overhead that's unnecessary for internal data structures where the framework controls construction. Dataclasses give us type hints, __eq__, and __repr__ for free with zero runtime cost. The same reasoning applies to the existing Event types in bedsheet/events.py.
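The "for free" part is concrete. A minimal standalone sketch (field names mirror Signal, but this is an illustrative toy class, not the actual bedsheet dataclass):

```python
from dataclasses import dataclass, field
import time

# Illustrative mini-version of Signal, not the bedsheet class itself.
@dataclass
class MiniSignal:
    kind: str
    sender: str
    payload: dict = field(default_factory=dict)
    timestamp: float = field(default_factory=time.time)

a = MiniSignal(kind="alert", sender="cpu-watcher", payload={"cpu": 91})
b = MiniSignal(kind="alert", sender="cpu-watcher", payload={"cpu": 91},
               timestamp=a.timestamp)

# __eq__ and __repr__ come from @dataclass with zero extra code:
assert a == b
assert "cpu-watcher" in repr(a)
```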
The Seven Signal Kinds
We use a Literal type (not an Enum) for signal kinds. This gives us type-checking without the overhead of enum instances:
```python
SignalKind = Literal[
    "request",    # Ask another agent to do something
    "response",   # Reply to a request
    "alert",      # Broadcast an alert to all listeners
    "heartbeat",  # Periodic "I'm alive" signal
    "claim",      # Attempt to claim ownership of an incident
    "release",    # Release a claimed incident
    "event",      # General-purpose notification
]
```
| Kind | Direction | Purpose |
|---|---|---|
| `request` | Targeted (one agent) | Delegate a task to a specific agent via `request()` |
| `response` | Targeted (requester) | Return the result of a request, matched by `correlation_id` |
| `alert` | Broadcast (all) | Notify all subscribers of a condition (CPU high, breach detected) |
| `heartbeat` | Broadcast (all) | Periodic signal with agent capabilities for presence detection |
| `claim` | Broadcast (all) | Attempt to own an incident (for conflict-free coordination) |
| `release` | Broadcast (all) | Release ownership so another agent can claim it |
| `event` | Broadcast or targeted | General-purpose notification for extensibility |
Why Exactly Seven Kinds?
request + response cover RPC-style delegation. alert + event cover pub/sub notifications. claim + release cover distributed coordination. heartbeat covers presence. This set is minimal but sufficient — every agent interaction pattern we've encountered maps to one of these seven. We chose not to make it extensible (e.g., custom kinds) because a fixed set enables optimized routing in the signal loop.
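The routing benefit of a fixed kind set can be sketched with a plain dispatch dict; since the seven kinds are known at import time, the signal loop never has to parse or negotiate custom kinds. A hypothetical sketch (handler names and the `handled` list are illustrative, not bedsheet internals):

```python
from typing import Any, Callable

handled: list[str] = []

def handle_request(payload: dict[str, Any]) -> None:
    handled.append(f"request:{payload['task']}")

def handle_alert(payload: dict[str, Any]) -> None:
    handled.append(f"alert:{payload['message']}")

# A fixed kind set allows a static dispatch table:
ROUTES: dict[str, Callable[[dict[str, Any]], None]] = {
    "request": handle_request,
    "alert": handle_alert,
}

def dispatch(kind: str, payload: dict[str, Any]) -> None:
    handler = ROUTES.get(kind)
    if handler is not None:
        handler(payload)  # unknown kinds are dropped, never crash the loop

dispatch("request", {"task": "check CPU"})
dispatch("alert", {"message": "CPU high"})
assert handled == ["request:check CPU", "alert:CPU high"]
```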
Correlation IDs
Every signal gets a correlation_id — a 12-character hex string from uuid4(). This is how request/response pairs are matched:
```python
# Commander sends a request with correlation_id="a1b2c3d4e5f6"
signal = Signal(kind="request", sender="commander",
                payload={"task": "check CPU"}, target="cpu-watcher")

# cpu-watcher responds with the SAME correlation_id
response = Signal(kind="response", sender="cpu-watcher",
                  payload={"result": "CPU at 45%"},
                  correlation_id="a1b2c3d4e5f6",  # same ID
                  target="commander")
```
The mixin's signal loop checks self._pending_requests[signal.correlation_id] to resolve the correct asyncio.Future. This means an agent can have multiple concurrent requests in flight without confusion.
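The concurrent-requests claim can be demonstrated with a small self-contained sketch (illustrative, not the bedsheet source): each correlation ID maps to its own future, so responses that arrive out of order still resolve the right waiter.

```python
import asyncio

async def main() -> tuple[str, str]:
    loop = asyncio.get_running_loop()
    fut_cpu: asyncio.Future[str] = loop.create_future()
    fut_disk: asyncio.Future[str] = loop.create_future()
    # Two requests in flight at once, keyed by correlation ID:
    pending = {"a1b2c3d4e5f6": fut_cpu, "ffeeddccbbaa": fut_disk}

    def on_response(corr_id: str, result: str) -> None:
        waiter = pending.pop(corr_id, None)
        if waiter is not None and not waiter.done():
            waiter.set_result(result)

    # Responses arrive in the opposite order from the requests;
    # the IDs still route each result to the right future:
    on_response("ffeeddccbbaa", "disk at 80%")
    on_response("a1b2c3d4e5f6", "CPU at 45%")
    return await fut_cpu, await fut_disk

assert asyncio.run(main()) == ("CPU at 45%", "disk at 80%")
```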
4 Transport Protocol
```python
@runtime_checkable
class SenseTransport(Protocol):
    async def connect(self, agent_id: str, namespace: str) -> None: ...
    async def disconnect(self) -> None: ...
    async def broadcast(self, channel: str, signal: Signal) -> None: ...
    async def subscribe(self, channel: str) -> None: ...
    async def unsubscribe(self, channel: str) -> None: ...
    def signals(self) -> AsyncIterator[Signal]: ...
    async def get_online_agents(self, channel: str) -> list[AgentPresence]: ...
```
Why Protocol, Not ABC?
Bedsheet uses Protocol (structural subtyping) throughout the codebase — LLMClient, Memory, and now SenseTransport. The philosophy: if it walks like a duck and quacks like a duck, it's a duck. A class satisfies SenseTransport by implementing the right methods, without explicitly inheriting from it. This means you can write a transport in a separate package that doesn't import bedsheet at all — it just needs the right method signatures. The @runtime_checkable decorator lets us use isinstance() checks when needed.
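A toy illustration of this structural subtyping (the `Greeter`/`LocalEcho` names are invented for the example): the class never imports or inherits from the protocol, yet satisfies it by shape alone.

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class Greeter(Protocol):
    def greet(self, name: str) -> str: ...

class LocalEcho:  # no inheritance from Greeter, no import needed
    def greet(self, name: str) -> str:
        return f"hello {name}"

# @runtime_checkable enables isinstance() against the protocol.
# Note: the check only verifies that the method NAMES exist, not
# their signatures -- type checkers verify the full contract.
assert isinstance(LocalEcho(), Greeter)
assert LocalEcho().greet("sense") == "hello sense"
```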
The Seven Methods
The protocol is intentionally minimal. Every method maps to a fundamental pub/sub operation:
| Method | What It Does | PubNub Equivalent |
|---|---|---|
| `connect()` | Establish connection with identity | Create PubNubAsyncio instance |
| `disconnect()` | Clean up and close | `pubnub.stop()` |
| `broadcast()` | Publish a signal to a channel | `pubnub.publish()` |
| `subscribe()` | Listen to a channel | `pubnub.subscribe()` |
| `unsubscribe()` | Stop listening | `pubnub.unsubscribe()` |
| `signals()` | Async iterator of incoming signals | Read from asyncio.Queue |
| `get_online_agents()` | Who's on a channel right now? | `pubnub.here_now()` |
AgentPresence
AgentPresence is a simple dataclass representing a remote agent's identity on the network:
```python
@dataclass
class AgentPresence:
    agent_id: str
    agent_name: str
    namespace: str
    capabilities: list[str] = field(default_factory=list)
    status: str = "online"
    metadata: dict[str, Any] = field(default_factory=dict)
```
This is returned by get_online_agents() and populated from PubNub's here_now() API (or from heartbeat signals in the mock transport).
5 Why PubNub
We evaluated several messaging backends before choosing PubNub:
| Option | Pros | Cons |
|---|---|---|
| PubNub | Zero infrastructure, built-in presence, global CDN, generous free tier | 32KB message limit, vendor lock-in |
| Redis Pub/Sub | Fast, familiar, no message limits | Requires Redis server, no built-in presence, no cross-cloud |
| NATS | Very fast, cloud-native | Requires NATS server, complex JetStream setup for persistence |
| WebSocket server | Full control, no vendor | Must build everything: routing, presence, reconnection, scaling |
| AWS SNS/SQS | Native AWS, highly reliable | AWS-only, complex setup, not real-time (polling) |
The Decision
PubNub won because of zero infrastructure. A demo user signs up, gets keys, and has global real-time messaging in 5 minutes. No Docker containers, no cloud setup, no servers to manage. The 32KB limit is handled by compact serialization (short keys, auto-truncation). The vendor lock-in is mitigated by the SenseTransport Protocol — you can swap to Redis or NATS by implementing 7 methods.
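The "compact serialization" mentioned above can be sketched as follows. This is a hypothetical sketch of the short-key idea (the real serialization.py may differ in keys and truncation policy): full field names shrink to one-letter keys, and oversized payloads are truncated rather than failing the publish.

```python
import json

MAX_BYTES = 32_000  # stay under PubNub's 32KB message limit

def serialize(kind: str, sender: str, payload: dict) -> str:
    # Short keys: k = kind, s = sender, p = payload
    wire = {"k": kind, "s": sender, "p": payload}
    encoded = json.dumps(wire, separators=(",", ":"))
    if len(encoded.encode("utf-8")) > MAX_BYTES:
        # Drop the payload detail rather than fail the publish
        wire["p"] = {"truncated": True}
        encoded = json.dumps(wire, separators=(",", ":"))
    return encoded

small = serialize("alert", "cpu-watcher", {"cpu": 91})
big = serialize("alert", "cpu-watcher", {"blob": "x" * 50_000})
assert json.loads(small)["p"] == {"cpu": 91}
assert json.loads(big)["p"] == {"truncated": True}
```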
PubNub's Key Features We Use
- Publish/Subscribe: Core messaging. Each channel is a topic agents subscribe to.
- Presence: PubNub tracks who's on each channel via `here_now()`. We get agent discovery for free.
- UUID identity: Each PubNub connection has a UUID. We set this to the agent's name for presence identification.
- Auto-reconnection: `PNReconnectionPolicy.EXPONENTIAL` handles network flaps automatically.
- Global CDN: Messages route through PubNub's edge network, so agents in different regions get low latency.
6 The Thread-to-Asyncio Bridge
This is the trickiest piece of the PubNub integration. PubNub's Python SDK uses threaded callbacks for incoming messages, but Bedsheet is fully async. We need to safely cross the thread boundary.
The Problem
```python
# PubNub calls this from a BACKGROUND THREAD:
class _SignalListener(SubscribeCallback):
    def message(self, pubnub, message):
        # We're on PubNub's thread, NOT the asyncio event loop!
        # Cannot await anything, cannot use asyncio directly
        signal = deserialize(message.message)
        # How do we get this signal to the async signal_loop?
```
The Solution: call_soon_threadsafe
```python
class _SignalListener(SubscribeCallback):
    def __init__(self, queue: asyncio.Queue[Signal], loop: asyncio.AbstractEventLoop):
        self._queue = queue
        self._loop = loop

    def message(self, pubnub, message):
        signal = deserialize(message.message, source_channel=message.channel)
        # Thread-safe way to put into the asyncio queue:
        self._loop.call_soon_threadsafe(self._queue.put_nowait, signal)
```
What's Happening Here
- We capture the asyncio event loop reference at connection time (`asyncio.get_running_loop()`)
- We pass the loop and an `asyncio.Queue` to the PubNub callback listener
- When PubNub's thread delivers a message, we call `loop.call_soon_threadsafe()`
- This schedules `queue.put_nowait(signal)` to run on the event loop's thread
- The mixin's `_signal_loop()` awaits `queue.get()` on the async side
call_soon_threadsafe is the standard Python mechanism for thread-to-asyncio communication. It's safe to call from any thread and guarantees the callback runs on the event loop's thread during the next iteration.
Why Not asyncio.run_coroutine_threadsafe?
We use `call_soon_threadsafe` with `put_nowait` instead of `run_coroutine_threadsafe` with `put` because `put_nowait` is synchronous and never blocks. The queue has no max size, so it won't raise `QueueFull`. This is simpler and avoids creating unnecessary futures on the PubNub callback thread.
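The bridge can be demonstrated end-to-end without PubNub. In this self-contained demo, a plain worker thread stands in for PubNub's callback thread:

```python
import asyncio
import threading

async def main() -> str:
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue[str] = asyncio.Queue()

    def background_thread() -> None:
        # Runs OFF the event loop thread (like a PubNub callback);
        # the put_nowait call is scheduled onto the loop's thread.
        loop.call_soon_threadsafe(queue.put_nowait, "signal-from-thread")

    t = threading.Thread(target=background_thread)
    t.start()
    item = await queue.get()  # resumes once the loop runs the scheduled callback
    t.join()
    return item

assert asyncio.run(main()) == "signal-from-thread"
```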
7 Channel Naming Convention
PubNub channels are just strings. We use a namespaced convention to prevent collisions:
```python
def _full_channel(self, channel: str) -> str:
    """Expand short channel name to full namespaced channel."""
    if channel.startswith("bedsheet."):
        return channel
    return f"bedsheet.{self._namespace}.{channel}"

# Examples:
#   "alerts"  → "bedsheet.cloud-ops.alerts"
#   "tasks"   → "bedsheet.cloud-ops.tasks"
#   "agent-1" → "bedsheet.cloud-ops.agent-1" (direct channel)
```
Every agent also subscribes to its own direct channel (bedsheet.{namespace}.{agent_name}). This enables targeted messaging — when you call send_to("cpu-watcher", signal), it publishes to bedsheet.cloud-ops.cpu-watcher.
Why Namespace Channels?
Namespacing prevents cross-contamination between different deployments sharing the same PubNub keys. A staging environment using namespace "staging" won't interfere with production using "prod", even on the same PubNub app.
8 The Mixin Pattern
SenseMixin is the heart of the Sixth Sense. It adds distributed capabilities to any Agent without modifying the Agent class itself:
```python
class SenseMixin:
    """Mixin that adds distributed sensing to any Agent."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)  # Calls Agent.__init__()
        self._transport: SenseTransport | None = None
        self._namespace: str = ""
        self._signal_handlers: dict[SignalKind, list[SignalHandler]] = {}
        self._signal_task: asyncio.Task | None = None
        self._pending_requests: dict[str, asyncio.Future[Signal]] = {}
        self._claimed_incidents: set[str] = set()
        self._heartbeat_task: asyncio.Task | None = None
```
How Python MRO Makes This Work
```python
class MyAgent(SenseMixin, Agent):
    pass

# Method Resolution Order (MRO):
#   MyAgent → SenseMixin → Agent → object

# When you call MyAgent(name="x", instruction="y", model_client=client):
#   1. MyAgent.__init__ → not defined, goes to next in the MRO
#   2. SenseMixin.__init__(self, name="x", ...)
#      → super().__init__(name="x", ...) → Agent.__init__
#   3. Agent.__init__ sets up name, instruction, model_client
#   4. Back in SenseMixin.__init__, adds _transport, _signal_handlers, etc.
```
Why a Mixin, Not Inheritance or Composition?
Option A: Inheritance (class SenseAgent(Agent)) — Forces users to subclass a specific class. Can't add sensing to a Supervisor, or to user's custom Agent subclass.
Option B: Composition (agent.sense = SenseAdapter(agent)) — Cleaner separation, but awkward API. Users would write agent.sense.broadcast() instead of agent.broadcast(). The adapter would need to reach into the agent's internals for invoke().
Option C: Mixin (class MyAgent(SenseMixin, Agent)) — Best of both worlds. The agent IS-A SenseMixin and IS-A Agent. broadcast(), request(), on_signal() feel like native agent methods. Works with both Agent and Supervisor.
The type: ignore[attr-defined] Pattern
You'll notice # type: ignore[attr-defined] comments throughout the mixin. This is because SenseMixin accesses self.name and self.invoke(), which are defined on Agent, not on the mixin itself:
```python
await transport.connect(self.name, namespace)  # type: ignore[attr-defined]
```
Mypy can't know that SenseMixin will always be combined with Agent. The type ignores acknowledge this limitation. An alternative would be a generic Protocol for the host class, but that adds complexity for no runtime benefit.
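The "generic Protocol for the host class" alternative can be sketched like this. All names here (`SenseHost`, `DemoMixin`, `DemoAgent`) are hypothetical, invented to illustrate the pattern bedsheet chose not to adopt:

```python
from typing import Protocol

# A host protocol declares what the mixin expects from the class
# it is mixed into, so a type checker can verify self.name.
class SenseHost(Protocol):
    name: str

class DemoMixin:
    def describe(self: SenseHost) -> str:
        return f"agent {self.name}"  # no type: ignore needed

class DemoAgent:
    def __init__(self, name: str) -> None:
        self.name = name

class Networked(DemoMixin, DemoAgent):
    pass

assert Networked("cpu-watcher").describe() == "agent cpu-watcher"
```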
9 Request/Response Over PubNub
The most complex pattern is request/response — asking a remote agent to do something and waiting for the answer. This implements an RPC-like pattern over pub/sub messaging.
The Flow
Commander requests CPU check from cpu-watcher
- Commander calls `request("cpu-watcher", "check CPU")`
- Creates a `correlation_id` and an `asyncio.Future`
- Stores the future in `self._pending_requests[correlation_id]`
- Sends a `request` signal to cpu-watcher's direct channel
- Awaits the future with a timeout
- cpu-watcher's signal loop receives the request
- Calls `_handle_request()`, which runs `self.invoke(session_id, task)`
- The agent's LLM processes the task, calls tools, generates a response
- Sends a `response` signal back with the same `correlation_id`
- Commander's signal loop receives the response
- Matches `correlation_id` to the pending future
- Resolves the future with the response signal
- Commander's `request()` returns the result string
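The requester side of this flow can be condensed into a runnable sketch (illustrative, not the bedsheet source; `MiniRequester` and `deliver_response` are invented names, and the publish step is elided):

```python
import asyncio
import uuid

class MiniRequester:
    def __init__(self) -> None:
        self._pending: dict[str, asyncio.Future[str]] = {}

    async def request(self, target: str, task: str, timeout: float = 1.0) -> str:
        corr_id = uuid.uuid4().hex[:12]
        fut: asyncio.Future[str] = asyncio.get_running_loop().create_future()
        self._pending[corr_id] = fut
        # ... here the real code publishes a "request" signal to
        #     target's direct channel ...
        try:
            return await asyncio.wait_for(fut, timeout)
        finally:
            self._pending.pop(corr_id, None)  # clean up on success or timeout

    def deliver_response(self, corr_id: str, result: str) -> None:
        # Called by the signal loop when a matching response arrives
        fut = self._pending.get(corr_id)
        if fut is not None and not fut.done():
            fut.set_result(result)

async def demo() -> str:
    r = MiniRequester()
    task = asyncio.ensure_future(r.request("cpu-watcher", "check CPU"))
    await asyncio.sleep(0)  # let request() register its future
    corr_id = next(iter(r._pending))
    r.deliver_response(corr_id, "CPU at 45%")
    return await task

assert asyncio.run(demo()) == "CPU at 45%"
```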
Key Implementation Detail: _handle_request
```python
async def _handle_request(self, signal: Signal) -> None:
    task = signal.payload.get("task", "")
    session_id = f"sense-{signal.correlation_id}"

    # Run the full agent loop (LLM + tools)
    result = ""
    async for event in self.invoke(session_id, task):
        if isinstance(event, CompletionEvent):
            result = event.response

    # Send response back to requester
    response_signal = Signal(
        kind="response",
        sender=self.name,
        payload={"result": result},
        correlation_id=signal.correlation_id,
        target=signal.sender,
    )
    await self.send_to(signal.sender, response_signal)
```
This is where the Sixth Sense connects to Bedsheet's core: the request handler calls self.invoke(), which runs the agent's full ReAct loop (LLM reasoning → tool calls → final response). The remote agent isn't just a message router — it's a full AI agent that thinks about the request and uses its tools to answer it.
Requests are handled with asyncio.create_task(self._handle_request(signal)), so multiple requests can be processed concurrently. An agent can handle several incoming requests while also sending its own requests to others.
10 Claim Protocol
When multiple agents see the same alert, we need exactly one to handle it. The claim protocol provides leaderless conflict resolution — no central coordinator required.
How It Works
```python
async def claim_incident(self, incident_id: str, channel: str) -> bool:
    # 1. Record the claim locally, then broadcast it
    self._claimed_incidents.add(incident_id)
    signal = Signal(kind="claim", sender=self.name,
                    payload={"incident_id": incident_id})
    await self.broadcast(channel, signal)

    # 2. Wait 500ms for competing claims
    await asyncio.sleep(0.5)

    # 3. If still in our claimed set, we won
    return incident_id in self._claimed_incidents
```
The conflict resolution happens in _handle_claim():
```python
def _handle_claim(self, signal: Signal) -> None:
    incident_id = signal.payload.get("incident_id")
    if incident_id in self._claimed_incidents:
        # Tie-breaking: lower sender name wins
        if signal.sender < self.name:
            self._claimed_incidents.discard(incident_id)
```
Why Not a Distributed Lock?
A proper distributed lock (Redlock, ZooKeeper, etcd) would guarantee exactly-once processing. But it would also require additional infrastructure, add latency, and create failure modes. The claim protocol is probabilistic but practical: the 500ms window and deterministic tie-breaking (alphabetical agent name) give us conflict-free coordination 99%+ of the time. For a monitoring system where occasional duplicate handling is harmless, this is the right trade-off.
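The deterministic tie-break can be checked with a plain simulation (illustrative function and agent names, not bedsheet internals): when both agents claim the same incident, each applies the same rule, so exactly one yields.

```python
def resolve_claim(my_name: str, claimed: set, rival_sender: str, incident: str) -> None:
    # Same rule as _handle_claim: the alphabetically lower name wins.
    if incident in claimed and rival_sender < my_name:
        claimed.discard(incident)  # rival sorts lower → we yield

alpha = {"incident-42"}  # agent "alpha" claimed it
beta = {"incident-42"}   # agent "beta" claimed it too

# Each agent sees the other's claim signal:
resolve_claim("alpha", alpha, rival_sender="beta", incident="incident-42")
resolve_claim("beta", beta, rival_sender="alpha", incident="incident-42")

assert alpha == {"incident-42"}  # "alpha" < "beta": alpha keeps the claim
assert beta == set()             # beta yields
```

Because both sides evaluate the identical comparison, the outcome is symmetric and requires no coordinator.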
11 Testing Strategy
The MockSenseTransport
The key testing challenge: how to test distributed agent communication without PubNub? The answer is MockSenseTransport, which follows the same pattern as MockLLMClient.
The Hub Pattern
```python
class _MockSenseHub:
    """Shared state for mock transports. Routes signals between agents."""

    def __init__(self):
        self.queues: dict[str, asyncio.Queue[Signal]] = {}
        self.subscriptions: dict[str, set[str]] = {}
        self.presences: dict[str, AgentPresence] = {}


class MockSenseTransport:
    def __init__(self, hub: _MockSenseHub | None = None):
        self.hub = hub or _MockSenseHub()

    def create_peer(self) -> "MockSenseTransport":
        """Create a sibling transport sharing the same hub."""
        return MockSenseTransport(self.hub)
```
Why the Hub Pattern?
The first implementation used a single transport for all agents. This failed because connect(agent_id) overwrites the agent's identity — the second agent's connect would overwrite the first agent's ID. The hub pattern gives each agent its own transport instance (with its own identity and queue) while sharing the routing infrastructure. create_peer() returns a new transport connected to the same hub.
Test Design Lessons
The signal loop is a background task that continuously reads from the transport's queue. If a test tries to read from the queue directly (e.g., await queue.get()), it races with the signal loop. Use on_signal handlers in tests instead, and await asyncio.sleep() to give the loop time to process.
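The race and its fix can be shown in miniature (a self-contained sketch; the loop here stands in for the mixin's `_signal_loop`): let the background loop own the queue, observe results through a handler-style list, and yield control with `asyncio.sleep` instead of reading the queue yourself.

```python
import asyncio

async def main() -> list[str]:
    received: list[str] = []  # plays the role of an on_signal handler
    queue: asyncio.Queue[str] = asyncio.Queue()

    async def signal_loop() -> None:
        while True:
            received.append(await queue.get())  # the loop OWNS the queue

    task = asyncio.ensure_future(signal_loop())
    queue.put_nowait("alert:cpu-high")
    await asyncio.sleep(0)  # yield so the loop can process the item
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return received

assert asyncio.run(main()) == ["alert:cpu-high"]
```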
Test coverage (31 tests in tests/test_sense.py):
| Test Group | Count | What's Covered |
|---|---|---|
| Signal creation | 3 | Dataclass defaults, payload, all 7 kinds |
| Serialization | 5 | Compact encoding, roundtrip, truncation, minimal deserialization |
| Protocol | 2 | Mock satisfies protocol, AgentPresence creation |
| MockSenseTransport | 4 | Connect/disconnect, subscribe/broadcast, online agents, create_peer |
| SenseMixin | 7 | Join/leave, broadcast, request/response, timeout, handlers, self-skip, targeting |
| Claim protocol | 2 | Claim win, release |
| SenseNetwork | 3 | Add agent, reject non-sense agent, stop disconnects all |
| Events | 5 | All 5 new event dataclasses |
12 Trade-offs & Future Work
What We Chose
| Decision | Benefit | Cost |
|---|---|---|
| PubNub over self-hosted | Zero infrastructure | 32KB limit, vendor dependency |
| Mixin over composition | Natural API (agent.request()) | type: ignore comments |
| Protocol over ABC | Structural subtyping | No enforced implementation |
| Probabilistic claims | No infrastructure needed | Rare duplicate handling |
| Short-key serialization | Stays under 32KB | Less readable wire format |
| Hub pattern for testing | True multi-agent tests | More complex mock |
Known Limitations
- No message persistence: PubNub messages are ephemeral (unless you enable PubNub Storage, a paid feature). If an agent is offline when a signal is sent, it misses it.
- No message ordering guarantee: PubNub guarantees per-channel ordering, but cross-channel ordering is not guaranteed.
- Claim protocol is probabilistic: Under very high load or network partitions, two agents might both win a claim. Acceptable for monitoring; not suitable for financial transactions.
- 32KB message limit: The auto-truncation handles this, but very large payloads lose data. If you need to send large data, send a reference (URL, S3 key) instead.
Future Directions
- Redis transport: For teams with existing Redis infrastructure
- NATS transport: For high-throughput scenarios
- Message persistence: Store signals for replay and audit trails
- Encryption: End-to-end encryption for signal payloads
- Agent discovery: Automatic capability-based routing (find the agent that can handle this request)