The Sixth Sense: Design & Implementation

A deep dive into the architecture, design choices, and PubNub integration behind Bedsheet's distributed agent communication module.

PubNub · Protocol-based · Mixin Architecture · Async-first

1 Motivation

Before the Sixth Sense, Bedsheet agents were limited to single-process execution. A Supervisor could coordinate multiple agents, but they all lived in the same Python process. This created three problems:

  1. Scalability: A single process can only run so many agents before resource contention kills performance. CPU-intensive agents block the event loop for others.
  2. Isolation: One crashing agent can take down the whole system. No way to restart a single agent without restarting everything.
  3. Distribution: No way to run agents across machines or cloud providers. A monitoring agent on AWS can't talk to an analyzer on GCP.

The Sixth Sense solves this by giving agents distributed communication over PubNub. Each agent runs in its own process (or container, or cloud function) and communicates through publish/subscribe messaging. They find each other by subscribing to shared channels.

Design Goal

Add distributed communication without changing how single agents work. A regular Agent stays exactly the same. You opt in to networking by adding SenseMixin to your class hierarchy — zero changes to existing code.

2 Architecture Overview

The Sixth Sense is organized as a layered module inside bedsheet/sense/:

bedsheet/sense/
├── __init__.py          # Public API exports
├── signals.py           # Signal dataclass & SignalKind type
├── serialization.py     # Compact JSON serialization (32KB limit)
├── protocol.py          # SenseTransport Protocol & AgentPresence
├── mixin.py             # SenseMixin (the main integration point)
├── network.py           # SenseNetwork (multi-agent convenience)
└── pubnub_transport.py  # PubNub implementation

Each layer has a clear responsibility:

signals.py

The Signal dataclass — the atomic unit of inter-agent communication. Seven signal kinds cover all interaction patterns.

serialization.py

Compact JSON encoding with short keys (k, s, p) to stay under PubNub's 32KB message limit. Auto-truncates large payloads.

protocol.py

The SenseTransport Protocol defines the transport contract. Any class implementing its seven methods can serve as a transport.

mixin.py

SenseMixin adds network capabilities to any Agent via Python's multiple inheritance. This is the primary integration point.

Key Insight: The Dependency Direction

Dependencies flow inward: mixin.py depends on protocol.py (not on pubnub_transport.py). The PubNub transport is the outermost layer and is completely swappable. You could write a Redis, NATS, or MQTT transport without touching any other file.

3 Signal Design

The Signal Dataclass

@dataclass
class Signal:
    kind: SignalKind          # What type of signal
    sender: str               # Who sent it (agent name)
    payload: dict[str, Any]   # Arbitrary data
    correlation_id: str       # Links requests to responses
    target: str | None        # Intended recipient (None = broadcast)
    timestamp: float          # Unix timestamp (auto-set)
    source_channel: str | None  # Which channel it arrived on

Why a Dataclass, Not a Pydantic Model?

Bedsheet uses @dataclass for all data structures (events, signals, messages). Pydantic adds validation overhead that's unnecessary for internal data structures where the framework controls construction. Dataclasses give us type hints, __eq__, and __repr__ for free with zero runtime cost. The same reasoning applies to the existing Event types in bedsheet/events.py.

The Seven Signal Kinds

We use a Literal type (not an Enum) for signal kinds. This gives us type-checking without the overhead of enum instances:

SignalKind = Literal[
    "request",    # Ask another agent to do something
    "response",   # Reply to a request
    "alert",      # Broadcast an alert to all listeners
    "heartbeat",  # Periodic "I'm alive" signal
    "claim",      # Attempt to claim ownership of an incident
    "release",    # Release a claimed incident
    "event",      # General-purpose notification
]
| Kind | Direction | Purpose |
|---|---|---|
| request | Targeted (one agent) | Delegate a task to a specific agent via request() |
| response | Targeted (requester) | Return the result of a request, matched by correlation_id |
| alert | Broadcast (all) | Notify all subscribers of a condition (CPU high, breach detected) |
| heartbeat | Broadcast (all) | Periodic signal with agent capabilities for presence detection |
| claim | Broadcast (all) | Attempt to own an incident (for conflict-free coordination) |
| release | Broadcast (all) | Release ownership so another agent can claim it |
| event | Broadcast or targeted | General-purpose notification for extensibility |

Why Exactly Seven Kinds?

request + response cover RPC-style delegation. alert + event cover pub/sub notifications. claim + release cover distributed coordination. heartbeat covers presence. This set is minimal but sufficient — every agent interaction pattern we've encountered maps to one of these seven. We chose not to make it extensible (e.g., custom kinds) because a fixed set enables optimized routing in the signal loop.

Correlation IDs

Every signal gets a correlation_id — a 12-character hex string from uuid4(). This is how request/response pairs are matched:

# Commander sends a request; its correlation_id is auto-generated ("a1b2c3d4e5f6")
signal = Signal(kind="request", sender="commander",
                payload={"task": "check CPU"}, target="cpu-watcher")

# cpu-watcher responds with the SAME correlation_id
response = Signal(kind="response", sender="cpu-watcher",
                  payload={"result": "CPU at 45%"},
                  correlation_id="a1b2c3d4e5f6",  # same ID
                  target="commander")

The mixin's signal loop checks self._pending_requests[signal.correlation_id] to resolve the correct asyncio.Future. This means an agent can have multiple concurrent requests in flight without confusion.
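That matching logic can be condensed into a runnable sketch (not the real mixin code; Signal is trimmed and the loop handles only the response branch):

```python
import asyncio
from dataclasses import dataclass
from typing import Any

@dataclass
class Signal:
    kind: str
    sender: str
    payload: dict[str, Any]
    correlation_id: str = ""

async def signal_loop(queue: asyncio.Queue,
                      pending: dict[str, asyncio.Future]) -> None:
    """Simplified loop: resolve the pending future that matches a response."""
    while True:
        signal = await queue.get()
        if signal.kind == "response":
            future = pending.pop(signal.correlation_id, None)
            if future is not None and not future.done():
                future.set_result(signal)  # wakes the awaiting request()

async def demo() -> str:
    queue: asyncio.Queue = asyncio.Queue()
    pending: dict[str, asyncio.Future] = {}
    loop_task = asyncio.create_task(signal_loop(queue, pending))

    # A request is "in flight": its future sits in the pending table...
    future = asyncio.get_running_loop().create_future()
    pending["a1b2c3d4e5f6"] = future
    # ...then the matching response arrives on the transport queue.
    await queue.put(Signal("response", "cpu-watcher",
                           {"result": "CPU at 45%"}, "a1b2c3d4e5f6"))
    result = await asyncio.wait_for(future, timeout=1.0)
    loop_task.cancel()
    return result.payload["result"]
```

Because each in-flight request owns a distinct future keyed by correlation_id, responses can arrive in any order without being misdelivered.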

4 Transport Protocol

@runtime_checkable
class SenseTransport(Protocol):
    async def connect(self, agent_id: str, namespace: str) -> None: ...
    async def disconnect(self) -> None: ...
    async def broadcast(self, channel: str, signal: Signal) -> None: ...
    async def subscribe(self, channel: str) -> None: ...
    async def unsubscribe(self, channel: str) -> None: ...
    def signals(self) -> AsyncIterator[Signal]: ...
    async def get_online_agents(self, channel: str) -> list[AgentPresence]: ...

Why Protocol, Not ABC?

Bedsheet uses Protocol (structural subtyping) throughout the codebase — LLMClient, Memory, and now SenseTransport. The philosophy: if it walks like a duck and quacks like a duck, it's a duck. A class satisfies SenseTransport by implementing the right methods, without explicitly inheriting from it. This means you can write a transport in a separate package that doesn't import bedsheet at all — it just needs the right method signatures. The @runtime_checkable decorator lets us use isinstance() checks when needed.
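Structural subtyping in miniature: a toy two-method protocol standing in for SenseTransport (the names here are illustrative, not the real bedsheet classes):

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class Transport(Protocol):
    async def connect(self, agent_id: str, namespace: str) -> None: ...
    async def disconnect(self) -> None: ...

class InMemoryTransport:
    """Never imports or names Transport, yet satisfies it structurally."""

    async def connect(self, agent_id: str, namespace: str) -> None:
        self.agent_id = agent_id

    async def disconnect(self) -> None:
        pass
```

One caveat: isinstance() against a @runtime_checkable protocol checks only that the methods exist, not their signatures, so it is a sanity check rather than a full contract verification.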

The Seven Methods

The protocol is intentionally minimal. Every method maps to a fundamental pub/sub operation:

| Method | What It Does | PubNub Equivalent |
|---|---|---|
| connect() | Establish connection with identity | Create PubNubAsyncio instance |
| disconnect() | Clean up and close | pubnub.stop() |
| broadcast() | Publish a signal to a channel | pubnub.publish() |
| subscribe() | Listen to a channel | pubnub.subscribe() |
| unsubscribe() | Stop listening | pubnub.unsubscribe() |
| signals() | Async iterator of incoming signals | Read from asyncio.Queue |
| get_online_agents() | Who's on a channel right now? | pubnub.here_now() |

AgentPresence

AgentPresence is a simple dataclass representing a remote agent's identity on the network:

@dataclass
class AgentPresence:
    agent_id: str
    agent_name: str
    namespace: str
    capabilities: list[str] = field(default_factory=list)
    status: str = "online"
    metadata: dict[str, Any] = field(default_factory=dict)

This is returned by get_online_agents() and populated from PubNub's here_now() API (or from heartbeat signals in the mock transport).

5 Why PubNub

We evaluated several messaging backends before choosing PubNub:

| Option | Pros | Cons |
|---|---|---|
| PubNub | Zero infrastructure, built-in presence, global CDN, generous free tier | 32KB message limit, vendor lock-in |
| Redis Pub/Sub | Fast, familiar, no message limits | Requires Redis server, no built-in presence, no cross-cloud |
| NATS | Very fast, cloud-native | Requires NATS server, complex JetStream setup for persistence |
| WebSocket server | Full control, no vendor | Must build everything: routing, presence, reconnection, scaling |
| AWS SNS/SQS | Native AWS, highly reliable | AWS-only, complex setup, not real-time (polling) |

The Decision

PubNub won because of zero infrastructure. A demo user signs up, gets keys, and has global real-time messaging in 5 minutes. No Docker containers, no cloud setup, no servers to manage. The 32KB limit is handled by compact serialization (short keys, auto-truncation). The vendor lock-in is mitigated by the SenseTransport Protocol — you can swap to Redis or NATS by implementing 7 methods.

PubNub's Key Features We Use

  - Publish/subscribe on named channels (pubnub.publish() / pubnub.subscribe())
  - Presence via here_now(), which backs get_online_agents()
  - The asyncio Python SDK (PubNubAsyncio), whose threaded callbacks we bridge in the next section

6 The Thread-to-Asyncio Bridge

This is the trickiest piece of the PubNub integration. PubNub's Python SDK uses threaded callbacks for incoming messages, but Bedsheet is fully async. We need to safely cross the thread boundary.

The Problem

# PubNub calls this from a BACKGROUND THREAD:
class _SignalListener(SubscribeCallback):
    def message(self, pubnub, message):
        # We're on PubNub's thread, NOT the asyncio event loop!
        # Cannot await anything, cannot use asyncio directly
        signal = deserialize(message.message)
        # How do we get this signal to the async signal_loop?

The Solution: call_soon_threadsafe

class _SignalListener(SubscribeCallback):
    def __init__(self, queue: asyncio.Queue[Signal], loop: asyncio.AbstractEventLoop):
        self._queue = queue
        self._loop = loop

    def message(self, pubnub, message):
        signal = deserialize(message.message, source_channel=message.channel)
        # Thread-safe way to put into the asyncio queue:
        self._loop.call_soon_threadsafe(self._queue.put_nowait, signal)

What's Happening Here

  1. We capture the asyncio event loop reference at connection time (asyncio.get_running_loop())
  2. We pass the loop and an asyncio.Queue to the PubNub callback listener
  3. When PubNub's thread delivers a message, we call loop.call_soon_threadsafe()
  4. This schedules queue.put_nowait(signal) to run on the event loop's thread
  5. The mixin's _signal_loop() awaits queue.get() on the async side

call_soon_threadsafe is the standard Python mechanism for thread-to-asyncio communication. It's safe to call from any thread and guarantees the callback runs on the event loop's thread during the next iteration.

Why Not asyncio.run_coroutine_threadsafe?

We use call_soon_threadsafe with put_nowait instead of run_coroutine_threadsafe with put because put_nowait is synchronous and never blocks. The queue has no max size, so it won't raise QueueFull. This is simpler and avoids creating unnecessary futures on the PubNub callback thread.
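The whole bridge fits in a self-contained demonstration, with a plain OS thread standing in for PubNub's callback thread:

```python
import asyncio
import threading

async def main() -> str:
    loop = asyncio.get_running_loop()            # captured on the asyncio side
    queue: asyncio.Queue[str] = asyncio.Queue()

    def producer() -> None:
        # Runs on a plain OS thread, like PubNub's callback thread:
        # no awaiting here, just a thread-safe hand-off to the event loop.
        loop.call_soon_threadsafe(queue.put_nowait, "signal-from-thread")

    threading.Thread(target=producer).start()
    # The consumer awaits on the event-loop thread as usual.
    return await asyncio.wait_for(queue.get(), timeout=1.0)
```

Calling queue.put_nowait directly from the thread would not be safe; asyncio queues are not thread-safe, so the hand-off must go through call_soon_threadsafe.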

7 Channel Naming Convention

PubNub channels are just strings. We use a namespaced convention to prevent collisions:

def _full_channel(self, channel: str) -> str:
    """Expand short channel name to full namespaced channel."""
    if channel.startswith("bedsheet."):
        return channel
    return f"bedsheet.{self._namespace}.{channel}"

# Examples:
# "alerts"  → "bedsheet.cloud-ops.alerts"
# "tasks"   → "bedsheet.cloud-ops.tasks"
# "agent-1" → "bedsheet.cloud-ops.agent-1"  (direct channel)

Every agent also subscribes to its own direct channel (bedsheet.{namespace}.{agent_name}). This enables targeted messaging — when you call send_to("cpu-watcher", signal), it publishes to bedsheet.cloud-ops.cpu-watcher.

Why Namespace Channels?

Namespacing prevents cross-contamination between different deployments sharing the same PubNub keys. A staging environment using namespace "staging" won't interfere with production using "prod", even on the same PubNub app.
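The expansion logic works equally well as a standalone function (a module-level sketch here, rather than the mixin method shown above):

```python
def full_channel(namespace: str, channel: str) -> str:
    """Expand a short channel name to its namespaced form (idempotent)."""
    if channel.startswith("bedsheet."):
        return channel                      # already fully qualified
    return f"bedsheet.{namespace}.{channel}"
```

Because the namespace is baked into every channel name, the staging/prod isolation described above falls out of the prefix for free: the two environments simply never share a channel string.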

8 The Mixin Pattern

SenseMixin is the heart of the Sixth Sense. It adds distributed capabilities to any Agent without modifying the Agent class itself:

class SenseMixin:
    """Mixin that adds distributed sensing to any Agent."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)  # Calls Agent.__init__()
        self._transport: SenseTransport | None = None
        self._namespace: str = ""
        self._signal_handlers: dict[SignalKind, list[SignalHandler]] = {}
        self._signal_task: asyncio.Task | None = None
        self._pending_requests: dict[str, asyncio.Future[Signal]] = {}
        self._claimed_incidents: set[str] = set()
        self._heartbeat_task: asyncio.Task | None = None

How Python MRO Makes This Work

class MyAgent(SenseMixin, Agent):
    pass

# Method Resolution Order (MRO):
# MyAgent → SenseMixin → Agent → object

# When you call MyAgent(name="x", instruction="y", model_client=client):
# 1. MyAgent.__init__ → not defined, goes to next
# 2. SenseMixin.__init__(self, name="x", ...)
#    → super().__init__(name="x", ...) → Agent.__init__
# 3. Agent.__init__ sets up name, instruction, model_client
# 4. Back in SenseMixin.__init__, adds _transport, _signal_handlers, etc.
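The cooperative-__init__ chain can be verified with stub classes (stand-ins for the real Agent and SenseMixin, not bedsheet code):

```python
class Agent:
    """Stand-in for bedsheet's Agent."""
    def __init__(self, name: str, **kwargs):
        super().__init__(**kwargs)
        self.name = name

class SenseMixin:
    """Stand-in mixin: cooperative __init__ continues down the MRO."""
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)   # reaches Agent.__init__
        self._transport = None

class MyAgent(SenseMixin, Agent):
    pass

agent = MyAgent(name="cpu-watcher")
# Both initializers ran: Agent set .name, SenseMixin set ._transport
print([c.__name__ for c in MyAgent.__mro__])
# ['MyAgent', 'SenseMixin', 'Agent', 'object']
```

Note the mixin must come first in the base list; writing `class MyAgent(Agent, SenseMixin)` would put Agent ahead in the MRO, and Agent's __init__ would have to cooperate with super() for the mixin's setup to run.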

Why a Mixin, Not Inheritance or Composition?

Option A: Inheritance (class SenseAgent(Agent)) — Forces users to subclass a specific class. Can't add sensing to a Supervisor, or to user's custom Agent subclass.

Option B: Composition (agent.sense = SenseAdapter(agent)) — Cleaner separation, but awkward API. Users would write agent.sense.broadcast() instead of agent.broadcast(). The adapter would need to reach into the agent's internals for invoke().

Option C: Mixin (class MyAgent(SenseMixin, Agent)) — Best of both worlds. The agent IS-A SenseMixin and IS-A Agent. broadcast(), request(), on_signal() feel like native agent methods. Works with both Agent and Supervisor.

The type: ignore[attr-defined] Pattern

You'll notice # type: ignore[attr-defined] comments throughout the mixin. This is because SenseMixin accesses self.name and self.invoke(), which are defined on Agent, not on the mixin itself:

await transport.connect(self.name, namespace)  # type: ignore[attr-defined]

Mypy can't know that SenseMixin will always be combined with Agent. The type ignores acknowledge this limitation. An alternative would be a generic Protocol for the host class, but that adds complexity for no runtime benefit.
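For reference, the generic-Protocol alternative might look like the following sketch. SenseHost and _as_host() are hypothetical names invented for illustration, not part of Bedsheet:

```python
from typing import Protocol, cast

class SenseHost(Protocol):
    """What the mixin assumes about its host class (hypothetical)."""
    name: str

class SenseMixin:
    def _as_host(self) -> "SenseHost":
        # One localized cast instead of per-call-site type: ignore comments.
        return cast("SenseHost", self)

class Agent:
    def __init__(self, name: str) -> None:
        self.name = name

class MyAgent(SenseMixin, Agent):
    pass
```

The cast is a no-op at runtime (it returns self unchanged); the trade-off is one extra indirection on every host-attribute access in exchange for fewer scattered ignores.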

9 Request/Response Over PubNub

The most complex pattern is request/response — asking a remote agent to do something and waiting for the answer. This implements an RPC-like pattern over pub/sub messaging.

The Flow

Commander requests CPU check from cpu-watcher:

On the commander:

  1. Commander calls request("cpu-watcher", "check CPU")
  2. Creates a correlation_id and an asyncio.Future
  3. Stores the future in self._pending_requests[correlation_id]
  4. Sends a request signal to cpu-watcher's direct channel
  5. Awaits the future with a timeout

On cpu-watcher:

  1. cpu-watcher's signal loop receives the request
  2. Calls _handle_request() which runs self.invoke(session_id, task)
  3. The agent's LLM processes the task, calls tools, generates a response
  4. Sends a response signal back with the same correlation_id

Back on the commander:

  1. Commander's signal loop receives the response
  2. Matches correlation_id to the pending future
  3. Resolves the future with the response signal
  4. Commander's request() returns the result string
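The requesting side of this flow can be sketched as follows. Requester and _send are illustrative: _send immediately echoes a response within the same process, where the real code would publish to the target's direct channel:

```python
import asyncio
import uuid

class Requester:
    """Requesting side only: one future per correlation_id, resolved by the
    signal loop when the matching response arrives."""

    def __init__(self) -> None:
        self._pending: dict[str, asyncio.Future] = {}

    async def request(self, target: str, task: str, timeout: float = 5.0) -> str:
        correlation_id = uuid.uuid4().hex[:12]          # 12-char hex, as in the doc
        future: asyncio.Future = asyncio.get_running_loop().create_future()
        self._pending[correlation_id] = future
        await self._send(target, task, correlation_id)
        try:
            response = await asyncio.wait_for(future, timeout)
            return response["result"]
        finally:
            self._pending.pop(correlation_id, None)     # clean up on success or timeout

    async def _send(self, target: str, task: str, correlation_id: str) -> None:
        # Stand-in for the PubNub publish: schedule an "incoming response" so
        # the round trip completes inside this one process.
        asyncio.get_running_loop().call_soon(
            lambda: self._pending[correlation_id].set_result({"result": f"done: {task}"}))
```

asyncio.wait_for gives the timeout semantics for free: if no response arrives in time, the future is cancelled and the pending entry is removed in the finally block.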

Key Implementation Detail: _handle_request

async def _handle_request(self, signal: Signal) -> None:
    task = signal.payload.get("task", "")
    session_id = f"sense-{signal.correlation_id}"

    # Run the full agent loop (LLM + tools)
    result = ""
    async for event in self.invoke(session_id, task):
        if isinstance(event, CompletionEvent):
            result = event.response

    # Send response back to requester
    response_signal = Signal(
        kind="response",
        sender=self.name,
        payload={"result": result},
        correlation_id=signal.correlation_id,
        target=signal.sender,
    )
    await self.send_to(signal.sender, response_signal)

This is where the Sixth Sense connects to Bedsheet's core: the request handler calls self.invoke(), which runs the agent's full ReAct loop (LLM reasoning → tool calls → final response). The remote agent isn't just a message router — it's a full AI agent that thinks about the request and uses its tools to answer it.

Concurrency

Requests are handled with asyncio.create_task(self._handle_request(signal)), so multiple requests can be processed concurrently. An agent can handle several incoming requests while also sending its own requests to others.

10 Claim Protocol

When multiple agents see the same alert, we need exactly one to handle it. The claim protocol provides leaderless conflict resolution — no central coordinator required.

How It Works

async def claim_incident(self, incident_id: str, channel: str) -> bool:
    # 1. Optimistically record our claim, then broadcast it
    self._claimed_incidents.add(incident_id)
    signal = Signal(kind="claim", sender=self.name,
                    payload={"incident_id": incident_id})
    await self.broadcast(channel, signal)

    # 2. Wait 500ms for competing claims
    await asyncio.sleep(0.5)

    # 3. If still in our claimed set, we won
    return incident_id in self._claimed_incidents

The conflict resolution happens in _handle_claim():

def _handle_claim(self, signal: Signal) -> None:
    incident_id = signal.payload.get("incident_id")
    if incident_id in self._claimed_incidents:
        # Tie-breaking: lower sender name wins
        if signal.sender < self.name:
            self._claimed_incidents.discard(incident_id)
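The tie-break can be simulated end to end with two toy agents that both claim the same incident and then observe each other's broadcast (Claimer is illustrative, not the real mixin):

```python
class Claimer:
    """Minimal agent for simulating the claim tie-break (alphabetical winner)."""

    def __init__(self, name: str):
        self.name = name
        self.claimed: set[str] = set()

    def claim(self, incident_id: str) -> None:
        self.claimed.add(incident_id)       # optimistically claim before broadcasting

    def handle_claim(self, sender: str, incident_id: str) -> None:
        # Competing claim observed: the lower (earlier-sorting) name wins.
        if incident_id in self.claimed and sender < self.name:
            self.claimed.discard(incident_id)

# Both agents claim the same incident, then see each other's broadcast.
a, b = Claimer("analyzer"), Claimer("watcher")
for agent in (a, b):
    agent.claim("inc-42")
a.handle_claim(sender="watcher", incident_id="inc-42")   # "watcher" > "analyzer": keep
b.handle_claim(sender="analyzer", incident_id="inc-42")  # "analyzer" < "watcher": yield
```

After both handlers run, only "analyzer" still holds the incident: the comparison is deterministic, so both sides independently reach the same conclusion without exchanging further messages.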

Why Not a Distributed Lock?

A proper distributed lock (Redlock, ZooKeeper, etcd) would guarantee exactly-once processing. But it would also require additional infrastructure, add latency, and create failure modes. The claim protocol is probabilistic but practical: the 500ms window and deterministic tie-breaking (alphabetical agent name) give us conflict-free coordination 99%+ of the time. For a monitoring system where occasional duplicate handling is harmless, this is the right trade-off.

11 Testing Strategy

The MockSenseTransport

The key testing challenge: how to test distributed agent communication without PubNub? The answer is MockSenseTransport, which follows the same pattern as MockLLMClient.

The Hub Pattern

class _MockSenseHub:
    """Shared state for mock transports. Routes signals between agents."""

    def __init__(self):
        self.queues: dict[str, asyncio.Queue[Signal]] = {}
        self.subscriptions: dict[str, set[str]] = {}
        self.presences: dict[str, AgentPresence] = {}

class MockSenseTransport:
    def __init__(self, hub: _MockSenseHub | None = None):
        self.hub = hub or _MockSenseHub()

    def create_peer(self) -> "MockSenseTransport":
        """Create a sibling transport sharing the same hub."""
        return MockSenseTransport(self.hub)

Why the Hub Pattern?

The first implementation used a single transport for all agents. This failed because connect(agent_id) sets the transport's one identity, so the second agent's connect overwrote the first's. The hub pattern gives each agent its own transport instance (with its own identity and queue) while sharing the routing infrastructure. create_peer() returns a new transport connected to the same hub.

Test Design Lessons

Don't Read From the Queue Directly

The signal loop is a background task that continuously reads from the transport's queue. If a test tries to read from the queue directly (e.g., await queue.get()), it races with the signal loop. Use on_signal handlers in tests instead, and await asyncio.sleep() to give the loop time to process.
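A minimal illustration of that pattern: the test observes signals through a handler and yields to the loop, instead of competing for queue.get() (toy code, not tests/test_sense.py):

```python
import asyncio

async def demo() -> list[str]:
    received: list[str] = []
    queue: asyncio.Queue[str] = asyncio.Queue()

    def on_signal(signal: str) -> None:
        received.append(signal)             # the test observes via a handler...

    async def signal_loop() -> None:
        while True:
            on_signal(await queue.get())    # ...while the loop owns the queue

    task = asyncio.create_task(signal_loop())
    await queue.put("alert")
    await asyncio.sleep(0.01)               # give the loop time to process
    task.cancel()
    return received
```

If the test itself called `await queue.get()`, it and the signal loop would race for the same item, and whichever lost would hang; handler-plus-sleep sidesteps the race entirely.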

Test coverage (31 tests in tests/test_sense.py):

| Test Group | Count | What's Covered |
|---|---|---|
| Signal creation | 3 | Dataclass defaults, payload, all 7 kinds |
| Serialization | 5 | Compact encoding, roundtrip, truncation, minimal deserialization |
| Protocol | 2 | Mock satisfies protocol, AgentPresence creation |
| MockSenseTransport | 4 | Connect/disconnect, subscribe/broadcast, online agents, create_peer |
| SenseMixin | 7 | Join/leave, broadcast, request/response, timeout, handlers, self-skip, targeting |
| Claim protocol | 2 | Claim win, release |
| SenseNetwork | 3 | Add agent, reject non-sense agent, stop disconnects all |
| Events | 5 | All 5 new event dataclasses |

12 Trade-offs & Future Work

What We Chose

| Decision | Benefit | Cost |
|---|---|---|
| PubNub over self-hosted | Zero infrastructure | 32KB limit, vendor dependency |
| Mixin over composition | Natural API (agent.request()) | type: ignore comments |
| Protocol over ABC | Structural subtyping | No enforced implementation |
| Probabilistic claims | No infrastructure needed | Rare duplicate handling |
| Short-key serialization | Stays under 32KB | Less readable wire format |
| Hub pattern for testing | True multi-agent tests | More complex mock |

Known Limitations

Future Directions