name: vda5050_integration description: Bridge a VDA 5050 v3.0.0 fleet-control interface (MQTT/JSON) onto Nav 2 under Clean Architecture — domain entities, MQTT/Nav 2 adapters behind ports, order→NavigateThroughPoses mapping, state aggregation, action handlers. Trigger when the user asks to build or extend a VDA 5050 connector / fleet bridge.
VDA 5050 ↔ ROS 2 / Nav 2 Integration
How to bridge a VDA 5050 v3.0.0 fleet-control interface (MQTT/JSON) onto a Nav 2 stack without breaking Clean Architecture.
- Protocol overview:
rules/vda5050_protocol.md. - Complete message/field spec + processes:
rules/vda5050_messages.md. - Code-generation format analysis (3 proven idioms — pydantic /
ROS
.msg+bridge / C++ structs):rules/vda5050_implementation_formats.md. Real reference repos in~/nav2_ws/src/:isaac_mission_dispatch(fleet/pydantic),isaac_ros_cloud_control(robot/ROS 2),vda5050_core(C++ core). - Authoritative spec + JSON schemas:
~/nav2_ws/src/VDA5050/(VDA5050_EN.md,json_schemas/*.schema). - Reference implementation (working, Clean-Architecture):
~/nav2_ws/src/vda5050_connector/. Study it before writing a new one.
The one rule that drives the design
VDA 5050 is an outer-world protocol — MQTT topics, JSON payloads,
schema versions. By the layer rules in rules/clean_architecture.md:
The protocol does not belong in the domain. Domain entities model orders, nodes, edges, actions, state as plain Python/C++ — no MQTT, no
json, no*_msgs. MQTT and Nav 2 are infrastructure adapters behind domain ports.
PRESENTATION launch entrypoint, CLI, RViz
│
APPLICATION OrderHandler · StateAssembler · InstantActionHandler
│ (graph traversal, action queue, state aggregation)
DOMAIN entities: Order/Node/Edge/Action/State/Connection/Factsheet
▲ ports: MqttClientInterface · NavigationAdapterInterface
│ implements
INFRASTRUCTURE MqttBridge (paho) · Nav2Adapter (rclpy + NavigateToPose)
The connector mirrors this exactly:
vda5050_connector/
├── domain/
│ ├── entities/ order.py state.py action.py connection.py
│ │ factsheet.py visualization.py header.py graph.py
│ └── interfaces/ mqtt_client.py navigation_adapter.py ← PORTS
├── application/ order_handler.py state_assembler.py
│ action_handler.py
└── infrastructure/ mqtt_bridge.py nav2_adapter.py
fleet_adapter.py vda5050_node.py ← composition root
Layer responsibilities
Domain — entities + ports (no ROS, no MQTT, no JSON)
Model each message as a dataclass tree. Keep field names aligned with
the JSON schema so (de)serialization is mechanical, but do not put
json.loads here — that is an adapter concern.
# domain/entities/order.py (excerpt of the real connector)
@dataclass
class NodePosition:
x: float = 0.0
y: float = 0.0
theta: float = 0.0
map_id: str = ''
@dataclass
class OrderNode:
node_id: str = ''
sequence_id: int = 0
released: bool = False
node_position: Optional[NodePosition] = None
actions: List[Action] = field(default_factory=list)
@dataclass
class Order:
order_id: str = ''
order_update_id: int = 0
nodes: List[OrderNode] = field(default_factory=list)
edges: List[OrderEdge] = field(default_factory=list)
Ports (abstract) live in domain/interfaces/:
class NavigationAdapterInterface(ABC):
@abstractmethod
def navigate_to(self, node: OrderNode, on_done) -> None: ...
@abstractmethod
def cancel(self) -> None: ...
@abstractmethod
def get_position(self) -> AGVPosition: ...
class MqttClientInterface(ABC):
@abstractmethod
def publish(self, topic, payload, qos=0, retain=False) -> None: ...
@abstractmethod
def subscribe(self, topic, callback, qos=0) -> None: ...
@abstractmethod
def set_last_will(self, topic, payload, qos=1, retain=True) -> None: ...
Application — protocol logic, depends only on domain
OrderHandler— validates an incomingOrder, enforces base/horizon, runs the node/edge graph insequenceIdorder, resolves missingnodePositionfrom aNavigationGraph(VDA 5050 allows omitting position when both MC and robot know the node), tracksnodeStates/edgeStates, and callsNavigationAdapterInterface. Handles order updates / stitching (sameorderId, nextorderUpdateId, first node =lastNodeId).StateAssembler— aggregates position, velocity, battery, errors,actionStates,nodeStates/edgeStatesinto aStateentity for periodic publish.InstantActionHandler—cancelOrder,startPause/stopPause,stateRequest,factsheetRequest,pick/drop, etc.; drives theactionStatusstate machine (WAITING → … → FINISHED/FAILED/RETRIABLE).
Infrastructure — adapters implementing the ports
MqttBridge(paho-mqtt) — topicvda5050/v3/<mfr>/<serial>/<topic>, QoS 0 for everything exceptconnection(QoS 1), MQTT last will onconnection=CONNECTION_BROKEN. (De)serializes JSON ↔ domain entities here, and validates againstjson_schemas/*.schema.Nav2Adapter(rclpy) — mapsnavigate_to(node)→NavigateToPoseaction; reads telemetry from running topics (/amcl_pose→ position,/odom→ velocity,/battery_state,/scan→ safety,/diagnostics→ errors/info).vda5050_node.py— composition root: instantiates adapters, injects ports into the application services, runs the publish timers.
VDA 5050 → Nav 2 mapping cheat-sheet
| VDA 5050 | Nav 2 / ROS 2 |
|---|---|
order node (released base) |
NavigateToPose goal (per node) or NavigateThroughPoses (whole base) |
nodePosition {x,y,theta,mapId} |
geometry_msgs/PoseStamped in map frame |
edge.maximumSpeed |
controller setSpeedLimit / velocity-smoother cap |
edge.trajectory (NURBS) |
sample → nav_msgs/Path (or let the planner replan) |
cancelOrder instant action |
cancel the Nav 2 action goal |
startPause/stopPause |
cancel/resume goal or velocity smoother gate |
pick/drop/finePositioning |
custom behavior / action server (your hardware) |
state.mobileRobotPosition |
/amcl_pose (PoseWithCovarianceStamped) |
state.velocity |
/odom (nav_msgs/Odometry) |
state.powerSupply |
/battery_state (sensor_msgs/BatteryState) |
state.errors / information |
/diagnostics (DiagnosticArray) |
state.driving / paused |
derived from goal status / pause flag |
state.safetyState |
E-stop topic + /scan proximity |
initializePosition instant action |
publish /initialpose to AMCL |
QoS alignment
VDA 5050 prescribes MQTT QoS, which maps onto ROS 2 QoS at the bridge
edge (see rules/ros2_communication.md):
| Data | MQTT QoS | ROS 2 QoS |
|---|---|---|
order / instantActions (commands) |
0 | RELIABLE, depth 10 |
state / visualization (telemetry) |
0 | BEST_EFFORT, depth 5 |
connection |
1 | RELIABLE, TRANSIENT_LOCAL |
Checklist for a new connector
- Define domain entities mirroring the 8 message schemas — no MQTT
/ JSON /
*_msgsimports. - Define
MqttClientInterface+NavigationAdapterInterfaceports. - Put graph traversal, order stitching, action queue and the
actionStatusstate machine in application services. - Implement adapters in infrastructure; validate every inbound and
outbound JSON against
json_schemas/*.schema. - Set the
connectionlast will toCONNECTION_BROKEN; publishONLINEon connect. - Publish
stateon a timer (default1 Hz) and on every event (node reached, action status change);10 Hz).visualizationfaster ( - Send
factsheeton connect and onfactsheetRequest. - Test: unit-test entities + handlers without ROS; integration-test the
adapters with
launch_testing+ a local MQTT broker (seerules/testing.md).
Common pitfalls
- Domain importing
paho,json, orrclpy→ it's not domain. Move it to an adapter behind a port. - Mutating the base — base nodes/edges (
released=true) are committed; only extend the horizon or send an order update. - Order update first node ≠
lastNodeId→ must be rejected with anerrorcarrying theorderId/orderUpdateId. - Forgetting
headerIdper-topic increment or non-UTC timestamps. - Reporting
loads: []when the load state is unknown — omit the field entirely instead (empty array means confirmed unloaded). - Schema drift — always re-validate against the v3.0.0 schemas in
~/nav2_ws/src/VDA5050/json_schemas/; the VDA PDF is authoritative if they ever differ.