Sixth Sense Internals
How distributed agent communication actually works — what the code does, line by line, and what's still unresolved.
Overview
Sixth Sense lets Bedsheet agents talk to each other across process boundaries. Without it, agents are isolated — a Supervisor can coordinate collaborators within the same process, but cannot reach agents in another process, container, or machine.
With Sixth Sense, any agent can:
- Broadcast signals to a named channel (a PubNub channel)
- Send a signal directly to a specific agent and await a response
- Compete for exclusive ownership of an incident ID
- Receive typed signals and route them to registered handlers
The module lives entirely in bedsheet/sense/ and is accessed via the SenseMixin class and SenseNetwork helper.
Architecture
Each agent runs two persistent asyncio background tasks: the signal loop (reads from the transport's queue and dispatches signals) and the heartbeat loop (broadcasts presence every 30 seconds).
Signal — the unit of communication
bedsheet/sense/signals.py

A Signal is a dataclass with seven fields:
```python
@dataclass
class Signal:
    kind: SignalKind            # "request"|"response"|"alert"|"heartbeat"|"claim"|"release"|"event"
    sender: str                 # agent name that sent this
    payload: dict[str, Any]     # arbitrary data, defaults to {}
    correlation_id: str         # auto-generated 12-char hex, used to match request→response
    target: str | None          # if set, only the named agent processes this signal
    timestamp: float            # unix timestamp set at creation (time.time())
    source_channel: str | None  # set by PubNubTransport on receive, None from MockTransport
```
source_channel is populated by PubNubTransport during deserialization but is never set by MockSenseTransport. Tests that assert routing logic based on which channel a signal arrived on will get None from the mock but a real value in production. This is a testing blind spot.
Signal Kinds
| Kind | Meaning | Typical sender | Reply expected? |
|---|---|---|---|
| request | Ask a specific agent to do something and return a result | Commander / orchestrator | Yes — a response on the same correlation_id |
| response | Result from an agent that received a request | Worker / sentinel | No |
| alert | Broadcast notification of an anomaly or event | Sentinel agents | No — fire and forget |
| heartbeat | Presence ping with capabilities list | All agents, every 30s | No |
| claim | Announce intent to handle an incident_id exclusively | Any agent | Indirectly — competing claims trigger eviction |
| release | Relinquish ownership of a previously claimed incident | Any agent | No |
| event | Generic application event — not handled internally by SenseMixin | Any agent | No |
SenseTransport Protocol
bedsheet/sense/protocol.py

The transport is a structural Protocol — any class that implements these seven methods satisfies it, no inheritance needed (same pattern as LLMClient and Memory):
```python
class SenseTransport(Protocol):
    async def connect(self, agent_id: str, namespace: str) -> None: ...
    async def disconnect(self) -> None: ...
    async def broadcast(self, channel: str, signal: Signal) -> None: ...
    async def subscribe(self, channel: str) -> None: ...
    async def unsubscribe(self, channel: str) -> None: ...
    def signals(self) -> AsyncIterator[Signal]: ...  # must be an async generator
    async def get_online_agents(self, channel: str) -> list[AgentPresence]: ...
```
signals() is declared as def (not async def) in the Protocol, but both implementations (PubNubTransport and MockSenseTransport) define it as async def with yield — making them async generator functions. An async generator function, when called, returns an AsyncIterator directly without needing to be awaited. Callers use: async for signal in transport.signals(). Do NOT add await before the call.
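A minimal sketch of the pattern, showing why the call itself takes no await. DemoTransport is illustrative, not the real class:

```python
import asyncio
from typing import AsyncIterator

class DemoTransport:
    def __init__(self) -> None:
        self._queue: asyncio.Queue[str] = asyncio.Queue()

    # `async def` + `yield` makes this an async generator function:
    # calling signals() returns an AsyncIterator immediately, no await.
    async def signals(self) -> AsyncIterator[str]:
        while True:
            yield await self._queue.get()

async def main() -> list[str]:
    t = DemoTransport()
    t._queue.put_nowait("a")  # reaching into the private queue for the demo
    t._queue.put_nowait("b")
    received: list[str] = []
    # Correct: no `await` before the call — `await t.signals()` would
    # raise a TypeError because the generator object is not awaitable.
    async for s in t.signals():
        received.append(s)
        if len(received) == 2:
            break
    return received

out = asyncio.run(main())
print(out)  # ['a', 'b']
```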
There are two implementations:
| Class | File | Use |
|---|---|---|
| PubNubTransport | bedsheet/sense/pubnub_transport.py | Production — real PubNub cloud messaging |
| MockSenseTransport | bedsheet/testing.py | Tests — in-process shared queue via _MockSenseHub |
PubNub Transport — Thread → asyncio bridge
bedsheet/sense/pubnub_transport.py

PubNub's Python SDK delivers incoming messages via callbacks on its own internal thread, not on the asyncio event loop. This is the central challenge of the PubNub integration.
The solution is a thread-safe queue bridge:
```python
# The queue lives on the asyncio side
self._queue: asyncio.Queue[Signal] = asyncio.Queue()

# _SignalListener.message() is called by PubNub's thread
def message(self, pubnub, message: PNMessageResult) -> None:
    signal = deserialize(message.message, source_channel=message.channel)
    # call_soon_threadsafe schedules a put onto the asyncio event loop
    # from the PubNub thread — this is the ONLY safe way to cross this boundary
    self._loop.call_soon_threadsafe(self._queue.put_nowait, signal)
```
The asyncio side reads from the queue via the signals() async generator:
```python
async def signals(self) -> AsyncIterator[Signal]:
    while True:
        signal = await self._queue.get()  # suspends until a message arrives
        yield signal
```
This generator never returns — it loops forever until the task is cancelled. The _signal_loop in SenseMixin iterates it with async for, which suspends on each await self._queue.get() call and yields control to the event loop while waiting.
queue.put_nowait() is not thread-safe if called from a non-asyncio thread — it can corrupt internal state. loop.call_soon_threadsafe() schedules the call to run on the next iteration of the asyncio event loop, safely crossing the thread boundary. This is the standard Python pattern for bridging threaded callbacks into asyncio.
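The bridge can be demonstrated standalone, with a plain thread standing in for PubNub's callback thread:

```python
import asyncio
import threading

async def main() -> list[int]:
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue[int] = asyncio.Queue()

    # Stand-in for PubNub's callback thread. It must NOT call
    # queue.put_nowait() directly; it hands the call to the event loop.
    def callback_thread() -> None:
        for n in (1, 2, 3):
            loop.call_soon_threadsafe(queue.put_nowait, n)

    t = threading.Thread(target=callback_thread)
    t.start()
    # The asyncio side consumes normally; ordering is preserved because
    # call_soon_threadsafe queues the puts in FIFO order.
    out = [await queue.get() for _ in range(3)]
    t.join()
    return out

result = asyncio.run(main())
print(result)  # [1, 2, 3]
```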
Channel Namespacing
All channels are prefixed to avoid collisions with other PubNub users on the same account:
```python
def _full_channel(self, channel: str) -> str:
    if channel.startswith("bedsheet."):
        return channel  # already prefixed
    return f"bedsheet.{self._namespace}.{channel}"  # e.g. "bedsheet.cloud-ops.alerts"
```
So when you call agent.broadcast("alerts", signal), PubNub actually publishes to bedsheet.cloud-ops.alerts. Each agent also automatically subscribes to its own name as a direct channel: bedsheet.cloud-ops.my-agent-name.
Serialization — compact JSON under 32KB
bedsheet/sense/serialization.py

PubNub has a 32KB message limit per publish. Signals are serialized with shortened keys to reduce payload size:
```python
_KEY_MAP = {
    "kind": "k",
    "sender": "s",
    "payload": "p",
    "correlation_id": "c",
    "target": "t",
    "timestamp": "ts",
}
```
If the serialized signal still exceeds 30,000 bytes (leaving 2KB headroom), the payload is replaced with a truncation notice:
```python
if len(encoded.encode("utf-8")) > MAX_MESSAGE_BYTES:
    data["p"] = {"_truncated": True, "summary": str(signal.payload)[:500]}
```
payload = {"_truncated": True, "summary": "..."} and no error is raised. No application code currently checks for the _truncated key. If an agent sends a very large response (e.g. an LLM output over 30KB), the receiving agent will get a truncated summary and silently use it as if it were the full result.
SenseMixin — what it adds to an Agent
bedsheet/sense/mixin.py

SenseMixin is a Python mixin class. To use it:
```python
class MyAgent(SenseMixin, Agent):
    pass
```
Python's MRO (Method Resolution Order) ensures SenseMixin.__init__ runs before Agent.__init__ via super() chaining. SenseMixin adds these instance variables to every agent:
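The initialization order can be verified with a toy version of the hierarchy (the real classes do far more; this only demonstrates the cooperative super() chain):

```python
class Agent:
    def __init__(self) -> None:
        self.order = getattr(self, "order", [])
        self.order.append("Agent")
        super().__init__()  # continues to object.__init__

class SenseMixin:
    def __init__(self) -> None:
        self.order = getattr(self, "order", [])
        self.order.append("SenseMixin")
        super().__init__()  # cooperative: next in MRO is Agent, not object

class MyAgent(SenseMixin, Agent):
    pass

# MRO puts the mixin before the base class because it is listed first.
mro = [c.__name__ for c in MyAgent.__mro__]
print(mro)  # ['MyAgent', 'SenseMixin', 'Agent', 'object']

a = MyAgent()
print(a.order)  # ['SenseMixin', 'Agent']
```

If SenseMixin forgot to call super().__init__(), Agent.__init__ would never run, which is why every class in a cooperative hierarchy must chain through super().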
| Variable | Type | Purpose |
|---|---|---|
| _transport | SenseTransport \| None | The active transport, or None if not connected |
| _namespace | str | The network namespace (e.g. "cloud-ops") |
| _signal_handlers | dict[SignalKind, list[Callable]] | Registered handlers per signal kind |
| _signal_task | asyncio.Task \| None | Background task running _signal_loop |
| _pending_requests | dict[str, asyncio.Future[Signal]] | Futures waiting for responses, keyed by correlation_id |
| _claimed_incidents | set[str] | Incident IDs this agent currently owns |
| _heartbeat_task | asyncio.Task \| None | Background task broadcasting heartbeat every 30s |
The Signal Loop — how incoming signals are dispatched
When join_network() is called, an asyncio background task is created running _signal_loop(). This loop runs forever until cancelled (by leave_network()).
```python
async def _signal_loop(self) -> None:
    async for signal in self._transport.signals():
        # 1. Skip signals we sent ourselves
        if signal.sender == self.name:
            continue
        # 2. Skip signals targeted at a different agent
        if signal.target and signal.target != self.name:
            continue
        # 3. If this is a response to one of our pending requests, resolve the future
        if signal.kind == "response" and signal.correlation_id in self._pending_requests:
            future = self._pending_requests[signal.correlation_id]
            future.set_result(signal)
            continue
        # 4. If this is a request to us, handle it in a new task (non-blocking)
        if signal.kind == "request":
            asyncio.create_task(self._handle_request(signal))
            continue
        # 5. Claim conflict resolution
        if signal.kind == "claim":
            self._handle_claim(signal)
            continue
        # 6. Release an incident we were tracking
        if signal.kind == "release":
            self._claimed_incidents.discard(signal.payload.get("incident_id"))
            continue
        # 7. Run user-registered handlers
        for handler in self._signal_handlers.get(signal.kind, []):
            await handler(signal)
```
Step 4 uses create_task() because handling a request ultimately calls self.invoke(), which runs a full LLM ReAct loop and can take seconds. If we awaited it directly in the signal loop, no other signals could be processed during that time. create_task() schedules the handler as a concurrent task, letting the signal loop continue immediately.
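The effect is easy to measure with a toy handler standing in for _handle_request:

```python
import asyncio
import time

async def slow_handler(name: str) -> None:
    # Stand-in for a multi-second _handle_request -> invoke() call.
    await asyncio.sleep(0.1)

async def dispatch_concurrently(n: int) -> float:
    start = time.monotonic()
    # What the signal loop does: schedule each handler, keep going.
    tasks = [asyncio.create_task(slow_handler(f"req-{i}")) for i in range(n)]
    # The real loop keeps iterating here; we gather only to time the demo.
    await asyncio.gather(*tasks)
    return time.monotonic() - start

elapsed = asyncio.run(dispatch_concurrently(5))
# Five 0.1s handlers overlap instead of summing to 0.5s.
print(f"{elapsed:.2f}s")
```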
Request / Response — asking another agent for a result
This is a synchronous-looking call that crosses a process boundary:
```python
result = await commander.request("behavior-sentinel", "Check web-researcher for anomalies")
```
Internally, the full flow is:

1. Generate a correlation_id and create an asyncio.Future, stored in _pending_requests under that ID.
2. Send a request signal with target set to the named agent.
3. Suspend on asyncio.wait_for(future, timeout) until the response arrives.
4. The target agent's signal loop sees the request and runs _handle_request in a new task; the result comes back as a response signal carrying the same correlation_id.
5. The requester's signal loop matches the correlation_id (step 3 of the loop above), resolves the future, and request() returns.
Timeout behaviour
If no response arrives within timeout seconds (default 30), asyncio.wait_for raises asyncio.TimeoutError, which is caught and re-raised as Python's built-in TimeoutError. The pending future is cleaned up in the finally block regardless.
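The timeout path can be sketched like this. The request function below is a simplified stand-in for SenseMixin.request, with the transport send omitted; only the future/timeout/cleanup mechanics are shown:

```python
import asyncio

async def request(pending: dict[str, asyncio.Future], cid: str,
                  timeout: float):
    future: asyncio.Future = asyncio.get_running_loop().create_future()
    pending[cid] = future  # the signal loop would resolve this on "response"
    try:
        return await asyncio.wait_for(future, timeout)
    except asyncio.TimeoutError:
        # Re-raise as the built-in TimeoutError, as described above
        raise TimeoutError(f"no response for {cid} within {timeout}s")
    finally:
        pending.pop(cid, None)  # cleaned up whether or not a reply arrived

async def main() -> tuple[bool, int]:
    pending: dict[str, asyncio.Future] = {}
    timed_out = False
    try:
        await request(pending, "abc123def456", timeout=0.05)
    except TimeoutError:
        timed_out = True
    # The finally block removed the stale future, so pending is empty.
    return timed_out, len(pending)

outcome = asyncio.run(main())
print(outcome)  # (True, 0)
```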
Claim / Release — exclusive incident ownership
When multiple agents might respond to the same incident, the claim protocol ensures only one of them handles it. This is a distributed consensus problem solved with a simple heuristic: lower agent name wins.
How claim works
claim_incident broadcasts a claim signal for the incident ID, sleeps for a short claim window so competing claims can arrive, then returns True only if no lexicographically lower-named agent also claimed it. If two agents call claim_incident simultaneously and the network latency between them exceeds 500ms (e.g. agents in different regions), both agents may complete the sleep before seeing each other's claim signal. Both return True. This can cause duplicate handling of the same incident. The current implementation is suitable for agents on a low-latency LAN or within the same cloud region, not for globally distributed agents with >250ms one-way latency.
For deployments that need a hard exclusivity guarantee, an external lock service is the right tool instead of the claim protocol (e.g. a Redis lock via SET NX PX).
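The name tiebreak itself is easy to demonstrate in-process. This is a toy model: real claims travel over the transport and the claim window is longer; here two claimants share a list so the demo stays self-contained:

```python
import asyncio

async def claim(name: str, seen_claims: list[str]) -> bool:
    seen_claims.append(name)          # stand-in for broadcasting a claim signal
    await asyncio.sleep(0.05)         # the claim window (real code waits longer)
    rivals = [c for c in seen_claims if c != name]
    # Deterministic eviction: the lexicographically smallest name keeps the
    # incident; everyone else backs off.
    return all(name < r for r in rivals)

async def main() -> list[bool]:
    claims: list[str] = []
    return await asyncio.gather(claim("agent-a", claims),
                                claim("agent-b", claims))

winners = asyncio.run(main())
print(winners)  # [True, False] — only agent-a keeps the claim
```

Note that if one claimant's append were delayed past the other's sleep, both would see no rivals and both would win, which is exactly the latency race described above.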
Heartbeat — presence and capability advertisement
Every 30 seconds, each agent broadcasts a heartbeat to the hardcoded "heartbeat" channel:
```python
signal = Signal(
    kind="heartbeat",
    sender=self.name,
    payload={
        "capabilities": [action.name for group in self._action_groups
                         for action in group.get_actions()],
        "status": "ready",
    },
)
await self.broadcast("heartbeat", signal)
```
"heartbeat". If any application creates a channel also named "heartbeat", it will receive a flood of heartbeat signals from every agent every 30 seconds. The channel should use an internal prefix like "__bedsheet_heartbeat__".
Heartbeat signals are not handled internally by the signal loop — they pass through to user-registered handlers. If no handler is registered for "heartbeat", they are silently dropped. The dashboard demo (agent-sentinel) registers a heartbeat handler to update agent presence cards.
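A sketch of what such a handler looks like. The dict-of-lists registry mirrors the _signal_handlers table above, but the on_signal registration function is an assumed name, and plain dicts stand in for Signal objects:

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable

Handler = Callable[[dict], Awaitable[None]]
handlers: dict[str, list[Handler]] = defaultdict(list)

def on_signal(kind: str, handler: Handler) -> None:
    # assumed registration API; mirrors appending to _signal_handlers
    handlers[kind].append(handler)

presence: dict[str, list[str]] = {}

async def update_presence(payload: dict) -> None:
    # what a dashboard would do: refresh the agent's presence card
    presence[payload["sender"]] = payload["capabilities"]

on_signal("heartbeat", update_presence)

async def dispatch(kind: str, payload: dict) -> None:
    # mirrors step 7 of the signal loop: run user handlers,
    # or drop the signal silently if none are registered
    for h in handlers.get(kind, []):
        await h(payload)

asyncio.run(dispatch("heartbeat",
                     {"sender": "sentinel", "capabilities": ["scan"]}))
print(presence)  # {'sentinel': ['scan']}
```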
SenseNetwork — managing multiple agents
bedsheet/sense/network.pySenseNetwork is a convenience wrapper that gives each agent its own transport instance and connects them all to the same namespace:
```python
# Testing: MockSenseTransport with hub pattern
network = SenseNetwork(namespace="ops", transport=MockSenseTransport())
await network.add(commander, channels=["alerts", "tasks"])
await network.add(sentinel, channels=["alerts"])

# Production: each agent gets its own independent PubNub connection
network = SenseNetwork(
    namespace="cloud-ops",
    transport_factory=lambda: PubNubTransport(subscribe_key, publish_key),
)
await network.add(commander, channels=["alerts", "tasks"])
await network.add(sentinel, channels=["alerts"])

# Shutdown
await network.stop()
```
The transport factory pattern is important: in production, each agent needs its own PubNub connection because PubNub tracks presence per connection (UUID). If two agents shared one connection, they would have the same UUID and PubNub would not be able to tell them apart.
MockSenseTransport and the hub pattern
bedsheet/testing.pyFor tests, all agents need to share the same in-memory routing logic while each having their own queue. This is solved with _MockSenseHub:
```python
# _MockSenseHub is shared by all transports.
# Each transport has its own queue but shares the hub's subscription registry.
hub = _MockSenseHub()
transport_a = MockSenseTransport(hub)
transport_b = transport_a.create_peer()  # creates a new MockSenseTransport(hub)

# When transport_a broadcasts to "alerts":
# hub routes a copy of the signal into the queue of every
# transport subscribed to "alerts" (except the sender)
```
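The hub pattern in miniature. Hub and MockTransport are illustrative stand-ins for _MockSenseHub and MockSenseTransport, routing plain strings instead of Signals:

```python
import asyncio

class Hub:
    """Shared subscription registry; one instance per test network."""
    def __init__(self) -> None:
        self.subs: dict[str, set["MockTransport"]] = {}

    def route(self, channel: str, sender: "MockTransport", msg: str) -> None:
        for t in self.subs.get(channel, set()):
            if t is not sender:          # the sender never hears its own broadcast
                t.queue.put_nowait(msg)

class MockTransport:
    """Per-agent endpoint: its own queue, the shared hub for routing."""
    def __init__(self, hub: Hub) -> None:
        self.hub = hub
        self.queue: asyncio.Queue[str] = asyncio.Queue()

    def subscribe(self, channel: str) -> None:
        self.hub.subs.setdefault(channel, set()).add(self)

    def broadcast(self, channel: str, msg: str) -> None:
        self.hub.route(channel, self, msg)

async def main() -> str:
    hub = Hub()
    a, b = MockTransport(hub), MockTransport(hub)
    a.subscribe("alerts")
    b.subscribe("alerts")
    a.broadcast("alerts", "cpu spike")
    return await b.queue.get()           # delivered to b; a's queue stays empty

msg = asyncio.run(main())
print(msg)  # cpu spike
```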
Open Question: Sequential vs Parallel in GCP ADK templates
The GCP transpiler template (bedsheet/deploy/templates/gcp/agent.py.j2) generates a SequentialAgent sweep for the sub-agents, not a ParallelAgent.
Why it was changed to Sequential
The original template used ParallelAgent. During testing of the sentinel-gcp demo, all 4-5 sub-agents were invoked simultaneously, producing a burst of 5+ API calls within a few seconds. The gemini-3-flash-preview free tier allows 5 requests per minute total. The burst triggered HTTP 429 (RESOURCE_EXHAUSTED) errors from the Gemini API, causing the demo to fail. Changing to SequentialAgent fixed the rate limit issue by running one sub-agent at a time.
Why this may be wrong
The architectural intent was parallel delegation — a supervisor dispatches to all collaborators simultaneously and waits for all results. SequentialAgent changes the semantics: collaborators run one after another, meaning the total time is the sum of all individual times rather than the maximum. For a production ADK deployment on Vertex AI (not the free Gemini API), there are no such rate limits and ParallelAgent is the correct choice.
The right resolution
The template should offer both options. The decision should be driven by a config value in bedsheet.yaml, not hardcoded. For example:
```yaml
targets:
  gcp:
    model: gemini-2.0-flash-001
    sub_agent_mode: parallel  # or: sequential
```
Until that config option exists, the template defaults to SequentialAgent as a safe default for free-tier Gemini. If you are deploying to Vertex AI with production quota, change SequentialAgent to ParallelAgent in the generated agent.py manually.
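A sketch of how the generated code could branch once the option exists (Jinja2 syntax; sub_agent_mode and agent_name are assumed template variables, not ones the current template defines):

```jinja
{% if sub_agent_mode == "parallel" %}
root_agent = ParallelAgent(name="{{ agent_name }}", sub_agents=sub_agents)
{% else %}
root_agent = SequentialAgent(name="{{ agent_name }}", sub_agents=sub_agents)
{% endif %}
```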
Known Limitations Summary
| Issue | Location | Severity | Workaround |
|---|---|---|---|
| Claim race condition if latency > 250ms | mixin.py claim_incident | High for global deployments | Only use within a single region / low-latency network |
| Name-based tiebreak for claim conflicts | mixin.py _handle_claim | Medium — predictable but fragile | Name agents deliberately (lexicographically earlier = higher priority) |
| Payload truncation is silent | serialization.py | Medium for large LLM outputs | Check for payload.get("_truncated") in handlers |
| Hardcoded "heartbeat" channel name | mixin.py _heartbeat_loop | Low — collision only if you name a channel "heartbeat" | Don't use "heartbeat" as a channel name in your application |
| source_channel not set in MockSenseTransport | testing.py MockSenseTransport.broadcast | Low — only affects tests asserting channel routing | Don't write tests that assert signal.source_channel |
| SequentialAgent vs ParallelAgent hardcoded | agent.py.j2 template | Medium for production deployments | Change to ParallelAgent manually for Vertex AI deployments |