name: new-broker
description: Complete step-by-step guide for implementing a new protocol Broker in RobustMQ. Use when the user asks to add a new broker, implement a new protocol, or scaffold a new broker crate.
new-broker
Complete steps for implementing a brand-new protocol Broker in RobustMQ.
Architecture
Each Broker is responsible for protocol parsing and for orchestrating the shared infrastructure components. It does NOT implement storage, routing, or cluster communication itself.
Protocol packet → broker handler → shared components (storage-adapter / broker-core / node-call / rate-limit)
Implementation Steps
Step 1: Protocol layer — src/protocol/src/<proto>/
- `packet.rs`: define all protocol packet data structures (refer to NATS `ServerInfo`/`ClientConnect`/`NatsPacket`)
- `codec.rs`: implement `tokio_util::codec::Decoder` + `Encoder<Packet>`; requires `#[derive(Clone)]`
- `mod.rs`: `pub mod codec; pub mod packet;`
- Add `pub mod <proto>;` to `src/protocol/src/lib.rs`
Codec notes:
- `#[derive(Clone)]` is required on the codec struct and all internal enums
- When enum variant sizes differ greatly, wrap large variants in `Box<>` to avoid `clippy::large_enum_variant`
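The effect of boxing a large variant can be checked with `std::mem::size_of`. The following is a minimal sketch, not code from the RobustMQ repository: the `ServerInfo` fields and both enum names are hypothetical and exist only to make the size gap visible.

```rust
use std::mem::size_of;

// Hypothetical payload type, large enough to create a size gap between variants.
#[derive(Clone, Debug, PartialEq)]
pub struct ServerInfo {
    pub server_id: [u8; 32],
    pub banner: [u8; 256],
}

// Unboxed: every value is at least as large as the biggest variant.
#[derive(Clone, Debug, PartialEq)]
pub enum UnboxedPacket {
    Ping,
    Info(ServerInfo),
}

// Boxed: the large payload lives on the heap, so the enum only stores a pointer.
#[derive(Clone, Debug, PartialEq)]
pub enum BoxedPacket {
    Ping,
    Info(Box<ServerInfo>),
}

// Returns (unboxed size, boxed size) in bytes.
pub fn packet_sizes() -> (usize, usize) {
    (size_of::<UnboxedPacket>(), size_of::<BoxedPacket>())
}
```

Calling `packet_sizes()` should show the boxed enum occupying far fewer bytes than the unboxed one, which is the difference the lint reacts to. Both enums still derive `Clone`, because `Box<T>` is `Clone` whenever `T` is.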
Step 2: Protocol registration — src/protocol/src/robust.rs
Add one variant in each location:
| Location | What to add |
|---|---|
| `RobustMQProtocol` enum | `PROTO` variant; `to_u8()` assigns a unique u8; `to_str()` returns the name; `from_u8()` adds a branch; add `is_proto()` method |
| `XxxWrapperExtend` struct | Create `pub struct ProtoWrapperExtend {}` |
| `RobustMQWrapperExtend` enum | `PROTO(ProtoWrapperExtend)` variant; add branch in `to_mqtt_protocol()` |
| `RobustMQPacket` enum | `PROTO(ProtoPacket)` variant; add `get_proto_packet()` method |
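The first table row can be pictured with a small stand-in enum. This is a sketch under assumptions: `Protocol`, its variants, the numeric tags, and the `Option` return type of `from_u8()` are all hypothetical, and the real `RobustMQProtocol` signatures may differ.

```rust
// Hypothetical stand-in for RobustMQProtocol; variants and tags are illustrative.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Protocol {
    Mqtt,
    Kafka,
    Proto, // the newly added protocol
}

impl Protocol {
    // Each variant gets a unique u8 tag.
    pub fn to_u8(self) -> u8 {
        match self {
            Protocol::Mqtt => 1,
            Protocol::Kafka => 2,
            Protocol::Proto => 3,
        }
    }

    // The inverse of to_u8(); the new tag needs its own branch here.
    pub fn from_u8(value: u8) -> Option<Self> {
        match value {
            1 => Some(Protocol::Mqtt),
            2 => Some(Protocol::Kafka),
            3 => Some(Protocol::Proto),
            _ => None,
        }
    }

    // Human-readable protocol name.
    pub fn to_str(self) -> &'static str {
        match self {
            Protocol::Mqtt => "mqtt",
            Protocol::Kafka => "kafka",
            Protocol::Proto => "proto",
        }
    }

    // Convenience predicate for the new protocol.
    pub fn is_proto(self) -> bool {
        matches!(self, Protocol::Proto)
    }
}
```

Keeping `to_u8()` and `from_u8()` as exact inverses is the property worth testing: a tag that is assigned but has no `from_u8()` branch compiles fine and only fails at runtime.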
Step 3: Codec registration — src/protocol/src/codec.rs
- Add `PROTO(ProtoPacket)` variant to `RobustMQCodecWrapper` enum; add branch in `Display` impl
- Add `PROTO(ProtoCodec)` variant to `RobustMQCodecEnum` enum
- Add `RobustMQProtocol::PROTO` to `PROTOCOL_PROBE_ORDER` constant array
- Add `proto_codec: ProtoCodec` field to `RobustMQCodec` struct; initialize in `new()`
- Add `Some(RobustMQProtocol::PROTO)` arm in `decode_data()` match on `self.protocol`
- Add `RobustMQCodecWrapper::PROTO(pkt)` arm in `encode_data()` match
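How these pieces might fit together is sketched below using only the standard library. Everything here is an assumption made for illustration: the type names are shortened stand-ins, the `"<TAG> <payload>"` frame format is a toy, and the probing loop is a guess at what a probe order is for, not the actual `RobustMQCodec` logic.

```rust
use std::fmt;

// Hypothetical stand-ins for RobustMQProtocol and RobustMQCodecWrapper.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Protocol {
    Mqtt,
    Proto,
}

// The new protocol must be listed here, otherwise it is never probed.
pub const PROTOCOL_PROBE_ORDER: [Protocol; 2] = [Protocol::Mqtt, Protocol::Proto];

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum CodecWrapper {
    Mqtt(String),
    Proto(String),
}

impl fmt::Display for CodecWrapper {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            CodecWrapper::Mqtt(p) => write!(f, "MQTT:{p}"),
            CodecWrapper::Proto(p) => write!(f, "PROTO:{p}"),
        }
    }
}

// Toy per-protocol decoder: a frame is "<TAG> <payload>".
fn try_decode(protocol: Protocol, buf: &[u8]) -> Option<CodecWrapper> {
    let text = std::str::from_utf8(buf).ok()?;
    match protocol {
        Protocol::Mqtt => text.strip_prefix("MQTT ").map(|p| CodecWrapper::Mqtt(p.to_string())),
        Protocol::Proto => text.strip_prefix("PROTO ").map(|p| CodecWrapper::Proto(p.to_string())),
    }
}

#[derive(Clone, Debug, Default)]
pub struct Codec {
    pub protocol: Option<Protocol>,
}

impl Codec {
    pub fn decode_data(&mut self, buf: &[u8]) -> Option<CodecWrapper> {
        match self.protocol {
            // Protocol already known: go straight to its decoder.
            Some(Protocol::Mqtt) => try_decode(Protocol::Mqtt, buf),
            Some(Protocol::Proto) => try_decode(Protocol::Proto, buf),
            // Protocol unknown: try each candidate in probe order and remember the first match.
            None => {
                for protocol in PROTOCOL_PROBE_ORDER {
                    if let Some(pkt) = try_decode(protocol, buf) {
                        self.protocol = Some(protocol);
                        return Some(pkt);
                    }
                }
                None
            }
        }
    }
}
```

In this sketch, leaving the new variant out of `PROTOCOL_PROBE_ORDER` would make a fresh connection that speaks the new protocol fail to decode, even though every match arm compiles.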
Step 4: Fix exhaustive matches in network-server
The following files have exhaustive matches — add a PROTO branch in each (search for RobustMQPacket::StorageEngine to locate them):
- `src/common/network-server/src/common/handler.rs`: two places, `write_response()` and `write_websocket_response()`
- `src/common/network-server/src/common/write.rs`: two places, `write_tcp_frame()` and `write_quic_frame()`
- `src/common/network-server/src/common/tcp_acceptor.rs`
- `src/common/network-server/src/common/tls_acceptor.rs`
- `src/common/network-server/src/quic/acceptor.rs`
- `src/common/network-server/src/websocket/server.rs`
Step 5: New broker crate — src/<proto>-broker/
src/<proto>-broker/
├── Cargo.toml
└── src/
    ├── lib.rs            # pub mod broker/handler/<proto>/server
    ├── broker.rs         # XxxBrokerServerParams + XxxBrokerServer (DEFAULT_PORT)
    ├── server/mod.rs     # XxxServer, TcpServer + handler_process
    ├── handler/
    │   ├── mod.rs
    │   └── command.rs    # XxxHandlerCommand impl Command, match dispatches to <proto>/ functions
    └── <proto>/
        ├── mod.rs
        ├── connect.rs    # process_connect()
        ├── publish.rs    # process_pub()
        ├── subscribe.rs  # process_sub() / process_unsub()
        └── ping.rs       # process_ping() / process_pong() (if the protocol has heartbeats)
<proto>/ directory design principles:
- One file per semantically related group of commands (not one file per command)
- Function signatures accept protocol packet fields, return `Option<ProtoPacket>`
- Function bodies contain only TODO comments describing which shared components to call; business logic filled in later
command.rs pattern:
// Dispatch each packet to the matching <proto>/ function.
let resp_packet = match &packet {
    ProtoPacket::Connect(req) => connect::process_connect(req),
    ProtoPacket::Pub { subject, payload, .. } => publish::process_pub(subject, payload),
    // ...
}?; // `?` returns None early when the handler produces no response packet
Some(ResponsePackage::new(connection_id, RobustMQPacket::PROTO(resp_packet)))
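A self-contained version of this dispatch pattern is sketched below so that the `Option` flow can be compiled and tested in isolation. The packet variants, `ConnectReq`, the simplified `ResponsePackage`, and the `apply` function name are hypothetical; the handlers are flat functions here, whereas in the crate they would live in `connect.rs`, `publish.rs`, and `ping.rs`.

```rust
// Hypothetical packet and response types, standing in for the real ones.
#[derive(Clone, Debug, PartialEq)]
pub struct ConnectReq {
    pub verbose: bool,
}

#[derive(Clone, Debug, PartialEq)]
pub enum ProtoPacket {
    Connect(ConnectReq),
    Pub { subject: String, payload: Vec<u8> },
    Ping,
    Pong,
    Ok,
}

#[derive(Clone, Debug, PartialEq)]
pub struct ResponsePackage {
    pub connection_id: u64,
    pub packet: ProtoPacket,
}

// Would live in connect.rs.
pub fn process_connect(req: &ConnectReq) -> Option<ProtoPacket> {
    // TODO: authenticate and register the session through the shared components.
    if req.verbose { Some(ProtoPacket::Ok) } else { None }
}

// Would live in publish.rs.
pub fn process_pub(_subject: &str, _payload: &[u8]) -> Option<ProtoPacket> {
    // TODO: write the message through storage-adapter.
    None
}

// Would live in ping.rs.
pub fn process_ping() -> Option<ProtoPacket> {
    Some(ProtoPacket::Pong)
}

// Handlers return None when there is nothing to send back; `?` propagates that.
pub fn apply(connection_id: u64, packet: &ProtoPacket) -> Option<ResponsePackage> {
    let resp_packet = match packet {
        ProtoPacket::Connect(req) => process_connect(req),
        ProtoPacket::Pub { subject, payload } => process_pub(subject, payload),
        ProtoPacket::Ping => process_ping(),
        // Server-to-client packets need no reply in this sketch.
        ProtoPacket::Pong | ProtoPacket::Ok => None,
    }?;
    Some(ResponsePackage { connection_id, packet: resp_packet })
}
```

Returning `Option<ProtoPacket>` from every handler keeps fire-and-forget commands (a publish without acknowledgement, for example) on the same code path as request/response commands.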
Step 6: Configuration — src/common/config/src/
- `config.rs`: create `pub struct ProtoRuntime { pub network: Network }` with `Default` impl; add field to `BrokerConfig`; initialize in `BrokerConfig::default()`
- `default.rs`: add `pub fn default_proto_runtime() -> ProtoRuntime`; add `ProtoRuntime` to `use` imports
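The two configuration edits above can be sketched as follows. The `Network` fields, their default values, and the `proto_runtime` field name are assumptions chosen for illustration; the real `Network` struct in the config crate likely has different fields.

```rust
// Hypothetical Network struct; the real one lives in the config crate.
#[derive(Clone, Debug, PartialEq)]
pub struct Network {
    pub accept_thread_num: usize,
    pub queue_size: usize,
}

#[derive(Clone, Debug, PartialEq)]
pub struct ProtoRuntime {
    pub network: Network,
}

// Would live in default.rs: the single source of default values.
pub fn default_proto_runtime() -> ProtoRuntime {
    ProtoRuntime {
        network: Network {
            accept_thread_num: 1, // illustrative value
            queue_size: 1000,     // illustrative value
        },
    }
}

impl Default for ProtoRuntime {
    fn default() -> Self {
        default_proto_runtime()
    }
}

// Only the new field is shown; the real BrokerConfig has many more.
#[derive(Clone, Debug, PartialEq)]
pub struct BrokerConfig {
    pub proto_runtime: ProtoRuntime,
}

impl Default for BrokerConfig {
    fn default() -> Self {
        Self {
            proto_runtime: default_proto_runtime(),
        }
    }
}
```

Routing both `Default` impls through `default_proto_runtime()` keeps the defaults in one place, so a value changed in `default.rs` reaches `BrokerConfig::default()` as well.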
Step 7: Workspace registration
- Root `Cargo.toml`: add `"src/<proto>-broker"` to `[workspace] members`; add `<proto>-broker = { path = "src/<proto>-broker" }` to `[workspace.dependencies]`
- `src/broker-server/Cargo.toml`: add `<proto>-broker.workspace = true`
Step 8: broker-server integration — src/broker-server/src/lib.rs
- Add `use <proto>_broker::broker::{XxxBrokerServer, XxxBrokerServerParams};`
- Add `proto_params: XxxBrokerServerParams` field to `BrokerServer` struct
- Build `proto_params` in `new()` (refer to `kafka_params` / `amqp_params`)
- Add `self.start_proto_broker(app_stop.clone());` in `start()` Phase 7
- Implement `start_proto_broker()` method (refer to `start_kafka_broker`)
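The shape of this wiring is sketched below with standard-library threads. This is not how the real `BrokerServer` is implemented: the `Arc<AtomicBool>` stop flag, the `port` field and its value, and the `start()` loop are hypothetical substitutes chosen so the example compiles without external crates. The actual code should follow `start_kafka_broker`.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

// Hypothetical parameter bundle; the real struct carries shared components.
#[derive(Clone, Debug, PartialEq)]
pub struct XxxBrokerServerParams {
    pub port: u16,
}

pub struct XxxBrokerServer {
    params: XxxBrokerServerParams,
    stop: Arc<AtomicBool>,
}

impl XxxBrokerServer {
    pub fn new(params: XxxBrokerServerParams, stop: Arc<AtomicBool>) -> Self {
        Self { params, stop }
    }

    // Placeholder serve loop: polls the stop flag, then returns the configured port.
    pub fn start(&self) -> u16 {
        while !self.stop.load(Ordering::Acquire) {
            thread::sleep(Duration::from_millis(1));
        }
        self.params.port
    }
}

pub struct BrokerServer {
    proto_params: XxxBrokerServerParams,
}

impl BrokerServer {
    // Params are built once here and reused when the broker is started.
    pub fn new() -> Self {
        Self {
            proto_params: XxxBrokerServerParams { port: 9999 }, // illustrative port
        }
    }

    // Starts the protocol broker in the background and hands back its join handle.
    pub fn start_proto_broker(&self, app_stop: Arc<AtomicBool>) -> JoinHandle<u16> {
        let server = XxxBrokerServer::new(self.proto_params.clone(), app_stop);
        thread::spawn(move || server.start())
    }
}
```

The point of the sketch is the division of labour: `new()` builds the params, `start_proto_broker()` only constructs the server from them and launches it with a clone of the shared stop signal.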
Step 9: Verify
cargo build --workspace
Successful compilation confirms the scaffold is complete. Fill in business logic in the <proto>/ directory functions afterward.
Common Pitfalls
| Problem | Cause | Fix |
|---|---|---|
| `Clone` compile error | Internal enum in codec struct missing `#[derive(Clone)]` | Add `Clone` to all enums inside the codec |
| `large_enum_variant` clippy error | Enum variant size difference exceeds threshold | Wrap large variants in `Box<>` |
| `non-exhaustive patterns` | `match` in network-server missing new branch | Search `StorageEngine` to locate all matches, add branch to each |
| `unresolved module metadata_struct` | Missing `Cargo.toml` dependency | Add `metadata-struct.workspace = true` |
| PR template CLA link broken | Relative path doesn't render correctly on GitHub | Change to `https://github.com/robustmq/robustmq/blob/main/CLA.md` |