Observability module for RAI
RAI now serves increasingly diverse robotics and AI deployments, and subtle issues can slip by unnoticed. We need a built-in observability layer in rai_core (covering base connectors and base agents) that makes it easy to see when things go wrong or slow down. Today, small glitches and unexpected latency spikes are easy to miss, and diagnosing problems like missing connections or failing modules is harder than it should be.
The goal is to provide first-class visibility into system health: connections, timing, throughput, and errors. With concise, low-overhead telemetry baked into the core, operators and developers should be able to spot anomalies early and troubleshoot confidently across different communication backends and agent configurations.
Connectors are an agent's eyes, ears, and hands; instrumenting them with sinks gives us a rich, ground-truth stream about what the agent perceives and does, without touching the agent’s logic.
Development goals
We want observability to feel almost invisible. A single toggle or endpoint makes it work, and if left unset the system behaves exactly as before. Performance overhead should stay minimal. Telemetry is outbound-only; live reconfiguration uses a separate control path. Both streams are exposed to the GUI, but they are separable via distinct abstractions (sink for telemetry, control client/channel for params).
- Support multiple transports (WS/TCP, HTTP, or OTLP later) without binding rai_core to one sink.
- Tolerate late or missing servers with bounded buffering and reconnection.
- Cover base connectors and base agents: lifecycle, registration, messaging, latency, errors, and GUI-accessible reconfiguration via the control channel.
Non-goals
This RFC does not propose building the full backend stack; we emit signals and expect an external service to collect and show them. Lossless telemetry under every failure mode is not a goal: buffers are bounded and occasional drops are acceptable. Full distributed tracing can be layered on later; for now we focus on lightweight metrics and events.
Requirements
The experience should stay simple to configure and safe under failure:
- A single env/config setting (e.g., RAI_OBS_ENDPOINT=ws://host:9000) turns it on; the default is no-op.
- Sink failures never leak into user code; degrade to buffered mode or no-op (a containment sketch follows this list).
- Hot paths stay non-blocking with bounded queues that drop oldest under pressure.
- Handle "server not up yet" and reconnect with backoff.
- Lightweight, versioned event schema; pluggable sinks without touching connector/agent internals.
- Dynamic reconfiguration is handled by a separate control channel, not by the telemetry sink.
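To illustrate the failure-containment requirement, a hypothetical helper on the hot path might look like the sketch below; safe_record and the logger name are assumptions, not existing rai_core code.

import logging

logger = logging.getLogger("rai.observability")  # hypothetical logger name


def safe_record(sink, event) -> None:
    """Forward an event to a sink without ever letting sink errors reach the caller."""
    try:
        sink.record(event)
    except Exception:
        # Requirement: sink failures never leak into user code.
        # Log at debug level only; the user workload keeps running untouched.
        logger.debug("observability sink failed; event dropped", exc_info=True)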
Architecture (high level)
At the core is an ObservabilitySink interface with a record(event) method and optional start/stop/flush, placed in rai.communication.observability. The default implementation is NoOpSink. Concrete sinks live beside it: in the RFC we target SocketSink/BufferedSink (the current POC sends directly over a socket without the sink abstraction), and later HttpSink, OtlpSink, or a simple FileSink for debugging. A BufferedSink can wrap any sink to keep the most recent N events and drain them once the downstream is reachable again. A small builder parses the configured endpoint (URL scheme) and returns the right sink; unknown or missing endpoints yield the no-op. BaseConnector accepts an optional observability_sink; if absent, the builder decides. Metaclass wrappers emit events through the sink. Agents use the same sink instance so their lifecycle, planning latency, and tool-call events can be correlated with connector traffic. Duplex control (e.g., changing params) is handled by a separate control client/channel, not by the sink.
flowchart LR
subgraph RAI_Core
subgraph Agents
A1[MegaMindAgent]
A2[ROS2 Control Agent]
end
subgraph Connectors
C1[ROS2 Connector]
C2[HTTP Connector]
end
OBS[ObservabilitySink]
BUF[BufferedSink]
SOCK[SocketSink]
HSINK[HttpSink]
NOOP[NoOpSink]
BUILDER[Sink Builder]
CTRL[Control Client]
end
A1 --> C1
A1 --> C2
A2 --> C1
C1 -->|events| OBS
C2 -->|events| OBS
A1 -->|events| OBS
A2 -->|events| OBS
OBS --> BUF
BUF --> SOCK
BUF --> HSINK
OBS --> NOOP
BUILDER --> OBS
CTRL <-->|params/stats| A1
CTRL <-->|params/stats| A2
GUI[GUI / Vis Server]
SOCK -->|telemetry| GUI
HSINK -->|telemetry| GUI
CTRL -->|control| GUI
ROS2[(ROS 2 Network)]
HTTPDS[(HTTP Data Sources)]
C1 --- ROS2
C2 --- HTTPDS
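To make the interface above concrete, a minimal Python sketch could look as follows; the class and method names follow the RFC, while the exact signatures are assumptions.

from abc import ABC, abstractmethod
from typing import Any


class ObservabilitySink(ABC):
    """Sketch of the proposed sink interface; record() is the only required method."""

    @abstractmethod
    def record(self, event: Any) -> None:
        """Accept a single telemetry event; must never block the caller."""

    # Optional, best-effort lifecycle hooks.
    def start(self) -> None:
        pass

    def flush(self) -> None:
        pass

    def stop(self) -> None:
        pass


class NoOpSink(ObservabilitySink):
    """Default when no endpoint is configured: accept and discard everything."""

    def record(self, event: Any) -> None:
        return  # intentionally does nothing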
Detailed design
The event model stays lightweight: each entry carries agent_name/connector_name, an event_type (registration, open/close for send/receive/service, error, latency), the target or source when applicable, timestamps or a computed latency, optional metadata, and optionally a trace identifier. Every event has a version tag to allow schema evolution. Reconfiguration commands do not flow through this channel.
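As a sketch only, such an event could be modeled as a dataclass along these lines; the field names mirror the prose above but are illustrative, not a finalized schema.

import time
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class ObservabilityEvent:
    """Illustrative event shape; the real schema is versioned and may differ."""

    schema_version: str = "1"                 # version tag for schema evolution
    agent_name: Optional[str] = None
    connector_name: Optional[str] = None
    event_type: str = ""                      # e.g. "registration", "send_message", "error"
    target: Optional[str] = None              # topic/service/URL when applicable
    timestamp: float = field(default_factory=time.time)
    latency_ms: Optional[float] = None        # computed latency, when applicable
    metadata: Dict[str, Any] = field(default_factory=dict)
    trace_id: Optional[str] = None            # optional hook for future tracing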
The sink interface is intentionally small. record(event) is the main call; start/flush/stop are best-effort hooks. NoOpSink returns immediately. BufferedSink wraps another sink, keeps a bounded deque, drops the oldest on overflow, and flushes when connectivity returns; if the downstream keeps failing it can silently downgrade to no-op. SocketSink sends non-blocking messages over TCP/WS and reconnects with backoff. Future options include HttpSink for batch POSTs, OtlpSink for standardized telemetry, and FileSink for local debugging.
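A sketch of that buffering behaviour, assuming the ObservabilitySink base sketched earlier; the downgrade threshold and default sizes are arbitrary illustrations.

import logging
from collections import deque
from typing import Any, Deque

logger = logging.getLogger("rai.observability")  # hypothetical logger name


class BufferedSink(ObservabilitySink):
    """Wraps another sink: keeps the most recent N events, drops the oldest on
    overflow, drains when the downstream works, and silently downgrades to
    no-op if it keeps failing."""

    def __init__(self, inner: ObservabilitySink, max_events: int = 1000,
                 max_consecutive_failures: int = 100) -> None:
        self._inner = inner
        self._buffer: Deque[Any] = deque(maxlen=max_events)  # drop-oldest on overflow
        self._failures = 0
        self._max_failures = max_consecutive_failures
        self._degraded = False

    def record(self, event: Any) -> None:
        if self._degraded:
            return  # downgraded to no-op after repeated downstream failures
        self._buffer.append(event)
        self._drain()

    def _drain(self) -> None:
        try:
            while self._buffer:
                self._inner.record(self._buffer[0])
                self._buffer.popleft()
            self._failures = 0
        except Exception:
            self._failures += 1
            if self._failures >= self._max_failures:
                self._degraded = True
            logger.debug("downstream unavailable; buffering %d events", len(self._buffer))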
For connectors, the existing metaclass wrapping of send_message, receive_message, service_call/call_service, and create_service remains, but it now emits structured events with open/close markers and latency. The constructor accepts an optional observability_sink; if none is provided, a builder uses env/config to choose one. BaseConnector depends only on the interface, not on any concrete transport. Agents mirror this pattern, emitting lifecycle, planning, tool-call, and streaming events through the same sink to keep a shared timeline.
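A simplified illustration of the metaclass pattern, not the actual rai_core metaclass; the wrapped method list, the observability_sink attribute lookup, and the emitted event fields are assumptions.

import functools
import time

# Assumption: only these connector methods get wrapped.
WRAPPED_METHODS = {"send_message", "receive_message", "service_call", "create_service"}


class ObservabilityMeta(type):
    """Wrap selected methods at class-creation time so every call emits an event
    with latency through the instance's observability_sink, if one is set."""

    def __new__(mcls, name, bases, namespace):
        for attr, func in list(namespace.items()):
            if attr in WRAPPED_METHODS and callable(func):
                namespace[attr] = mcls._wrap(attr, func)
        return super().__new__(mcls, name, bases, namespace)

    @staticmethod
    def _wrap(method_name, func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            sink = getattr(self, "observability_sink", None)
            start = time.time()
            try:
                return func(self, *args, **kwargs)
            finally:
                if sink is not None:
                    sink.record({
                        "schema_version": "1",
                        "connector_name": type(self).__name__,
                        "event_type": method_name,
                        "latency_ms": (time.time() - start) * 1000.0,
                    })
        return wrapper

The sequence diagram in the appendix additionally shows dispatch through EVENT_HANDLERS[schema_version][method], which this sketch omits.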
Activation is meant to be trivial: set RAI_OBS_ENDPOINT (schemes like tcp://, ws://, http://, or file://), optionally tune RAI_OBS_ENABLED and RAI_OBS_BUFFER_SIZE. If nothing is set or the endpoint is invalid, the system defaults to no-op. Failure modes are contained: when the server is absent or slow, the buffered sink queues up to N events and drops oldest under pressure; the application threads never block. If a sink crashes, we log at debug level and fall back to no-op to protect the user workload. Control (e.g., dynamic parameter updates) is exposed via the separate control channel so telemetry remains one-way and non-blocking.
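Put together, activation could reduce to a small builder along these lines; this is a sketch that assumes the sinks above plus hypothetical SocketSink, HttpSink, and FileSink classes, not the final implementation.

import os
from urllib.parse import urlparse


def build_sink_from_env() -> ObservabilitySink:
    """Hypothetical builder: choose a sink from RAI_OBS_ENDPOINT's URL scheme,
    defaulting to NoOpSink whenever anything is unset, disabled, or invalid."""
    if os.environ.get("RAI_OBS_ENABLED", "1") in ("0", "false", "False"):
        return NoOpSink()
    endpoint = os.environ.get("RAI_OBS_ENDPOINT", "")
    if not endpoint:
        return NoOpSink()
    buffer_size = int(os.environ.get("RAI_OBS_BUFFER_SIZE", "1000"))
    scheme = urlparse(endpoint).scheme
    # SocketSink/HttpSink/FileSink are assumed to exist; see the detailed design.
    if scheme in ("tcp", "ws"):
        return BufferedSink(SocketSink(endpoint), max_events=buffer_size)
    if scheme in ("http", "https"):
        return BufferedSink(HttpSink(endpoint), max_events=buffer_size)
    if scheme == "file":
        return FileSink(endpoint)
    return NoOpSink()  # unknown scheme: stay quiet rather than fail

BaseConnector would call such a builder only when no explicit observability_sink is passed in, keeping the default path a strict no-op.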
GUI concept (informational)
We also envision a GUI that visualizes agents and their connectors alongside foreign data sources such as ROS 2 networks or HTTP endpoints. Connectors appear as nodes with edges to other connectors and to external sources; topic/service/action links are rendered as directional edges. Clicking an agent opens its dynamic parameters (e.g., task handling mode, chosen LLM) and live statistics (latency, success rate, per-node latencies). The GUI would consume the emitted observability events to rebuild topology, show state over time, and surface per-agent metrics, without requiring additional code paths inside RAI.
Drawbacks
This design adds some plumbing to the core, even when disabled. Metaclass wrapping plus event emission carry small overhead that should be measured. Buffered mode can still lose data under heavy backpressure. Supporting multiple sinks increases the testing surface.
Alternatives considered
Sticking with the hard-coded socket client from the POC would keep things simple but leave us tightly coupled and brittle when the server is absent. Decorating each connector/agent method instead of using a metaclass would be explicit but invasive and easy to miss in future changes. Emitting only structured logs and letting external tooling scrape them would reduce code here but make real-time topology and latency views harder and would require log shipping infrastructure. Forcing a single telemetry SDK like OTLP would standardize output but impose that stack on every user and increase installation burden.
Alternative design: VisConnector inside BaseConnector
One idea was to pass a visualization connector (e.g., vis_connector: BaseConnector or vis_connector: HTTPVisConnector) directly into BaseConnector and emit events through that instance. While attractive for reuse, it introduces problems:
- Type ambiguity and potential circular imports (a connector taking another connector of similar type).
- Tighter coupling between core connector logic and a specific visualization transport.
- No clear story for late/absent servers or buffering unless duplicated inside that connector.
Compared with the sink approach, the VisConnector pattern is more invasive and risks entangling observability with data-plane connectors. The sink interface keeps the dependency surface small, avoids circularity, and lets us plug multiple transports (including HTTP for the GUI) without changing connector constructors. For duplex needs like parameter reconfiguration, a separate control channel/client is clearer than overloading the sink or nesting another connector.
Related systems and influences
- OpenTelemetry (multi-exporter model, versioned schemas, one-way non-blocking telemetry)
- Prometheus exporters (decoupled sinks, bounded buffering, drop-oldest under pressure)
- Jaeger/Zipkin tracing (optional trace IDs for future correlation)
- LangChain tracing (per-call/tool-call timelines without backend lock-in)
- ROS 2 diagnostics/tracing (telemetry separate from parameter/control paths)
- Grafana Agent/Loki (builder + pluggable outputs including HTTP/file)
Appendix: Sequence diagrams
Sequence: send_message via metaclass and sink
sequenceDiagram
participant Agent
participant Connector
participant Wrapper as ObservabilityMeta Wrapper
participant Sink as ObservabilitySink
participant Backend as Telemetry Backend
Agent->>Connector: send_message()
Connector->>Wrapper: intercepted by metaclass wrapper
Wrapper->>Wrapper: pick handler via EVENT_HANDLERS[schema_version][method]
Wrapper->>Connector: invoke original send_message body
Connector-->>Wrapper: return
Wrapper->>Sink: record(event with schema_version, event_type=send_message, latency_ms,...)
Sink->>Backend: deliver (or buffer/drop-oldest if unavailable)
Backend-->>Sink: ack/fail
Sink-->>Wrapper: return (non-blocking to caller)
Wrapper-->>Agent: return
Sequence: agent reconfiguration via control channel
sequenceDiagram
participant GUI as GUI/Vis Server
participant Control as Control Client
participant Agent
participant Connector
GUI->>Control: apply_patch({mode: override})
Control-->>GUI: ack with updated params
Control->>Agent: params changed (pull or callback)
Agent->>Agent: refresh local config/state
Agent->>Connector: (optional) use new params in next operations
Agent-->>Control: (optional) report current params/status
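As a sketch of how that split could look in code, a hypothetical in-process control client is shown below; all names and the callback mechanism are assumptions, and the real transport is out of scope.

from typing import Any, Callable, Dict, List


class ControlClient:
    """Duplex control path, kept entirely separate from the telemetry sink."""

    def __init__(self) -> None:
        self._params: Dict[str, Any] = {}
        self._listeners: List[Callable[[Dict[str, Any]], None]] = []

    def subscribe(self, callback: Callable[[Dict[str, Any]], None]) -> None:
        """An agent registers to be notified when its parameters change."""
        self._listeners.append(callback)

    def apply_patch(self, patch: Dict[str, Any]) -> Dict[str, Any]:
        """Called by the GUI/vis server; the updated parameter set is the ack."""
        self._params.update(patch)
        for callback in self._listeners:
            callback(dict(self._params))  # push-style "params changed" notification
        return dict(self._params)

An agent would call subscribe() at startup and refresh its local configuration inside the callback, matching the diagram above.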
Open questions and initial decisions
- Event volume and sampling: For the first implementation, all events are emitted by default. The system is expected to stay very lightweight and is primarily intended for debugging and development use. No sampling or rate limiting is introduced initially.
- Time source and timestamps: Each event carries a timestamp based on time.time(). This is sufficient for the initial implementation; no shared clock abstraction or monotonic timestamps are required at this stage.
- Event ordering: Best-effort ordering is acceptable. Once buffering, reconnects, or drops occur, strict ordering guarantees are not required.
- Failure visibility: Should sink failures surface beyond debug logs?
Related PR