The Sixth Sense
Distributed communication for Bedsheet agents. Run agents across processes, machines, and cloud providers — they find each other and collaborate over PubNub.
1 Overview
Bedsheet's Supervisor pattern works great when all agents run in a single Python process. But what if your agents need to run on different machines, behind different firewalls, or even on different cloud providers?
The Sixth Sense module adds distributed communication to any Bedsheet agent via PubNub's real-time messaging platform. Agents become network-aware peers that can broadcast alerts, send requests to each other, and coordinate incident responses — all without running HTTP servers or managing service discovery.
Pure Clients
Agents are PubNub clients, not servers. No ports to open, no URLs to register. Works behind NATs and firewalls.
Signal-Based
Seven signal kinds (request, response, alert, heartbeat, claim, release, event) cover all coordination patterns.
Swappable Transport
SenseTransport protocol means PubNub is one implementation. Swap in MQTT, Redis Streams, or your own.
Leaderless Coordination
Claim-based protocol lets multiple commanders compete for incident ownership. No central coordinator needed.
2 Setup
Install
pip install bedsheet[sense]
This adds the pubnub package as a dependency.
Get PubNub Keys
- Sign up at pubnub.com (free tier: 200 MAU, unlimited channels)
- Create an app in the PubNub dashboard
- Copy your Subscribe Key and Publish Key
export PUBNUB_SUBSCRIBE_KEY=sub-c-your-key-here
export PUBNUB_PUBLISH_KEY=pub-c-your-key-here
Enable Presence in your PubNub keyset settings to use get_online_agents().
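The exported keys can be read in Python before constructing a transport. The sketch below is one way to do it; `load_pubnub_keys` is a hypothetical helper, not part of Bedsheet, and it only shows how to fail fast when either variable is missing.

```python
import os


def load_pubnub_keys(env=None):
    """Return (subscribe_key, publish_key) read from environment variables.

    Hypothetical helper, not part of Bedsheet. Raises RuntimeError when a
    variable is unset or empty so misconfiguration surfaces at startup.
    """
    env = os.environ if env is None else env
    names = ("PUBNUB_SUBSCRIBE_KEY", "PUBNUB_PUBLISH_KEY")
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return env[names[0]], env[names[1]]
```

Passing a plain dict as `env` makes the helper easy to exercise in tests without touching the real environment.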
3 Your First Sense Agent
Any Bedsheet Agent becomes network-aware by adding the SenseMixin:
from bedsheet import Agent, ActionGroup, SenseMixin
from bedsheet.llm.anthropic import AnthropicClient
from bedsheet.sense.pubnub_transport import PubNubTransport
# 1. Create a sense-aware agent class
class MyAgent(SenseMixin, Agent):
    pass
# 2. Build the agent as usual
agent = MyAgent(
    name="my-agent",
    instruction="You are a helpful agent.",
    model_client=AnthropicClient(),
)
# 3. Connect to the network
transport = PubNubTransport(
    subscribe_key="sub-c-...",
    publish_key="pub-c-...",
)
await agent.join_network(transport, "my-namespace", ["alerts", "tasks"])
# 4. The agent can now send and receive signals!
# ... do work ...
# 5. Disconnect when done
await agent.leave_network()
What's happening?
- SenseMixin adds network methods to the Agent without changing the core ReAct loop
- join_network() connects to PubNub, subscribes to channels, and starts a background signal processing loop
- The agent automatically subscribes to its own direct channel (named after the agent)
- A heartbeat broadcasts the agent's capabilities every 30 seconds
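The 30-second heartbeat suggests a simple liveness rule on the receiving side: treat a peer as online while its last heartbeat is recent. The sketch below is an assumption about how a receiver might track this, not Bedsheet's implementation; `HeartbeatTracker` and the three-missed-beats cutoff are hypothetical.

```python
import time


class HeartbeatTracker:
    """Track the last heartbeat time per agent (illustrative only)."""

    def __init__(self, interval=30.0, missed_allowed=3):
        self.interval = interval              # seconds between heartbeats
        self.missed_allowed = missed_allowed  # hypothetical tolerance
        self._last_seen = {}

    def record(self, sender, timestamp=None):
        """Remember when a heartbeat from `sender` was observed."""
        self._last_seen[sender] = time.time() if timestamp is None else timestamp

    def online(self, now=None):
        """Return sorted names of agents whose last heartbeat is recent."""
        now = time.time() if now is None else now
        cutoff = self.interval * self.missed_allowed
        return sorted(
            name for name, seen in self._last_seen.items() if now - seen <= cutoff
        )
```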
4 Signals
A Signal is the unit of inter-agent communication. Every signal has a kind, a sender, and an optional payload:
from bedsheet.sense import Signal
# Broadcast an alert
alert = Signal(
    kind="alert",
    sender="cpu-watcher",
    payload={"severity": "high", "cpu": 95.2},
)
await agent.broadcast("alerts", alert)
Signal Kinds
| Kind | Purpose | Pattern |
|---|---|---|
| request | Ask an agent to do work | Peer-to-peer, expects response |
| response | Return results | Peer-to-peer, has correlation_id |
| alert | Broadcast an observation | One-to-many |
| heartbeat | Liveness + capabilities | One-to-many (periodic) |
| claim | Claim an incident | One-to-many (conflict resolution) |
| release | Release a claimed incident | One-to-many |
| event | Serialized bedsheet Event | One-to-many (observability) |
Channel Naming
Channels follow the convention bedsheet.<namespace>.<purpose>:
bedsheet.cloud-ops.alerts # Alert broadcasts
bedsheet.cloud-ops.tasks # Task coordination / claims
bedsheet.cloud-ops.cpu-watcher # Direct channel to cpu-watcher agent
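The naming convention above can be expressed as a small helper. This is a sketch only: `channel_name` is a hypothetical function, and rejecting dots inside a component is an assumption made here to keep the three-part structure unambiguous.

```python
def channel_name(namespace, purpose):
    """Build a channel name of the form bedsheet.<namespace>.<purpose>.

    Hypothetical helper. Empty components and components containing a dot
    are rejected; that restriction is an assumption of this sketch.
    """
    for part in (namespace, purpose):
        if not part or "." in part:
            raise ValueError(f"invalid channel component: {part!r}")
    return f"bedsheet.{namespace}.{purpose}"
```

Under this convention an agent's direct channel is the same call with the agent name as the purpose, for example `channel_name("cloud-ops", "cpu-watcher")`.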
Custom Signal Handlers
@agent.on_signal("alert")
async def handle_alert(signal: Signal):
    severity = signal.payload.get("severity")
    print(f"Alert from {signal.sender}: {severity}")
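A decorator like on_signal is typically backed by a registry keyed by signal kind. The following is a minimal sketch of that pattern, not Bedsheet's internals: `HandlerRegistry` is hypothetical, and signals are represented as plain dicts to keep the example self-contained.

```python
import asyncio
from collections import defaultdict


class HandlerRegistry:
    """Map signal kinds to async handlers (illustrative only)."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def on_signal(self, kind):
        """Decorator that registers an async handler for one signal kind."""
        def decorator(func):
            self._handlers[kind].append(func)
            return func
        return decorator

    async def dispatch(self, signal):
        """Await every handler registered for the signal's kind.

        Returns the number of handlers that ran.
        """
        handlers = self._handlers.get(signal["kind"], [])
        for handler in handlers:
            await handler(signal)
        return len(handlers)
```

Returning the handler unchanged from the decorator keeps the decorated function directly callable, which helps when unit-testing handlers in isolation.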
5 Request / Response
The request() method sends a task to a remote agent and waits for the response. Under the hood, the receiving agent runs invoke() with the task and sends back the completion:
# Commander asks cpu-watcher to check usage
result = await commander.request(
    "cpu-watcher",
    "What is the current CPU usage?",
    timeout=30.0,
)
print(result) # "Overall: 45.2%, Per-core: [32.1, 58.3, ...]"
What's happening?
- Commander creates a request signal with a unique correlation_id
- Signal is published to the cpu-watcher's direct channel
- cpu-watcher's signal loop receives the request
- cpu-watcher calls self.invoke() with the task text
- The LLM uses cpu-watcher's tools to gather data
- The CompletionEvent response is sent back as a response signal
- Commander's future resolves with the result
If the remote agent doesn't respond within the timeout, a TimeoutError is raised. The default timeout is 30 seconds.
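The correlation and timeout behavior described above can be sketched with plain asyncio futures. This is an illustration of the general pattern under stated assumptions, not Bedsheet's code: `PendingRequests` and `demo` are hypothetical names, and the remote agent is simulated with a timer.

```python
import asyncio
import uuid


class PendingRequests:
    """Match responses to requests by correlation_id (illustrative only)."""

    def __init__(self):
        self._pending = {}

    def create(self):
        """Register a new request and return (correlation_id, future)."""
        correlation_id = uuid.uuid4().hex
        future = asyncio.get_running_loop().create_future()
        self._pending[correlation_id] = future
        return correlation_id, future

    def resolve(self, correlation_id, result):
        """Complete the matching future; unknown or late responses are ignored."""
        future = self._pending.pop(correlation_id, None)
        if future is not None and not future.done():
            future.set_result(result)

    async def wait(self, correlation_id, future, timeout=30.0):
        """Await the response, raising asyncio.TimeoutError if none arrives."""
        try:
            return await asyncio.wait_for(future, timeout)
        finally:
            # Always drop the entry so abandoned requests do not accumulate.
            self._pending.pop(correlation_id, None)


async def demo(respond=True, timeout=0.5):
    pending = PendingRequests()
    correlation_id, future = pending.create()
    if respond:
        # Simulate the remote agent answering shortly after the request.
        loop = asyncio.get_running_loop()
        loop.call_later(0.01, pending.resolve, correlation_id, "Overall: 45.2%")
    return await pending.wait(correlation_id, future, timeout)
```

Calling `asyncio.run(demo())` returns the simulated answer, while `asyncio.run(demo(respond=False, timeout=0.05))` raises `asyncio.TimeoutError`.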
6 Claim Protocol
When multiple commander agents are online, they need to coordinate who handles an incident. The claim protocol provides leaderless conflict resolution:
# When an alert arrives
won = await agent.claim_incident("incident-001", "tasks")
if won:
    # We are responsible for this incident
    await investigate_and_report()
else:
    # Another agent claimed it first
    pass
How Claims Work
- Agent publishes a claim signal with the incident ID and timestamp
- Waits 500ms for competing claims from other agents
- If competing claims arrive, the earliest timestamp wins
- Loser backs off; winner coordinates the response
- When done, winner publishes release to free the incident
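The earliest-timestamp rule from the steps above reduces to a minimum over the claims collected during the wait window. The sketch below assumes each claim is a (timestamp, sender) pair; `resolve_claim` is a hypothetical function, and breaking exact timestamp ties by sender name is an assumption, since the steps do not say how ties are handled.

```python
def resolve_claim(claims):
    """Return the sender that wins among competing claims.

    `claims` is an iterable of (timestamp, sender) pairs, including the
    local agent's own claim. The earliest timestamp wins. Tuple comparison
    falls back to the sender name on equal timestamps, which is an
    assumption of this sketch.
    """
    claims = list(claims)
    if not claims:
        raise ValueError("no claims to resolve")
    return min(claims)[1]
```

Because every agent applies the same deterministic rule to the same set of claims, each one can decide locally whether it won, with no coordinator involved.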
7 SenseNetwork API
For managing multiple agents in the same process (useful for testing and simpler deployments), use SenseNetwork:
from bedsheet.sense import SenseNetwork
from bedsheet.sense.pubnub_transport import PubNubTransport
from bedsheet.testing import MockSenseTransport
# For testing (in-memory)
network = SenseNetwork(
    namespace="cloud-ops",
    transport=MockSenseTransport(),
)
# For production (PubNub); sub_key and pub_key are the keys from Setup
network = SenseNetwork(
    namespace="cloud-ops",
    transport_factory=lambda: PubNubTransport(sub_key, pub_key),
)
# Add agents
await network.add(cpu_agent, channels=["alerts", "tasks"])
await network.add(commander, channels=["alerts", "tasks"])
# Later...
await network.stop()
8 Cloud Monitor Demo
The examples/cloud-monitor/ directory contains a complete demo with 5 agents running as separate processes:
| Agent | Role | Data Source |
|---|---|---|
| cpu-watcher | Monitor CPU, alert on spikes | psutil |
| memory-watcher | Monitor RAM and swap | psutil |
| log-analyzer | Search and analyze logs | Simulated log buffer |
| security-scanner | Scan ports, check logins | socket |
| incident-commander | Coordinate alert responses | Sense network |
Running the Demo
export PUBNUB_SUBSCRIBE_KEY=sub-c-...
export PUBNUB_PUBLISH_KEY=pub-c-...
export ANTHROPIC_API_KEY=sk-ant-...
pip install bedsheet[sense] psutil
cd examples/cloud-monitor
python run.py
When CPU exceeds 80%, the cpu-watcher broadcasts an alert. The incident-commander claims it, queries the other agents for context, and synthesizes an incident report.
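The threshold check that triggers an alert can be sketched as a pure function. This is not the demo's actual code: `build_cpu_alert` is a hypothetical name, the payload shape follows the alert example in the Signals section, and the 90% cutoff between "medium" and "high" severity is an assumption.

```python
def build_cpu_alert(cpu_percent, threshold=80.0):
    """Return an alert payload when usage exceeds the threshold, else None.

    Hypothetical helper. The severity cutoff at 90% is an assumption made
    for illustration only.
    """
    if cpu_percent <= threshold:
        return None
    severity = "high" if cpu_percent >= 90.0 else "medium"
    return {"severity": severity, "cpu": cpu_percent}
```

In a watcher loop the reading could come from `psutil.cpu_percent()`, with the returned payload wrapped in an alert signal only when it is not None.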
Agent Sentinel Demo
The examples/agent-sentinel/ directory demonstrates AI agent security monitoring, inspired by the OpenClaw crisis of 2026. Three worker agents perform real tasks (DuckDuckGo searches, JSON calendar management, skill installation with SHA-256 verification), while sentinel agents monitor them for rogue behavior:
| Agent | Role | Data Source |
|---|---|---|
| web-researcher | Worker: web search | DuckDuckGo |
| scheduler | Worker: calendar management | JSON file |
| skill-acquirer | Worker: skill installation | Local ClawHub files |
| behavior-sentinel | Sentinel: output rate monitoring | Activity log |
| supply-chain-sentinel | Sentinel: skill integrity | SHA-256 hashing |
| sentinel-commander | Commander: alert correlation | Sense network |
Each worker has a ~15% chance per cycle of going rogue. Sentinels detect anomalies through real file I/O and alert the commander over PubNub; the commander then investigates and issues quarantine orders.
pip install bedsheet[sense] duckduckgo-search
cd examples/agent-sentinel
python run.py
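The SHA-256 integrity check behind the supply-chain sentinel can be sketched with the standard library. This is an illustration rather than the demo's code; `sha256_of` and `is_untampered` are hypothetical names, and the expected hash is assumed to come from a trusted record made at install time.

```python
import hashlib


def sha256_of(path, chunk_size=65536):
    """Return the hex SHA-256 digest of a file, read in fixed-size chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        # iter() with a sentinel stops at the first empty read (end of file).
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def is_untampered(path, expected_hex):
    """Compare a file's current digest against a previously recorded one."""
    return sha256_of(path) == expected_hex.lower()
```

Reading in chunks keeps memory use flat regardless of file size, which matters if a sentinel rescans many files every cycle.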
9 Testing
Use MockSenseTransport for unit tests — it routes signals in-memory without PubNub:
from bedsheet.testing import MockSenseTransport
# Create a shared hub for multiple agents
transport = MockSenseTransport()
# Each agent gets its own peer transport
t1 = transport # First agent
t2 = transport.create_peer() # Second agent (shares the hub)
# Agents connected to the same hub can exchange signals
await agent1.join_network(t1, "test", ["alerts"])
await agent2.join_network(t2, "test", ["alerts"])
result = await agent1.request("agent2", "do something")
assert result == "done"
MockSenseTransport follows the same pattern as MockLLMClient — a test double that satisfies the protocol without external dependencies.
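To make the shared-hub idea concrete, here is a minimal sketch of how an in-memory transport with peers could route messages. It is not the MockSenseTransport implementation: `InMemoryHub` and `InMemoryTransport` are hypothetical, their method names are assumptions, and skipping delivery to the sender is a choice made for this sketch.

```python
import asyncio
from collections import defaultdict


class InMemoryHub:
    """Shared routing table: channel name -> list of subscriber queues."""

    def __init__(self):
        self.subscribers = defaultdict(list)


class InMemoryTransport:
    """Hypothetical test double; peers created from it share one hub."""

    def __init__(self, hub=None):
        self.hub = hub if hub is not None else InMemoryHub()
        self.inbox = asyncio.Queue()

    def create_peer(self):
        """Return a new transport attached to the same hub."""
        return InMemoryTransport(self.hub)

    def subscribe(self, channel):
        """Deliver future messages on `channel` to this transport's inbox."""
        self.hub.subscribers[channel].append(self.inbox)

    async def publish(self, channel, message):
        """Copy the message to every other subscriber of the channel."""
        for queue in self.hub.subscribers[channel]:
            if queue is not self.inbox:  # do not echo back to the sender
                await queue.put(message)
```

Create the transports inside a running event loop (for example within an async test) so the queues work on older Python versions as well.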
10 Reference
Signal Fields
| Field | Type | Description |
|---|---|---|
| kind | SignalKind | One of: request, response, alert, heartbeat, claim, release, event |
| sender | str | Name of the sending agent |
| payload | dict | Arbitrary data (default: empty dict) |
| correlation_id | str | Links requests to responses (auto-generated) |
| target | str \| None | Intended recipient (None = broadcast) |
| timestamp | float | Unix timestamp (auto-set) |
| source_channel | str \| None | Channel the signal arrived on (set by transport) |
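The fields in the table map naturally onto a dataclass. The sketch below mirrors them for illustration; `SketchSignal` is a hypothetical stand-in rather than the real Signal class, and the JSON round-trip and the UUID-based correlation_id are assumptions about how such a signal might be generated and sent over the wire.

```python
import json
import time
import uuid
from dataclasses import asdict, dataclass, field
from typing import Optional

KINDS = {"request", "response", "alert", "heartbeat", "claim", "release", "event"}


@dataclass
class SketchSignal:
    """Illustrative mirror of the Signal fields listed in the table."""

    kind: str
    sender: str
    payload: dict = field(default_factory=dict)
    correlation_id: str = field(default_factory=lambda: uuid.uuid4().hex)
    target: Optional[str] = None           # None means broadcast
    timestamp: float = field(default_factory=time.time)
    source_channel: Optional[str] = None   # filled in by the transport

    def __post_init__(self):
        if self.kind not in KINDS:
            raise ValueError(f"unknown signal kind: {self.kind!r}")

    def to_json(self):
        """Serialize all fields to a JSON string."""
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, raw):
        """Rebuild a signal from the output of to_json()."""
        return cls(**json.loads(raw))
```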
SenseMixin Methods
| Method | Description |
|---|---|
| join_network(transport, namespace, channels) | Connect and start listening |
| leave_network() | Disconnect and stop background tasks |
| broadcast(channel, signal) | Send signal to a channel |
| send_to(agent_name, signal) | Send signal to an agent's direct channel |
| request(agent_name, task, timeout) | Send task and await response |
| claim_incident(incident_id, channel) | Attempt to claim an incident |
| release_incident(incident_id, channel) | Release a claimed incident |
| on_signal(kind) | Decorator for custom signal handlers |
New Event Types
| Event | When Emitted |
|---|---|
| SignalReceivedEvent | Signal arrived from network |
| AgentConnectedEvent | Remote agent came online |
| AgentDisconnectedEvent | Remote agent went offline |
| RemoteDelegationEvent | Task sent to remote agent |
| RemoteResultEvent | Result received from remote agent |