Sixth Sense Internals

How distributed agent communication actually works — what the code does, line by line, and what's still unresolved.

Honest Scope: This document describes what the code actually does, not what it was intended to do. Where there are open questions, limitations, or untested assumptions, they are called out explicitly. Read this before making architectural decisions based on this module.

Overview

Sixth Sense lets Bedsheet agents talk to each other across process boundaries. Without it, agents are isolated — a Supervisor can coordinate collaborators within the same process, but cannot reach agents in another process, container, or machine.

With Sixth Sense, any agent can:

- send a request to a named agent in another process and await its response
- broadcast alerts and other events to every subscriber on a channel
- claim exclusive ownership of an incident so only one agent handles it
- advertise its presence and capabilities via periodic heartbeats

The module lives entirely in bedsheet/sense/ and is accessed via the SenseMixin class and the SenseNetwork helper.

Architecture

Your Agent Process A                       Your Agent Process B
┌─────────────────────────────┐            ┌─────────────────────────────┐
│ MyAgent(SenseMixin, Agent)  │            │ MyAgent(SenseMixin, Agent)  │
│ ┌───────────────────────┐   │            │ ┌───────────────────────┐   │
│ │ SenseMixin            │   │            │ │ SenseMixin            │   │
│ │  _signal_loop (task)  │   │            │ │  _signal_loop (task)  │   │
│ │  _heartbeat_loop      │   │            │ │  _heartbeat_loop      │   │
│ │  _pending_requests    │   │            │ │  _pending_requests    │   │
│ │  _claimed_incidents   │   │            │ │  _claimed_incidents   │   │
│ └──────────┬────────────┘   │            │ └──────────┬────────────┘   │
│            │                │            │            │                │
│ ┌──────────▼────────────┐   │            │ ┌──────────▼────────────┐   │
│ │ SenseTransport        │   │            │ │ SenseTransport        │   │
│ │ (PubNubTransport or   │   │            │ │ (PubNubTransport or   │   │
│ │  MockSenseTransport)  │   │            │ │  MockSenseTransport)  │   │
│ └──────────┬────────────┘   │            │ └──────────┬────────────┘   │
└────────────│────────────────┘            └────────────│────────────────┘
             │                                          │
             ▼                                          ▼
┌─────────────────────────────────────────────────────────────────────┐
│                            PubNub Cloud                             │
│  bedsheet.{namespace}.alerts       (broadcast channel)              │
│  bedsheet.{namespace}.tasks        (broadcast channel)              │
│  bedsheet.{namespace}.heartbeat    (broadcast channel)              │
│  bedsheet.{namespace}.agent-name   (direct channel per agent)       │
└─────────────────────────────────────────────────────────────────────┘

Each agent runs two persistent asyncio background tasks: the signal loop (reads from the transport's queue and dispatches signals) and the heartbeat loop (broadcasts presence every 30 seconds).

Signal — the unit of communication

bedsheet/sense/signals.py

A Signal is a dataclass with seven fields:

@dataclass
class Signal:
    kind: SignalKind           # "request"|"response"|"alert"|"heartbeat"|"claim"|"release"|"event"
    sender: str                # agent name that sent this
    payload: dict[str, Any]    # arbitrary data, defaults to {}
    correlation_id: str        # auto-generated 12-char hex, used to match request→response
    target: str | None         # if set, only the named agent processes this signal
    timestamp: float           # unix timestamp set at creation (time.time())
    source_channel: str | None # set by PubNubTransport on receive, None from MockTransport
Known Gap: source_channel is populated by PubNubTransport during deserialization but is never set by MockSenseTransport. Tests that assert routing logic based on which channel a signal arrived on will get None from the mock but a real value in production. This is a testing blind spot.

Signal Kinds

| Kind | Meaning | Typical sender | Reply expected? |
|---|---|---|---|
| request | Ask a specific agent to do something and return a result | Commander / orchestrator | Yes — a response on the same correlation_id |
| response | Result from an agent that received a request | Worker / sentinel | No |
| alert | Broadcast notification of an anomaly or event | Sentinel agents | No — fire and forget |
| heartbeat | Presence ping with capabilities list | All agents, every 30s | No |
| claim | Announce intent to handle an incident_id exclusively | Any agent | Indirectly — competing claims trigger eviction |
| release | Relinquish ownership of a previously claimed incident | Any agent | No |
| event | Generic application event — not handled internally by SenseMixin | Any agent | No |

SenseTransport Protocol

bedsheet/sense/protocol.py

The transport is a structural Protocol — any class that implements these seven methods satisfies it, no inheritance needed (same pattern as LLMClient and Memory):

class SenseTransport(Protocol):
    async def connect(self, agent_id: str, namespace: str) -> None: ...
    async def disconnect(self) -> None: ...
    async def broadcast(self, channel: str, signal: Signal) -> None: ...
    async def subscribe(self, channel: str) -> None: ...
    async def unsubscribe(self, channel: str) -> None: ...
    def signals(self) -> AsyncIterator[Signal]: ...  # must be an async generator
    async def get_online_agents(self, channel: str) -> list[AgentPresence]: ...
Calling Convention for signals(): signals() is declared as def (not async def) in the Protocol, but both implementations (PubNubTransport and MockSenseTransport) define it as async def with yield — making them async generator functions. An async generator function, when called, returns an AsyncIterator directly without needing to be awaited. Callers use: async for signal in transport.signals(). Do NOT add await before the call.
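The convention can be exercised with a minimal stand-in; this is a sketch, not the real transport — DemoTransport and the "__stop__" sentinel are illustrative only (the real generators loop forever):

```python
import asyncio
from typing import AsyncIterator


class DemoTransport:
    """Minimal stand-in showing the signals() calling convention."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue[str] = asyncio.Queue()

    # The body contains `yield`, so calling this returns an async
    # iterator immediately — no `await` on the call itself.
    async def signals(self) -> AsyncIterator[str]:
        while True:
            item = await self._queue.get()
            if item == "__stop__":  # sentinel so this demo terminates
                return
            yield item


async def main() -> list[str]:
    transport = DemoTransport()
    for item in ("alert", "heartbeat", "__stop__"):
        transport._queue.put_nowait(item)
    received = []
    # NOTE: no `await` before transport.signals()
    async for signal in transport.signals():
        received.append(signal)
    return received


print(asyncio.run(main()))  # ['alert', 'heartbeat']
```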

There are two implementations:

| Class | File | Use |
|---|---|---|
| PubNubTransport | bedsheet/sense/pubnub_transport.py | Production — real PubNub cloud messaging |
| MockSenseTransport | bedsheet/testing.py | Tests — in-process shared queue via _MockSenseHub |

PubNub Transport — Thread → asyncio bridge

bedsheet/sense/pubnub_transport.py

PubNub's Python SDK delivers incoming messages via callbacks on its own internal thread, not on the asyncio event loop. This is the central challenge of the PubNub integration.

The solution is a thread-safe queue bridge:

# The queue lives on the asyncio side
self._queue: asyncio.Queue[Signal] = asyncio.Queue()

# _SignalListener.message() is called by PubNub's thread
def message(self, pubnub, message: PNMessageResult) -> None:
    signal = deserialize(message.message, source_channel=message.channel)
    # call_soon_threadsafe schedules a put onto the asyncio event loop
    # from the PubNub thread — this is the ONLY safe way to cross this boundary
    self._loop.call_soon_threadsafe(self._queue.put_nowait, signal)

The asyncio side reads from the queue via the signals() async generator:

async def signals(self) -> AsyncIterator[Signal]:
    while True:
        signal = await self._queue.get()   # suspends until a message arrives
        yield signal

This generator never returns — it loops forever until the task is cancelled. The _signal_loop in SenseMixin iterates it with async for, which suspends on each await self._queue.get() call and yields control to the event loop while waiting.

Why call_soon_threadsafe? queue.put_nowait() is not thread-safe if called from a non-asyncio thread — it can corrupt internal state. loop.call_soon_threadsafe() schedules the call to run on the next iteration of the asyncio event loop, safely crossing the thread boundary. This is the standard Python pattern for bridging threaded callbacks into asyncio.
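The pattern can be reproduced in isolation; this sketch uses a plain thread in place of PubNub's callback thread:

```python
import asyncio
import threading


async def main() -> list[int]:
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue[int] = asyncio.Queue()

    def worker() -> None:
        # Runs on a foreign thread, like PubNub's callback thread.
        # Calling queue.put_nowait directly from here would be unsafe;
        # instead, schedule it onto the event loop.
        for n in (1, 2, 3):
            loop.call_soon_threadsafe(queue.put_nowait, n)

    threading.Thread(target=worker).start()

    # The asyncio side drains the queue exactly as signals() does.
    return [await queue.get() for _ in range(3)]


print(asyncio.run(main()))  # [1, 2, 3]
```

call_soon_threadsafe preserves submission order, so the items arrive in the order the foreign thread produced them.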

Channel Namespacing

All channels are prefixed to avoid collisions with other PubNub users on the same account:

def _full_channel(self, channel: str) -> str:
    if channel.startswith("bedsheet."):
        return channel                               # already prefixed
    return f"bedsheet.{self._namespace}.{channel}"  # e.g. "bedsheet.cloud-ops.alerts"

So when you call agent.broadcast("alerts", signal), PubNub actually publishes to bedsheet.cloud-ops.alerts. Each agent also automatically subscribes to its own name as a direct channel: bedsheet.cloud-ops.my-agent-name.
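The prefixing rule is easy to verify with a standalone replica of the method (full_channel here is illustrative; the real method lives on PubNubTransport):

```python
def full_channel(namespace: str, channel: str) -> str:
    """Standalone replica of PubNubTransport._full_channel, for illustration."""
    if channel.startswith("bedsheet."):
        return channel  # already fully qualified — pass through untouched
    return f"bedsheet.{namespace}.{channel}"


assert full_channel("cloud-ops", "alerts") == "bedsheet.cloud-ops.alerts"
# Prefixing is idempotent — an already-qualified name is not double-prefixed:
assert full_channel("cloud-ops", "bedsheet.cloud-ops.alerts") == "bedsheet.cloud-ops.alerts"
# Each agent's direct channel is just its name, namespaced the same way:
assert full_channel("cloud-ops", "my-agent-name") == "bedsheet.cloud-ops.my-agent-name"
```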

Serialization — compact JSON under 32KB

bedsheet/sense/serialization.py

PubNub has a 32KB message limit per publish. Signals are serialized with shortened keys to reduce payload size:

_KEY_MAP = {
    "kind": "k",
    "sender": "s",
    "payload": "p",
    "correlation_id": "c",
    "target": "t",
    "timestamp": "ts",
}

If the serialized signal still exceeds 30,000 bytes (leaving 2KB headroom), the payload is replaced with a truncation notice:

if len(encoded.encode("utf-8")) > MAX_MESSAGE_BYTES:
    data["p"] = {"_truncated": True, "summary": str(signal.payload)[:500]}
Truncation is silent to the receiver. The receiver gets a signal with payload = {"_truncated": True, "summary": "..."} and no error is raised. No application code currently checks for the _truncated key. If an agent sends a very large response (e.g. an LLM output over 30KB), the receiving agent will get a truncated summary and silently use it as if it were the full result.
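Until the library surfaces this, a handler can check for the marker itself. A minimal defensive sketch — extract_result is a hypothetical helper, not part of the module:

```python
from typing import Any


def extract_result(payload: dict[str, Any]) -> str:
    """Hypothetical defensive unwrap of a response payload.

    Raises instead of silently consuming a truncated summary as if it
    were the full result.
    """
    if payload.get("_truncated"):
        raise ValueError(
            "payload exceeded the serialization limit; only a summary "
            f"survived: {payload.get('summary', '')[:80]}"
        )
    return payload["result"]


assert extract_result({"result": "ok"}) == "ok"
try:
    extract_result({"_truncated": True, "summary": "very long LLM output..."})
except ValueError:
    pass  # truncation detected instead of silently used
```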

SenseMixin — what it adds to an Agent

bedsheet/sense/mixin.py

SenseMixin is a Python mixin class. To use it:

class MyAgent(SenseMixin, Agent):
    pass

Python's MRO (Method Resolution Order) ensures SenseMixin.__init__ runs before Agent.__init__ via super() chaining. SenseMixin adds these instance variables to every agent:
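The cooperative-initialization mechanics can be seen with toy classes (SenseMixinDemo is a stand-in, not the real mixin):

```python
class Agent:
    def __init__(self, name: str) -> None:
        self.name = name


class SenseMixinDemo:
    """Toy stand-in: cooperative __init__ chains to the next class in the MRO."""

    def __init__(self, *args, **kwargs) -> None:
        self._claimed_incidents: set[str] = set()  # mixin state first
        super().__init__(*args, **kwargs)          # then on to Agent.__init__


class MyAgent(SenseMixinDemo, Agent):
    pass


agent = MyAgent("commander")
# Mixin listed first, so it runs first: MyAgent -> SenseMixinDemo -> Agent -> object
assert [c.__name__ for c in MyAgent.__mro__] == [
    "MyAgent", "SenseMixinDemo", "Agent", "object"
]
assert agent.name == "commander"          # Agent.__init__ still ran
assert agent._claimed_incidents == set()  # mixin state was initialized
```

Listing the mixin before Agent in the base-class tuple is what puts it earlier in the MRO; reversing the order would skip the mixin's setup if Agent.__init__ does not chain onward.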

| Variable | Type | Purpose |
|---|---|---|
| _transport | SenseTransport \| None | The active transport, or None if not connected |
| _namespace | str | The network namespace (e.g. "cloud-ops") |
| _signal_handlers | dict[SignalKind, list[Callable]] | Registered handlers per signal kind |
| _signal_task | asyncio.Task \| None | Background task running _signal_loop |
| _pending_requests | dict[str, asyncio.Future[Signal]] | Futures waiting for responses, keyed by correlation_id |
| _claimed_incidents | set[str] | Incident IDs this agent currently owns |
| _heartbeat_task | asyncio.Task \| None | Background task broadcasting heartbeat every 30s |

The Signal Loop — how incoming signals are dispatched

When join_network() is called, an asyncio background task is created running _signal_loop(). This loop runs forever until cancelled (by leave_network()).

async def _signal_loop(self) -> None:
    async for signal in self._transport.signals():

        # 1. Skip signals we sent ourselves
        if signal.sender == self.name:
            continue

        # 2. Skip signals targeted at a different agent
        if signal.target and signal.target != self.name:
            continue

        # 3. If this is a response to one of our pending requests, resolve the future
        if signal.kind == "response" and signal.correlation_id in self._pending_requests:
            future = self._pending_requests[signal.correlation_id]
            future.set_result(signal)
            continue

        # 4. If this is a request to us, handle it in a new task (non-blocking)
        if signal.kind == "request":
            asyncio.create_task(self._handle_request(signal))
            continue

        # 5. Claim conflict resolution
        if signal.kind == "claim":
            self._handle_claim(signal)
            continue

        # 6. Release an incident we were tracking
        if signal.kind == "release":
            self._claimed_incidents.discard(signal.payload.get("incident_id"))
            continue

        # 7. Run user-registered handlers
        for handler in self._signal_handlers.get(signal.kind, []):
            await handler(signal)
Why asyncio.create_task for requests? A request handler calls self.invoke() which is a full LLM ReAct loop — it could take seconds. If we awaited it directly in the signal loop, no other signals could be processed during that time. create_task() schedules the handler as a concurrent task, letting the signal loop continue immediately.
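The difference is observable in a few lines. This sketch (with a short sleep standing in for a multi-second LLM call) shows that all signals are dispatched before any handler finishes:

```python
import asyncio


async def slow_handler(signal: str, log: list[str]) -> None:
    await asyncio.sleep(0.05)  # stands in for a multi-second invoke() call
    log.append(f"handled:{signal}")


async def signal_loop(signals: list[str], log: list[str]) -> None:
    tasks = []
    for signal in signals:
        # Schedule the handler concurrently instead of awaiting it,
        # so the loop keeps dispatching subsequent signals immediately.
        tasks.append(asyncio.create_task(slow_handler(signal, log)))
        log.append(f"dispatched:{signal}")
    await asyncio.gather(*tasks)  # only the demo waits for completion


log: list[str] = []
asyncio.run(signal_loop(["req-1", "req-2"], log))
# Both dispatches happen before either handler completes:
assert log[:2] == ["dispatched:req-1", "dispatched:req-2"]
assert sorted(log[2:]) == ["handled:req-1", "handled:req-2"]
```

Had the loop used `await slow_handler(...)` instead, the log would interleave dispatch and completion, and a slow request would stall every signal behind it.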

Request / Response — asking another agent for a result

This is a synchronous-looking call that crosses a process boundary:

result = await commander.request("behavior-sentinel", "Check web-researcher for anomalies")

Internally, the full flow is:

Commander process                                     Sentinel process
│                                                     │
│ 1. Generate correlation_id = "a3f9bc12"             │
│ 2. Create asyncio.Future, store it in               │
│    _pending_requests["a3f9bc12"]                    │
│ 3. broadcast to "behavior-sentinel" channel         │
│──── Signal{kind="request",                          │
│            target="behavior-sentinel",              │
│            correlation_id="a3f9bc12",               │
│            payload={"task":"Check..."}} ───────────▶│
│                                                     │
│ 4. await asyncio.wait_for(future, 30s)              │ 5. _signal_loop receives signal
│    (suspended)                                      │ 6. asyncio.create_task(_handle_request)
│                                                     │ 7. _handle_request calls self.invoke()
│                                                     │    (full LLM ReAct loop)
│                                                     │ 8. Collects CompletionEvent.response
│                                                     │ 9. broadcast to "commander" channel
│◀─── Signal{kind="response",                         │
│            correlation_id="a3f9bc12",               │
│            payload={"result":"Agent is normal"}} ───│
│                                                     │
│ 10. _signal_loop sees correlation_id match          │
│     future.set_result(signal)                       │
│ 11. asyncio.wait_for() returns                      │
│ 12. return signal.payload["result"]                 │

Timeout behaviour

If no response arrives within timeout seconds (default 30), asyncio.wait_for raises asyncio.TimeoutError, which is caught and re-raised as Python's built-in TimeoutError. The pending future is cleaned up in the finally block regardless.
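The future/timeout/cleanup shape can be sketched in isolation; pending, correlation_id, and the error message here are illustrative, not the module's exact internals:

```python
import asyncio


async def request_with_timeout(timeout: float) -> str:
    pending: dict[str, asyncio.Future[str]] = {}
    correlation_id = "a3f9bc12"
    future: asyncio.Future[str] = asyncio.get_running_loop().create_future()
    pending[correlation_id] = future
    try:
        # Nothing resolves the future in this sketch, so this times out.
        return await asyncio.wait_for(future, timeout)
    except asyncio.TimeoutError as exc:
        # Re-raise as the built-in TimeoutError (they are aliases on 3.11+).
        raise TimeoutError(f"no response for {correlation_id}") from exc
    finally:
        # Cleanup runs whether we got a response, timed out, or were cancelled.
        pending.pop(correlation_id, None)


try:
    asyncio.run(request_with_timeout(0.01))
except TimeoutError:
    pass  # expected: the future was never resolved
```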

Claim / Release — exclusive incident ownership

When multiple agents might respond to the same incident, the claim protocol ensures only one of them handles it. This is a distributed consensus problem solved with a simple heuristic: the lexicographically smaller agent name wins.

How claim works

Agent "sentinel-a" calls claim_incident("alert-007"):

1. OPTIMISTICALLY add "alert-007" to _claimed_incidents (we assume we won unless evicted)
2. broadcast Signal{kind="claim", sender="sentinel-a", payload={"incident_id":"alert-007"}}
3. asyncio.sleep(0.5) — wait 500ms for competing claims
4. Meanwhile, if "sentinel-b" also broadcast a claim, "sentinel-a"'s _signal_loop receives it and calls _handle_claim:

       def _handle_claim(self, signal):
           if incident_id in self._claimed_incidents:
               if signal.sender < self.name:  # "sentinel-b" < "sentinel-a" ?
                   self._claimed_incidents.discard(incident_id)  # we lose

5. After the sleep: return ("alert-007" in self._claimed_incidents) → True if we still hold it, False if evicted
Critical Limitation: the 500ms window is a race condition. If two agents call claim_incident simultaneously and the network latency between them exceeds 500ms (e.g. agents in different regions), both agents may complete the sleep before seeing each other's claim signal. Both return True. This can cause duplicate handling of the same incident. The current implementation is suitable for agents on a low-latency LAN or within the same cloud region, not for globally distributed agents with >250ms one-way latency.
Limitation: name-based tiebreak is fragile. The winner is determined by lexicographic ordering of agent names. This means agent naming conventions directly affect which agent handles which incident. There is no priority field, no capability-based routing, and no retry if the winner fails. A proper implementation would use PubNub's presence timestamps or a distributed lock (e.g. Redis SET NX PX).
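The tiebreak itself is deterministic and can be checked as a pure function. This sketch restates _handle_claim's comparison without the mixin state (resolve_claim is illustrative):

```python
def resolve_claim(holder: str, claimant: str,
                  claimed: set[str], incident_id: str) -> set[str]:
    """Pure-function sketch of _handle_claim's lexicographic tiebreak."""
    if incident_id in claimed and claimant < holder:
        # A competing claim from a lexicographically smaller name evicts us.
        claimed = claimed - {incident_id}
    return claimed


# "sentinel-a" holds alert-007; "sentinel-b" claims it too.
# "sentinel-b" < "sentinel-a" is False, so sentinel-a keeps the incident:
assert resolve_claim("sentinel-a", "sentinel-b",
                     {"alert-007"}, "alert-007") == {"alert-007"}
# Roles reversed: sentinel-b is evicted because "sentinel-a" < "sentinel-b":
assert resolve_claim("sentinel-b", "sentinel-a",
                     {"alert-007"}, "alert-007") == set()
```

Both agents evaluate the same comparison on each other's names, so (within the 500ms window) they converge on the same winner without any coordinator.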

Heartbeat — presence and capability advertisement

Every 30 seconds, each agent broadcasts a heartbeat to the hardcoded "heartbeat" channel:

signal = Signal(
    kind="heartbeat",
    sender=self.name,
    payload={
        "capabilities": [action.name for group in self._action_groups
                         for action in group.get_actions()],
        "status": "ready",
    },
)
await self.broadcast("heartbeat", signal)
Limitation: hardcoded channel name "heartbeat". The heartbeat channel is hardcoded as the string "heartbeat". If any application creates a channel also named "heartbeat", it will receive a flood of heartbeat signals from every agent every 30 seconds. The channel should use an internal prefix like "__bedsheet_heartbeat__".

Heartbeat signals are not handled internally by the signal loop — they pass through to user-registered handlers. If no handler is registered for "heartbeat", they are silently dropped. The dashboard demo (agent-sentinel) registers a heartbeat handler to update agent presence cards.
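A presence-tracking handler in the dashboard's style might look like the sketch below. The registration API is not shown in this document (the real code keeps handlers in _signal_handlers[kind]), so on_heartbeat and stale_agents are hypothetical names, and SimpleNamespace stands in for Signal:

```python
import asyncio
from types import SimpleNamespace
from typing import Any

presence: dict[str, dict[str, Any]] = {}


async def on_heartbeat(signal: Any) -> None:
    # Record who is alive and what they can do.
    presence[signal.sender] = {
        "capabilities": signal.payload.get("capabilities", []),
        "last_seen": signal.timestamp,
    }


def stale_agents(now: float, max_age: float = 90.0) -> list[str]:
    # Three missed 30s heartbeats -> treat the agent as offline.
    return [name for name, info in presence.items()
            if now - info["last_seen"] > max_age]


# Simulate one heartbeat arriving:
beat = SimpleNamespace(sender="sentinel-a",
                       payload={"capabilities": ["check_anomalies"]},
                       timestamp=100.0)
asyncio.run(on_heartbeat(beat))
assert stale_agents(now=120.0) == []
assert stale_agents(now=300.0) == ["sentinel-a"]
```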

SenseNetwork — managing multiple agents

bedsheet/sense/network.py

SenseNetwork is a convenience wrapper that gives each agent its own transport instance and connects them all to the same namespace:

# Testing: MockSenseTransport with hub pattern
network = SenseNetwork(namespace="ops", transport=MockSenseTransport())
await network.add(commander, channels=["alerts", "tasks"])
await network.add(sentinel, channels=["alerts"])

# Production: each agent gets its own independent PubNub connection
network = SenseNetwork(
    namespace="cloud-ops",
    transport_factory=lambda: PubNubTransport(subscribe_key, publish_key),
)
await network.add(commander, channels=["alerts", "tasks"])
await network.add(sentinel, channels=["alerts"])

# Shutdown
await network.stop()

The transport factory pattern is important: in production, each agent needs its own PubNub connection because PubNub tracks presence per connection (UUID). If two agents shared one connection, they would have the same UUID and PubNub would not be able to tell them apart.
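The distinction is simply "one instance" versus "one instance per call". A toy sketch (FakeTransport is illustrative; its uuid field mimics PubNub's per-connection presence identity):

```python
class FakeTransport:
    """Stand-in for PubNubTransport; each instance is a distinct connection."""
    _next_id = 0

    def __init__(self) -> None:
        FakeTransport._next_id += 1
        self.uuid = f"conn-{FakeTransport._next_id}"  # presence identity


def make_transport() -> FakeTransport:
    # The factory is invoked once per agent added to the network,
    # so every agent ends up with its own connection.
    return FakeTransport()


a_transport = make_transport()
b_transport = make_transport()
assert a_transport is not b_transport
assert a_transport.uuid != b_transport.uuid  # distinguishable presence
```

Passing a single pre-built instance instead of a factory would hand every agent the same object, and with it the same UUID.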

MockSenseTransport and the hub pattern

bedsheet/testing.py

For tests, all agents need to share the same in-memory routing logic while each having their own queue. This is solved with _MockSenseHub:

# _MockSenseHub is shared by all transports
# Each transport has its own queue but shares the hub's subscription registry

hub = _MockSenseHub()
transport_a = MockSenseTransport(hub)
transport_b = transport_a.create_peer()  # creates a new MockSenseTransport(hub)

# When transport_a broadcasts to "alerts":
# hub routes a copy of the signal into the queue of every
# transport subscribed to "alerts" (except the sender)

Open Question: Sequential vs Parallel in GCP ADK templates

The GCP transpiler template (bedsheet/deploy/templates/gcp/agent.py.j2) generates a SequentialAgent sweep for the sub-agents, not a ParallelAgent.

Why it was changed to Sequential

The original template used ParallelAgent. During testing of the sentinel-gcp demo, all 4-5 sub-agents were invoked simultaneously, producing a burst of 5+ API calls within a few seconds. The gemini-3-flash-preview free tier allows 5 requests per minute total. The burst triggered HTTP 429 (RESOURCE_EXHAUSTED) errors from the Gemini API, causing the demo to fail. Changing to SequentialAgent fixed the rate limit issue by running one sub-agent at a time.

Why this may be wrong

The architectural intent was parallel delegation — a supervisor dispatches to all collaborators simultaneously and waits for all results. SequentialAgent changes the semantics: collaborators run one after another, meaning the total time is the sum of all individual times rather than the maximum. For a production ADK deployment on Vertex AI (not the free Gemini API), there are no such rate limits and ParallelAgent is the correct choice.

The right resolution

The template should offer both options. The decision should be driven by a config value in bedsheet.yaml, not hardcoded. For example:

targets:
  gcp:
    model: gemini-2.0-flash-001
    sub_agent_mode: parallel   # or: sequential

Until that config option exists, the template defaults to SequentialAgent as a safe default for free-tier Gemini. If you are deploying to Vertex AI with production quota, change SequentialAgent to ParallelAgent in the generated agent.py manually.

Known Limitations Summary

| Issue | Location | Severity | Workaround |
|---|---|---|---|
| Claim race condition if latency > 250ms | mixin.py claim_incident | High for global deployments | Only use within a single region / low-latency network |
| Name-based tiebreak for claim conflicts | mixin.py _handle_claim | Medium — predictable but fragile | Name agents deliberately (lexicographically earlier = higher priority) |
| Payload truncation is silent | serialization.py | Medium for large LLM outputs | Check for payload.get("_truncated") in handlers |
| Hardcoded "heartbeat" channel name | mixin.py _heartbeat_loop | Low — collision only if you name a channel "heartbeat" | Don't use "heartbeat" as a channel name in your application |
| source_channel not set in MockSenseTransport | testing.py MockSenseTransport.broadcast | Low — only affects tests asserting channel routing | Don't write tests that assert signal.source_channel |
| SequentialAgent vs ParallelAgent hardcoded | agent.py.j2 template | Medium for production deployments | Change to ParallelAgent manually for Vertex AI deployments |