The Sixth Sense

Distributed communication for Bedsheet agents. Run agents across processes, machines, and cloud providers — they find each other and collaborate over PubNub.

PubNub-backed · Zero infrastructure · Real-time · Works behind firewalls

1 Overview

Bedsheet's Supervisor pattern works great when all agents run in a single Python process. But what if your agents need to run on different machines, behind different firewalls, or even on different cloud providers?

The Sixth Sense module adds distributed communication to any Bedsheet agent via PubNub's real-time messaging platform. Agents become network-aware peers that can broadcast alerts, send requests to each other, and coordinate incident responses — all without running HTTP servers or managing service discovery.

Pure Clients

Agents are PubNub clients, not servers. No ports to open, no URLs to register. Works behind NATs and firewalls.

Signal-Based

Seven signal kinds (request, response, alert, heartbeat, claim, release, event) cover all coordination patterns.

Swappable Transport

PubNub is one implementation of the SenseTransport protocol. Swap in MQTT, Redis Streams, or your own transport.

Leaderless Coordination

Claim-based protocol lets multiple commanders compete for incident ownership. No central coordinator needed.

bedsheet/sense/
├── __init__.py          # Exports
├── protocol.py          # SenseTransport protocol
├── signals.py           # Signal dataclass + kinds
├── serialization.py     # Compact JSON (<32KB)
├── network.py           # SenseNetwork wrapper
├── mixin.py             # SenseMixin for agents
└── pubnub_transport.py  # PubNub implementation

2 Setup

Install

pip install "bedsheet[sense]"

This adds the pubnub package as a dependency.

Get PubNub Keys

  1. Sign up at pubnub.com (free tier: 200 MAU, unlimited channels)
  2. Create an app in the PubNub dashboard
  3. Copy your Subscribe Key and Publish Key

export PUBNUB_SUBSCRIBE_KEY=sub-c-your-key-here
export PUBNUB_PUBLISH_KEY=pub-c-your-key-here

Tip

Enable Presence in your PubNub keyset settings to use get_online_agents().
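One way to pick up the exported keys in Python is to read them at startup and fail fast when either is missing. The sketch below is illustrative only: load_pubnub_keys is a hypothetical helper, not part of Bedsheet.

```python
import os


def load_pubnub_keys(env=None):
    """Return (subscribe_key, publish_key) read from the environment.

    Hypothetical helper for illustration; not part of Bedsheet.
    """
    env = os.environ if env is None else env
    names = ("PUBNUB_SUBSCRIBE_KEY", "PUBNUB_PUBLISH_KEY")
    # Collect every missing name so the error message lists them all at once.
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return env["PUBNUB_SUBSCRIBE_KEY"], env["PUBNUB_PUBLISH_KEY"]
```

Passing a plain dict as env makes the helper easy to exercise in tests without touching the real environment.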

3 Your First Sense Agent

Any Bedsheet Agent becomes network-aware by adding the SenseMixin:

import asyncio
import os

from bedsheet import Agent, SenseMixin
from bedsheet.llm.anthropic import AnthropicClient
from bedsheet.sense.pubnub_transport import PubNubTransport

# 1. Create a sense-aware agent class
class MyAgent(SenseMixin, Agent):
    pass

async def main():
    # 2. Build the agent as usual
    agent = MyAgent(
        name="my-agent",
        instruction="You are a helpful agent.",
        model_client=AnthropicClient(),
    )

    # 3. Connect to the network (keys come from the variables exported in Setup)
    transport = PubNubTransport(
        subscribe_key=os.environ["PUBNUB_SUBSCRIBE_KEY"],
        publish_key=os.environ["PUBNUB_PUBLISH_KEY"],
    )
    await agent.join_network(transport, "my-namespace", ["alerts", "tasks"])

    try:
        # 4. The agent can now send and receive signals!
        ...  # do work
    finally:
        # 5. Disconnect when done, even if the work above raised
        await agent.leave_network()

asyncio.run(main())

What's happening?

  1. SenseMixin adds network methods to the Agent without changing the core ReAct loop
  2. join_network() connects to PubNub, subscribes to channels, and starts a background signal processing loop
  3. The agent automatically subscribes to its own direct channel (named after the agent)
  4. A heartbeat broadcasts the agent's capabilities every 30 seconds
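The periodic heartbeat in step 4 can be pictured as a background asyncio task. The sketch below is not Bedsheet's implementation: publish stands in for the transport, and the message layout and the get_cpu_usage capability name are assumptions made for illustration.

```python
import asyncio
import time


async def heartbeat_loop(publish, agent_name, capabilities, interval=30.0):
    """Publish a heartbeat every `interval` seconds until the task is cancelled.

    `publish` is any async callable taking one dict (a stand-in for the transport).
    """
    while True:
        await publish({
            "kind": "heartbeat",
            "sender": agent_name,
            "payload": {"capabilities": list(capabilities)},
            "timestamp": time.time(),
        })
        await asyncio.sleep(interval)


async def run_for(seconds, interval):
    """Run the loop briefly and return the heartbeats it published."""
    sent = []

    async def publish(message):
        sent.append(message)

    task = asyncio.create_task(
        heartbeat_loop(publish, "cpu-watcher", ["get_cpu_usage"], interval)
    )
    await asyncio.sleep(seconds)
    task.cancel()  # roughly what leaving the network would have to do
    try:
        await task
    except asyncio.CancelledError:
        pass
    return sent
```

Cancelling the task is what keeps the loop from outliving the agent, which is why a disconnect step has to stop background tasks as well as close the connection.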

4 Signals

A Signal is the unit of inter-agent communication. Every signal has a kind, a sender, and an optional payload:

from bedsheet.sense import Signal

# Broadcast an alert
alert = Signal(
    kind="alert",
    sender="cpu-watcher",
    payload={"severity": "high", "cpu": 95.2},
)
await agent.broadcast("alerts", alert)

Signal Kinds

Kind       Purpose                     Pattern
request    Ask an agent to do work     Peer-to-peer, expects response
response   Return results              Peer-to-peer, has correlation_id
alert      Broadcast an observation    One-to-many
heartbeat  Liveness + capabilities     One-to-many (periodic)
claim      Claim an incident           One-to-many (conflict resolution)
release    Release a claimed incident  One-to-many
event      Serialized bedsheet Event   One-to-many (observability)

Channel Naming

Channels follow the convention bedsheet.<namespace>.<purpose>:

bedsheet.cloud-ops.alerts       # Alert broadcasts
bedsheet.cloud-ops.tasks        # Task coordination / claims
bedsheet.cloud-ops.cpu-watcher  # Direct channel to cpu-watcher agent
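The naming convention is simple enough to express as a small helper. channel_name below is a hypothetical function, not a Bedsheet API, and rejecting dots inside a segment is an assumption made so that names can be split back apart unambiguously.

```python
def channel_name(namespace: str, purpose: str) -> str:
    """Build a channel name of the form bedsheet.<namespace>.<purpose>."""
    for part in (namespace, purpose):
        # Assumption of this sketch: segments are non-empty and contain no dots.
        if not part or "." in part:
            raise ValueError(f"invalid channel segment: {part!r}")
    return f"bedsheet.{namespace}.{purpose}"
```

An agent's direct channel follows the same shape, with the agent name as the purpose: channel_name("cloud-ops", "cpu-watcher").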

Custom Signal Handlers

@agent.on_signal("alert")
async def handle_alert(signal: Signal):
    severity = signal.payload.get("severity")
    print(f"Alert from {signal.sender}: {severity}")
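A decorator such as on_signal typically works by recording the handler under its signal kind and calling it when a matching signal arrives. The following is a minimal sketch of that registry pattern, assuming nothing about Bedsheet's internals; HandlerRegistry and dispatch are hypothetical names, and plain dicts stand in for Signal objects.

```python
import asyncio
from collections import defaultdict


class HandlerRegistry:
    """Maps a signal kind to the async handlers registered for it."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def on_signal(self, kind):
        """Decorator that registers the decorated coroutine for `kind`."""
        def register(func):
            self._handlers[kind].append(func)
            return func  # hand the function back unchanged
        return register

    async def dispatch(self, kind, signal):
        """Await every handler registered for `kind`, in registration order."""
        for handler in self._handlers.get(kind, []):
            await handler(signal)


registry = HandlerRegistry()
seen = []


@registry.on_signal("alert")
async def record_alert(signal):
    # Here a signal is a plain dict; a real handler would receive a Signal.
    seen.append(signal["payload"]["severity"])
```

Kinds with no registered handler are skipped, so an agent only reacts to the signals it has opted into.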

5 Request / Response

The request() method sends a task to a remote agent and waits for the response. Under the hood, the receiving agent runs invoke() with the task and sends back the completion:

# Commander asks cpu-watcher to check usage
result = await commander.request(
    "cpu-watcher",
    "What is the current CPU usage?",
    timeout=30.0,
)
print(result)  # "Overall: 45.2%, Per-core: [32.1, 58.3, ...]"

What's happening?

  1. Commander creates a request signal with a unique correlation_id
  2. Signal is published to the cpu-watcher's direct channel
  3. cpu-watcher's signal loop receives the request
  4. cpu-watcher calls self.invoke() with the task text
  5. The LLM uses cpu-watcher's tools to gather data
  6. The CompletionEvent response is sent back as a response signal
  7. Commander's future resolves with the result

Timeout

If the remote agent doesn't respond within the timeout, a TimeoutError is raised. The default timeout is 30 seconds.
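The correlation and timeout steps above can be sketched with asyncio futures keyed by correlation_id. This illustrates the general pattern rather than Bedsheet's code: PendingRequests, the send callable, and the payload keys "task" and "result" are all assumptions.

```python
import asyncio
import uuid


class PendingRequests:
    """Tracks in-flight requests by correlation_id."""

    def __init__(self):
        self._futures = {}

    async def request(self, send, task, timeout=30.0):
        """Send `task`, then wait for the matching response or time out."""
        correlation_id = uuid.uuid4().hex
        future = asyncio.get_running_loop().create_future()
        self._futures[correlation_id] = future
        try:
            await send({
                "kind": "request",
                "correlation_id": correlation_id,
                "payload": {"task": task},
            })
            return await asyncio.wait_for(future, timeout)
        finally:
            # Forget the request whether it succeeded, failed, or timed out.
            self._futures.pop(correlation_id, None)

    def resolve(self, response):
        """Called by the signal loop when a response signal arrives."""
        future = self._futures.get(response["correlation_id"])
        if future is not None and not future.done():
            future.set_result(response["payload"]["result"])


async def demo(reply=True, timeout=1.0):
    pending = PendingRequests()

    async def send(signal):
        # Stand-in for the remote agent: answer with the same correlation_id.
        if reply:
            pending.resolve({
                "kind": "response",
                "correlation_id": signal["correlation_id"],
                "payload": {"result": "handled: " + signal["payload"]["task"]},
            })

    return await pending.request(send, "check cpu", timeout)
```

Responses whose correlation_id is unknown (for example, ones that arrive after the timeout) are ignored by resolve, which keeps late replies from raising errors in the signal loop.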

6 Claim Protocol

When multiple commander agents are online, they need to coordinate who handles an incident. The claim protocol provides leaderless conflict resolution:

# When an alert arrives
won = await agent.claim_incident("incident-001", "tasks")
if won:
    # We are responsible for this incident
    await investigate_and_report()
else:
    # Another agent claimed it first
    pass

How Claims Work

  1. Agent publishes a claim signal with the incident ID and timestamp
  2. Waits 500ms for competing claims from other agents
  3. If competing claims arrive, the earliest timestamp wins
  4. Loser backs off; winner coordinates the response
  5. When done, winner publishes release to free the incident
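Step 3, the earliest-timestamp rule, reduces to taking a minimum over the claims collected during the wait window. The sketch below assumes each claim is a (timestamp, sender) pair; breaking ties by sender name is an assumption added here so that every agent reaches the same answer, and is not stated by the protocol description.

```python
def resolve_claims(claims):
    """Return the winning sender among competing claims for one incident.

    Each claim is a (timestamp, sender) tuple. Tuples compare element by
    element, so min() picks the earliest timestamp and falls back to the
    sender name on an exact tie (the tie-break is this sketch's assumption).
    """
    if not claims:
        raise ValueError("no claims to resolve")
    _timestamp, sender = min(claims)
    return sender
```

An agent would then compare the result with its own name: it won if resolve_claims(all_claims) equals its name, and backs off otherwise.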

7 SenseNetwork API

For managing multiple agents in the same process (useful for testing and simpler deployments), use SenseNetwork:

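import os

from bedsheet.sense import SenseNetwork
from bedsheet.sense.pubnub_transport import PubNubTransport
from bedsheet.testing import MockSenseTransport

# Option A: testing (in-memory)
network = SenseNetwork(
    namespace="cloud-ops",
    transport=MockSenseTransport(),
)

# Option B: production (PubNub); use this instead of option A
sub_key = os.environ["PUBNUB_SUBSCRIBE_KEY"]
pub_key = os.environ["PUBNUB_PUBLISH_KEY"]
network = SenseNetwork(
    namespace="cloud-ops",
    transport_factory=lambda: PubNubTransport(
        subscribe_key=sub_key,
        publish_key=pub_key,
    ),
)

# Add agents (run inside an async function)
await network.add(cpu_agent, channels=["alerts", "tasks"])
await network.add(commander, channels=["alerts", "tasks"])

# Later...
await network.stop()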

8 Cloud Monitor Demo

The examples/cloud-monitor/ directory contains a complete demo with 5 agents running as separate processes:

Agent               Role                            Data Source
cpu-watcher         Monitor CPU, alert on spikes    psutil
memory-watcher      Monitor RAM and swap            psutil
log-analyzer        Search and analyze logs         Simulated log buffer
security-scanner    Scan ports, check logins        socket
incident-commander  Coordinate alert responses      Sense network

Running the Demo

export PUBNUB_SUBSCRIBE_KEY=sub-c-...
export PUBNUB_PUBLISH_KEY=pub-c-...
export ANTHROPIC_API_KEY=sk-ant-...

pip install "bedsheet[sense]" psutil

cd examples/cloud-monitor
python run.py

============================================================
Cloud Monitor - Bedsheet Sense Demo
Launching 5 distributed agents...
============================================================
Starting cpu-watcher...
[cpu-watcher] Online and monitoring...
Starting memory-watcher...
[memory-watcher] Online and monitoring...
Starting log-analyzer...
[log-analyzer] Online and ready...
Starting security-scanner...
[security-scanner] Online and ready...
Starting incident-commander...
[incident-commander] Online and coordinating...
============================================================
All agents online! Press Ctrl+C to stop.
============================================================

When CPU exceeds 80%, the cpu-watcher broadcasts an alert. The incident-commander claims it, queries the other agents for context, and synthesizes an incident report.
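The threshold check behind that alert might look like the following. This is a sketch, not the demo's code: build_cpu_alert is a hypothetical name, the payload mirrors the severity and cpu keys from the alert example in the Signals section, and the extra threshold key is an assumption.

```python
def build_cpu_alert(cpu_percent, threshold=80.0):
    """Return an alert payload when usage exceeds the threshold, else None."""
    if cpu_percent <= threshold:
        return None
    # "threshold" is an extra key added by this sketch for context.
    return {"severity": "high", "cpu": cpu_percent, "threshold": threshold}
```

Keeping the decision in a pure function separates it from the sampling (psutil in the demo) and from the broadcast, so each piece can be tested on its own.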

Agent Sentinel Demo

The examples/agent-sentinel/ directory demonstrates AI agent security monitoring, inspired by the OpenClaw crisis of 2026. Three worker agents perform real tasks (DuckDuckGo searches, JSON calendar management, skill installation with SHA-256 verification), while sentinel agents monitor them for rogue behavior:

Agent                  Role                              Data Source
web-researcher         Worker: web search                DuckDuckGo
scheduler              Worker: calendar management       JSON file
skill-acquirer         Worker: skill installation        Local ClawHub files
behavior-sentinel      Sentinel: output rate monitoring  Activity log
supply-chain-sentinel  Sentinel: skill integrity         SHA-256 hashing
sentinel-commander     Commander: alert correlation      Sense network

Each worker has a ~15% chance per cycle of going rogue. Sentinels detect anomalies through real file I/O and alert the commander over PubNub. The commander then investigates and issues quarantine orders.

pip install "bedsheet[sense]" duckduckgo-search
cd examples/agent-sentinel
python run.py
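The SHA-256 integrity check mentioned for skill installation can be illustrated with the standard library alone. The function names below are hypothetical, and how the demo stores its expected hashes is not shown here; this sketch simply compares a file's digest with a known hex string.

```python
import hashlib
import hmac


def sha256_hex(data: bytes) -> str:
    """Return the SHA-256 digest of `data` as a lowercase hex string."""
    return hashlib.sha256(data).hexdigest()


def verify_skill(path, expected_hex):
    """Return True when the file at `path` hashes to `expected_hex`."""
    with open(path, "rb") as handle:
        actual = sha256_hex(handle.read())
    # compare_digest performs a constant-time comparison.
    return hmac.compare_digest(actual, expected_hex.lower())
```

A sentinel would raise an alert whenever verify_skill returns False for an installed skill file.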

9 Testing

Use MockSenseTransport for unit tests — it routes signals in-memory without PubNub:

from bedsheet.testing import MockSenseTransport

# Create a shared hub for multiple agents
transport = MockSenseTransport()

# Each agent gets its own peer transport
t1 = transport          # First agent
t2 = transport.create_peer()  # Second agent (shares the hub)

# Agents connected to the same hub can exchange signals
await agent1.join_network(t1, "test", ["alerts"])
await agent2.join_network(t2, "test", ["alerts"])

result = await agent1.request("agent2", "do something")
assert result == "done"

Note

MockSenseTransport follows the same pattern as MockLLMClient — a test double that satisfies the protocol without external dependencies.
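To give a feel for what a shared in-memory hub involves, here is a small stand-alone sketch of the pattern. It is not MockSenseTransport's implementation: InMemoryHub, InMemoryPeer, and their methods are hypothetical names, and only the create_peer idea is borrowed from the example above.

```python
import asyncio


class InMemoryHub:
    """Shared state: channel name -> inboxes of the peers subscribed to it."""

    def __init__(self):
        self.subscribers = {}


class InMemoryPeer:
    """One agent's view of the hub, with its own inbox queue."""

    def __init__(self, hub=None):
        self.hub = hub if hub is not None else InMemoryHub()
        self.inbox = asyncio.Queue()

    def create_peer(self):
        """Return a new peer attached to the same hub."""
        return InMemoryPeer(self.hub)

    def subscribe(self, channel):
        self.hub.subscribers.setdefault(channel, []).append(self.inbox)

    async def publish(self, channel, message):
        # Deliver to every subscribed inbox, including the sender's own.
        for inbox in self.hub.subscribers.get(channel, []):
            await inbox.put(message)


async def demo():
    first = InMemoryPeer()
    second = first.create_peer()
    second.subscribe("bedsheet.test.alerts")
    await first.publish("bedsheet.test.alerts", {"kind": "alert"})
    return await second.inbox.get()
```

Because every peer shares one hub object, delivery is immediate and deterministic, which is the property that makes this kind of test double convenient.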

10 Reference

Signal Fields

Field           Type         Description
kind            SignalKind   One of: request, response, alert, heartbeat, claim, release, event
sender          str          Name of the sending agent
payload         dict         Arbitrary data (default: empty dict)
correlation_id  str          Links requests to responses (auto-generated)
target          str | None   Intended recipient (None = broadcast)
timestamp       float        Unix timestamp (auto-set)
source_channel  str | None   Channel the signal arrived on (set by transport)
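The fields above map naturally onto a dataclass. The sketch below mirrors the table but is not Bedsheet's Signal class: SketchSignal and to_compact_json are hypothetical names, dropping None fields is an assumption, and the 32 KB ceiling is taken from the "<32KB" note on serialization.py.

```python
import json
import time
import uuid
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, Optional

MAX_MESSAGE_BYTES = 32 * 1024  # assumed ceiling for one serialized signal


@dataclass
class SketchSignal:
    kind: str
    sender: str
    payload: Dict[str, Any] = field(default_factory=dict)
    correlation_id: str = field(default_factory=lambda: uuid.uuid4().hex)
    target: Optional[str] = None  # None means broadcast
    timestamp: float = field(default_factory=time.time)
    source_channel: Optional[str] = None  # filled in on receipt


def to_compact_json(signal):
    """Serialize without optional None fields or extra whitespace."""
    data = {key: value for key, value in asdict(signal).items() if value is not None}
    text = json.dumps(data, separators=(",", ":"))
    if len(text.encode("utf-8")) >= MAX_MESSAGE_BYTES:
        raise ValueError("serialized signal exceeds the size limit")
    return text
```

Checking the encoded byte length rather than the character count matters once payloads contain non-ASCII text.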

SenseMixin Methods

Method                                        Description
join_network(transport, namespace, channels)  Connect and start listening
leave_network()                               Disconnect and stop background tasks
broadcast(channel, signal)                    Send signal to a channel
send_to(agent_name, signal)                   Send signal to an agent's direct channel
request(agent_name, task, timeout)            Send task and await response
claim_incident(incident_id, channel)          Attempt to claim an incident
release_incident(incident_id, channel)        Release a claimed incident
on_signal(kind)                               Decorator for custom signal handlers

New Event Types

Event                   When Emitted
SignalReceivedEvent     Signal arrived from network
AgentConnectedEvent     Remote agent came online
AgentDisconnectedEvent  Remote agent went offline
RemoteDelegationEvent   Task sent to remote agent
RemoteResultEvent       Result received from remote agent