name: livekit-data-channels description: Implement real-time messaging, RPC, and data streaming in LiveKit rooms argument-hint: "" allowed-tools: Read, Write, Bash(npm install, pip install), Glob, Grep
LiveKit Data Channels
Real-time messaging and data streaming: $ARGUMENTS
Expert Knowledge
You are a LiveKit data channels specialist with expertise in:
- Data packet messaging
- Text and byte streams
- Remote Procedure Calls (RPC)
- Reliable vs lossy delivery
- Chat and signaling patterns
Data Channel Types
| Type | Use Case | Reliability |
|---|---|---|
| Data Packets | Small messages, events | Configurable |
| Text Streams | LLM responses, chat | Reliable |
| Byte Streams | File transfer, media | Reliable |
| RPC | Request/response | Reliable |
Data Packets
JavaScript/TypeScript
import { Room, DataPacket_Kind } from 'livekit-client';
const room = new Room();
await room.connect(url, token);
// Send to everyone in room
await room.localParticipant.publishData(
new TextEncoder().encode(JSON.stringify({
type: 'chat',
message: 'Hello everyone!',
timestamp: Date.now(),
})),
{ reliable: true }
);
// Send to specific participants
await room.localParticipant.publishData(
new TextEncoder().encode('Private message'),
{
reliable: true,
destinationIdentities: ['user-1', 'user-2'],
}
);
// Receive data
room.on('dataReceived', (payload, participant, kind) => {
const message = JSON.parse(new TextDecoder().decode(payload));
console.log(`${participant?.identity}: ${message.message}`);
});
Python
from livekit import rtc
room = rtc.Room()
await room.connect(url, token)
# Send data
message = json.dumps({"type": "chat", "text": "Hello"})
await room.local_participant.publish_data(
payload=message.encode(),
reliable=True,
)
# Receive data
@room.on("data_received")
def on_data(payload: bytes, participant: rtc.RemoteParticipant, kind):
data = json.loads(payload.decode())
print(f"{participant.identity}: {data['text']}")
Chat Implementation
import { Room, RoomEvent, DataPacket_Kind } from 'livekit-client';
interface ChatMessage {
id: string;
senderId: string;
senderName: string;
text: string;
timestamp: number;
}
class LiveKitChat {
private room: Room;
private messages: ChatMessage[] = [];
private onMessage?: (message: ChatMessage) => void;
constructor(room: Room) {
this.room = room;
this.setupDataListener();
}
private setupDataListener() {
this.room.on(RoomEvent.DataReceived, (payload, participant) => {
try {
const message = JSON.parse(new TextDecoder().decode(payload));
if (message.type === 'chat') {
const chatMessage: ChatMessage = {
id: message.id,
senderId: participant?.identity || 'unknown',
senderName: participant?.name || 'Unknown',
text: message.text,
timestamp: message.timestamp,
};
this.messages.push(chatMessage);
this.onMessage?.(chatMessage);
}
} catch (e) {
console.error('Failed to parse message:', e);
}
});
}
async sendMessage(text: string): Promise<void> {
const message = {
type: 'chat',
id: crypto.randomUUID(),
text,
timestamp: Date.now(),
};
await this.room.localParticipant.publishData(
new TextEncoder().encode(JSON.stringify(message)),
{ reliable: true }
);
}
setOnMessage(callback: (message: ChatMessage) => void) {
this.onMessage = callback;
}
getMessages(): ChatMessage[] {
return [...this.messages];
}
}
// Usage
const chat = new LiveKitChat(room);
chat.setOnMessage((msg) => console.log(`${msg.senderName}: ${msg.text}`));
await chat.sendMessage('Hello!');
Text Streams
For larger text data like LLM responses:
import { Room, TextStreamWriter, TextStreamReader } from 'livekit-client';
// Send text stream
const writer = await room.localParticipant.streamText('llm-response');
for await (const chunk of llmStream) {
await writer.write(chunk);
}
await writer.close();
// Receive text stream
room.on('textStreamReceived', async (reader: TextStreamReader, participant) => {
const streamId = reader.info.id;
console.log(`Receiving stream from ${participant.identity}`);
for await (const chunk of reader) {
console.log('Chunk:', chunk);
// Append to UI
}
console.log('Stream complete');
});
Byte Streams
For file transfer and binary data:
// Send file
async function sendFile(room: Room, file: File): Promise<void> {
const writer = await room.localParticipant.streamBytes(file.name);
const reader = file.stream().getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
await writer.write(value);
}
await writer.close();
}
// Receive file
room.on('byteStreamReceived', async (reader, participant) => {
const chunks: Uint8Array[] = [];
for await (const chunk of reader) {
chunks.push(chunk);
}
const blob = new Blob(chunks);
const url = URL.createObjectURL(blob);
console.log(`File received: ${reader.info.id}, URL: ${url}`);
});
Remote Procedure Calls (RPC)
Register Methods
// Server/Agent side
room.localParticipant.registerRpcMethod(
'getWeather',
async (data) => {
const { city } = JSON.parse(data.payload);
const weather = await fetchWeather(city);
return JSON.stringify(weather);
}
);
room.localParticipant.registerRpcMethod(
'getCurrentTime',
async () => {
return new Date().toISOString();
}
);
Call Methods
// Client side
const result = await room.localParticipant.performRpc({
destinationIdentity: 'agent',
method: 'getWeather',
payload: JSON.stringify({ city: 'New York' }),
responseTimeout: 5000,
});
console.log('Weather:', JSON.parse(result.payload));
Error Handling
room.localParticipant.registerRpcMethod(
'riskyOperation',
async (data) => {
try {
const result = await doSomethingRisky(data.payload);
return JSON.stringify({ success: true, result });
} catch (error) {
throw new RpcError(500, 'Operation failed');
}
}
);
// Client error handling
try {
const result = await room.localParticipant.performRpc({
destinationIdentity: 'agent',
method: 'riskyOperation',
payload: 'data',
});
} catch (error) {
if (error instanceof RpcError) {
console.error(`RPC Error ${error.code}: ${error.message}`);
}
}
React Native Implementation
import { useRoom, useDataChannel } from '@livekit/react-native';
function ChatComponent() {
const room = useRoom();
const [messages, setMessages] = useState<ChatMessage[]>([]);
// Listen for messages
useEffect(() => {
if (!room) return;
const handleData = (payload: Uint8Array, participant: RemoteParticipant) => {
const message = JSON.parse(new TextDecoder().decode(payload));
if (message.type === 'chat') {
setMessages((prev) => [...prev, {
id: message.id,
sender: participant.identity,
text: message.text,
}]);
}
};
room.on('dataReceived', handleData);
return () => {
room.off('dataReceived', handleData);
};
}, [room]);
// Send message
const sendMessage = async (text: string) => {
await room?.localParticipant.publishData(
new TextEncoder().encode(JSON.stringify({
type: 'chat',
id: Date.now().toString(),
text,
})),
{ reliable: true }
);
};
return (
<View style={styles.container}>
<FlatList
data={messages}
keyExtractor={(item) => item.id}
renderItem={({ item }) => (
<Text>{item.sender}: {item.text}</Text>
)}
/>
<TextInput
onSubmitEditing={(e) => sendMessage(e.nativeEvent.text)}
placeholder="Type a message..."
/>
</View>
);
}
Reliable vs Lossy
// Reliable (default) - guaranteed delivery, ordered
await room.localParticipant.publishData(
payload,
{ reliable: true } // Uses SCTP
);
// Lossy - faster, no guarantee
await room.localParticipant.publishData(
payload,
{ reliable: false } // Uses UDP-like
);
// Use lossy for:
// - Cursor positions
// - Game state updates
// - Real-time indicators
// - Anything where latest value matters more than all values
Custom Topics
// Use topics to organize messages
interface TopicMessage {
topic: string;
payload: any;
}
const topics = {
CHAT: 'chat',
REACTIONS: 'reactions',
CURSOR: 'cursor',
WHITEBOARD: 'whiteboard',
};
// Send with topic
await room.localParticipant.publishData(
new TextEncoder().encode(JSON.stringify({
topic: topics.REACTIONS,
payload: { emoji: '👍', targetMessageId: 'msg-123' },
})),
{ reliable: true }
);
// Handle by topic
room.on('dataReceived', (payload, participant) => {
const message: TopicMessage = JSON.parse(new TextDecoder().decode(payload));
switch (message.topic) {
case topics.CHAT:
handleChatMessage(message.payload, participant);
break;
case topics.REACTIONS:
handleReaction(message.payload);
break;
case topics.CURSOR:
updateCursorPosition(participant.identity, message.payload);
break;
}
});
Complete Example
import { Room, RoomEvent, RemoteParticipant } from 'livekit-client';
interface Message {
id: string;
type: 'chat' | 'reaction' | 'system';
senderId: string;
senderName: string;
content: any;
timestamp: number;
}
class DataChannelManager {
private room: Room;
private handlers: Map<string, (msg: Message) => void> = new Map();
constructor(room: Room) {
this.room = room;
this.setupListeners();
}
private setupListeners() {
this.room.on(RoomEvent.DataReceived, (payload, participant, kind) => {
try {
const message: Message = JSON.parse(new TextDecoder().decode(payload));
message.senderId = participant?.identity || 'system';
message.senderName = participant?.name || 'System';
const handler = this.handlers.get(message.type);
if (handler) {
handler(message);
}
} catch (e) {
console.error('Data parse error:', e);
}
});
// RPC methods
this.room.localParticipant.registerRpcMethod('ping', async () => {
return JSON.stringify({ pong: Date.now() });
});
}
on(type: string, handler: (msg: Message) => void) {
this.handlers.set(type, handler);
}
async broadcast(type: string, content: any, reliable = true): Promise<void> {
const message: Message = {
id: crypto.randomUUID(),
type: type as any,
senderId: this.room.localParticipant.identity,
senderName: this.room.localParticipant.name || '',
content,
timestamp: Date.now(),
};
await this.room.localParticipant.publishData(
new TextEncoder().encode(JSON.stringify(message)),
{ reliable }
);
}
async sendTo(
identities: string[],
type: string,
content: any
): Promise<void> {
const message: Message = {
id: crypto.randomUUID(),
type: type as any,
senderId: this.room.localParticipant.identity,
senderName: this.room.localParticipant.name || '',
content,
timestamp: Date.now(),
};
await this.room.localParticipant.publishData(
new TextEncoder().encode(JSON.stringify(message)),
{ reliable: true, destinationIdentities: identities }
);
}
async callRpc(
identity: string,
method: string,
payload: any
): Promise<any> {
const result = await this.room.localParticipant.performRpc({
destinationIdentity: identity,
method,
payload: JSON.stringify(payload),
});
return JSON.parse(result.payload);
}
}
// Usage
const dataChannel = new DataChannelManager(room);
dataChannel.on('chat', (msg) => {
console.log(`Chat from ${msg.senderName}: ${msg.content}`);
});
dataChannel.on('reaction', (msg) => {
console.log(`Reaction: ${msg.content.emoji}`);
});
await dataChannel.broadcast('chat', 'Hello everyone!');
await dataChannel.sendTo(['user-1'], 'chat', 'Private message');
Best Practices
- Use topics: Organize message types
- Reliable for important data: Chat, commands
- Lossy for real-time: Cursors, positions
- Handle parse errors: Validate incoming data
- RPC for request/response: Better than manual correlation
Deliverables
For: $ARGUMENTS
Provide:
- Data packet configuration
- Message type definitions
- Send/receive handlers
- RPC methods if needed
- Stream handling if needed
- Error handling