livekit-data-channels

Implement real-time messaging, RPC, and data streaming in LiveKit rooms

By FutureAtoms. Updated 3/11/2026

name: livekit-data-channels
description: Implement real-time messaging, RPC, and data streaming in LiveKit rooms
argument-hint: ""
allowed-tools: Read, Write, Bash(npm install, pip install), Glob, Grep

LiveKit Data Channels

Real-time messaging and data streaming: $ARGUMENTS

Expert Knowledge

You are a LiveKit data channels specialist with expertise in:

  • Data packet messaging
  • Text and byte streams
  • Remote Procedure Calls (RPC)
  • Reliable vs lossy delivery
  • Chat and signaling patterns

Data Channel Types

| Type | Use Case | Reliability |
|------|----------|-------------|
| Data Packets | Small messages, events | Configurable |
| Text Streams | LLM responses, chat | Reliable |
| Byte Streams | File transfer, media | Reliable |
| RPC | Request/response | Reliable |
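
The table can be read as a simple decision rule. The sketch below is one way to encode it, not part of the LiveKit SDK: the `Payload` shape, the `chooseChannel` name, and the 1024-byte cutoff are all assumptions made for illustration, so check your SDK's documented packet limits before picking a real threshold.

```typescript
type Channel = 'data-packet' | 'text-stream' | 'byte-stream' | 'rpc';

// Hypothetical description of what the caller wants to send.
interface Payload {
  sizeBytes: number;
  isText: boolean;
  needsResponse: boolean;
}

// Assumed cutoff for illustration only; not a LiveKit constant.
const SMALL_MESSAGE_BYTES = 1024;

function chooseChannel(p: Payload): Channel {
  // Anything that expects an answer maps to RPC.
  if (p.needsResponse) return 'rpc';
  // Small fire-and-forget messages fit in a single data packet.
  if (p.sizeBytes <= SMALL_MESSAGE_BYTES) return 'data-packet';
  // Larger payloads go through a stream, split by content kind.
  return p.isText ? 'text-stream' : 'byte-stream';
}
```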

Data Packets

JavaScript/TypeScript

import { Room } from 'livekit-client';

const room = new Room();
await room.connect(url, token);

// Send to everyone in room
await room.localParticipant.publishData(
  new TextEncoder().encode(JSON.stringify({
    type: 'chat',
    message: 'Hello everyone!',
    timestamp: Date.now(),
  })),
  { reliable: true }
);

// Send to specific participants
await room.localParticipant.publishData(
  new TextEncoder().encode('Private message'),
  {
    reliable: true,
    destinationIdentities: ['user-1', 'user-2'],
  }
);

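// Receive data
room.on('dataReceived', (payload, participant, kind) => {
  const text = new TextDecoder().decode(payload);
  try {
    const message = JSON.parse(text);
    console.log(`${participant?.identity}: ${message.message}`);
  } catch {
    // Not JSON, e.g. the plain-text private message above
    console.log(`${participant?.identity}: ${text}`);
  }
});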

Python

import json

from livekit import rtc

room = rtc.Room()
await room.connect(url, token)

# Send data
message = json.dumps({"type": "chat", "text": "Hello"})
await room.local_participant.publish_data(
    payload=message.encode(),
    reliable=True,
)

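# Receive data
@room.on("data_received")
def on_data(packet: rtc.DataPacket):
    data = json.loads(packet.data.decode())
    # participant is None when the packet did not come from a room participant
    sender = packet.participant.identity if packet.participant else "server"
    print(f"{sender}: {data['text']}")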

Chat Implementation

import { Room, RoomEvent } from 'livekit-client';

interface ChatMessage {
  id: string;
  senderId: string;
  senderName: string;
  text: string;
  timestamp: number;
}

class LiveKitChat {
  private room: Room;
  private messages: ChatMessage[] = [];
  private onMessage?: (message: ChatMessage) => void;

  constructor(room: Room) {
    this.room = room;
    this.setupDataListener();
  }

  private setupDataListener() {
    this.room.on(RoomEvent.DataReceived, (payload, participant) => {
      try {
        const message = JSON.parse(new TextDecoder().decode(payload));
        if (message.type === 'chat') {
          const chatMessage: ChatMessage = {
            id: message.id,
            senderId: participant?.identity || 'unknown',
            senderName: participant?.name || 'Unknown',
            text: message.text,
            timestamp: message.timestamp,
          };
          this.messages.push(chatMessage);
          this.onMessage?.(chatMessage);
        }
      } catch (e) {
        console.error('Failed to parse message:', e);
      }
    });
  }

  async sendMessage(text: string): Promise<void> {
    const message = {
      type: 'chat',
      id: crypto.randomUUID(),
      text,
      timestamp: Date.now(),
    };

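    await this.room.localParticipant.publishData(
      new TextEncoder().encode(JSON.stringify(message)),
      { reliable: true }
    );

    // Published data is not delivered back to the sender, so record it locally
    const local: ChatMessage = {
      id: message.id,
      senderId: this.room.localParticipant.identity,
      senderName: this.room.localParticipant.name || 'Me',
      text,
      timestamp: message.timestamp,
    };
    this.messages.push(local);
    this.onMessage?.(local);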
  }

  setOnMessage(callback: (message: ChatMessage) => void) {
    this.onMessage = callback;
  }

  getMessages(): ChatMessage[] {
    return [...this.messages];
  }
}

// Usage
const chat = new LiveKitChat(room);
chat.setOnMessage((msg) => console.log(`${msg.senderName}: ${msg.text}`));
await chat.sendMessage('Hello!');
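
The chat class above appends messages in arrival order. If the same message can reach the UI twice (for example after a reconnect or an optimistic local echo), a small store keyed by `id` helps. The following is a minimal sketch under that assumption; `MessageLog` and `StoredMessage` are hypothetical names, and ordering by sender timestamp assumes participant clocks are roughly in sync.

```typescript
interface StoredMessage {
  id: string;
  text: string;
  timestamp: number;
}

class MessageLog {
  private byId = new Map<string, StoredMessage>();

  // Returns true if the message was new, false if its id was already stored.
  add(msg: StoredMessage): boolean {
    if (this.byId.has(msg.id)) return false;
    this.byId.set(msg.id, msg);
    return true;
  }

  // Messages sorted by sender timestamp; ties are broken by id for a stable order.
  list(): StoredMessage[] {
    return Array.from(this.byId.values()).sort(
      (a, b) => a.timestamp - b.timestamp || a.id.localeCompare(b.id)
    );
  }
}
```

Calling `add` from both the receive handler and `sendMessage` keeps a single copy of each message regardless of which path delivers it first.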

Text Streams

For larger text data like LLM responses:

import { Room } from 'livekit-client';

// Send text stream; streamText takes an options object with a topic
const writer = await room.localParticipant.streamText({ topic: 'llm-response' });
for await (const chunk of llmStream) {
  await writer.write(chunk);
}
await writer.close();

// Receive text stream: register a handler for the same topic
room.registerTextStreamHandler('llm-response', async (reader, participantInfo) => {
  const streamId = reader.info.id;
  console.log(`Receiving stream ${streamId} from ${participantInfo.identity}`);

  for await (const chunk of reader) {
    console.log('Chunk:', chunk);
    // Append to UI
  }

  console.log('Stream complete');
});

Byte Streams

For file transfer and binary data:

// Send file
async function sendFile(room: Room, file: File): Promise<void> {
  // streamBytes takes an options object; 'files' is an example topic name
  const writer = await room.localParticipant.streamBytes({
    name: file.name,
    topic: 'files',
  });

  const reader = file.stream().getReader();
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    await writer.write(value);
  }

  await writer.close();
}

// Receive file: register a handler for the same topic
room.registerByteStreamHandler('files', async (reader, participantInfo) => {
  const chunks: Uint8Array[] = [];

  for await (const chunk of reader) {
    chunks.push(chunk);
  }

  const blob = new Blob(chunks);
  const url = URL.createObjectURL(blob);
  console.log(`File ${reader.info.name} received from ${participantInfo.identity}, URL: ${url}`);
});
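
The receive side above collects chunks into a `Blob`. Where `Blob` is unavailable, or when the source is an in-memory buffer rather than a `File`, the same chunk-and-reassemble step can be done by hand. This is a minimal sketch with hypothetical helper names (`splitIntoChunks`, `joinChunks`); it does not call the LiveKit SDK, and the chunk size is left to the caller.

```typescript
// Split a buffer into chunks of at most chunkSize bytes (views, not copies).
function splitIntoChunks(data: Uint8Array, chunkSize: number): Uint8Array[] {
  if (!Number.isInteger(chunkSize) || chunkSize <= 0) {
    throw new RangeError('chunkSize must be a positive integer');
  }
  const chunks: Uint8Array[] = [];
  for (let offset = 0; offset < data.length; offset += chunkSize) {
    chunks.push(data.subarray(offset, offset + chunkSize));
  }
  return chunks;
}

// Concatenate received chunks back into one contiguous buffer.
function joinChunks(chunks: Uint8Array[]): Uint8Array {
  const total = chunks.reduce((sum, c) => sum + c.length, 0);
  const out = new Uint8Array(total);
  let offset = 0;
  for (const chunk of chunks) {
    out.set(chunk, offset);
    offset += chunk.length;
  }
  return out;
}
```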

Remote Procedure Calls (RPC)

Register Methods

// Server/Agent side
room.localParticipant.registerRpcMethod(
  'getWeather',
  async (data) => {
    const { city } = JSON.parse(data.payload);
    const weather = await fetchWeather(city);
    return JSON.stringify(weather);
  }
);

room.localParticipant.registerRpcMethod(
  'getCurrentTime',
  async () => {
    return new Date().toISOString();
  }
);

Call Methods

// Client side
const result = await room.localParticipant.performRpc({
  destinationIdentity: 'agent',
  method: 'getWeather',
  payload: JSON.stringify({ city: 'New York' }),
  responseTimeout: 5000,
});

// performRpc resolves to the response payload string
console.log('Weather:', JSON.parse(result));

Error Handling

import { RpcError } from 'livekit-client';

room.localParticipant.registerRpcMethod(
  'riskyOperation',
  async (data) => {
    try {
      const result = await doSomethingRisky(data.payload);
      return JSON.stringify({ success: true, result });
    } catch (error) {
      throw new RpcError(500, 'Operation failed');
    }
  }
);

// Client error handling
try {
  const result = await room.localParticipant.performRpc({
    destinationIdentity: 'agent',
    method: 'riskyOperation',
    payload: 'data',
  });
} catch (error) {
  if (error instanceof RpcError) {
    console.error(`RPC Error ${error.code}: ${error.message}`);
  }
}
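
Handlers such as `getWeather` parse the caller's payload before doing any work, and a malformed payload is a different failure from the operation itself failing. The sketch below separates the two. It is an illustration only: `AppRpcError` is a local stand-in for the SDK's `RpcError` so the example runs without `livekit-client`, and the codes 400 and 500 are hypothetical application codes.

```typescript
// Local stand-in for the SDK's RpcError (assumption, for a self-contained example).
class AppRpcError extends Error {
  code: number;
  constructor(code: number, message: string) {
    super(message);
    this.code = code;
  }
}

const ERR_BAD_REQUEST = 400; // hypothetical: caller sent an unusable payload
const ERR_INTERNAL = 500;    // hypothetical: the operation itself failed

// Extract and validate the "city" field from a raw RPC payload string.
function parseCity(rawPayload: string): string {
  let parsed: unknown;
  try {
    parsed = JSON.parse(rawPayload);
  } catch (_err) {
    throw new AppRpcError(ERR_BAD_REQUEST, 'Payload is not valid JSON');
  }
  const city = (parsed as { city?: unknown } | null)?.city;
  if (typeof city !== 'string' || city.length === 0) {
    throw new AppRpcError(ERR_BAD_REQUEST, 'Missing "city"');
  }
  return city;
}
```

Inside a real handler, a call like `parseCity(data.payload)` would run first, with `ERR_INTERNAL` reserved for failures in the operation that follows.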

React Native Implementation

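import { useEffect, useState } from 'react';
import { FlatList, StyleSheet, Text, TextInput, View } from 'react-native';
import { useRoomContext } from '@livekit/react-native';
import type { RemoteParticipant } from 'livekit-client';

// Shape of a rendered chat row (local to this component)
interface ChatItem {
  id: string;
  sender: string;
  text: string;
}

const styles = StyleSheet.create({ container: { flex: 1 } });

function ChatComponent() {
  // Assumes this component renders inside <LiveKitRoom>, which provides the room
  const room = useRoomContext();
  const [messages, setMessages] = useState<ChatItem[]>([]);

  // Listen for messages
  useEffect(() => {
    if (!room) return;

    const handleData = (payload: Uint8Array, participant?: RemoteParticipant) => {
      let message;
      try {
        message = JSON.parse(new TextDecoder().decode(payload));
      } catch {
        return; // Ignore payloads that are not JSON
      }
      if (message.type === 'chat') {
        setMessages((prev) => [...prev, {
          id: message.id,
          sender: participant?.identity ?? 'unknown',
          text: message.text,
        }]);
      }
    };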

    room.on('dataReceived', handleData);
    return () => {
      room.off('dataReceived', handleData);
    };
  }, [room]);

  // Send message
  const sendMessage = async (text: string) => {
    await room?.localParticipant.publishData(
      new TextEncoder().encode(JSON.stringify({
        type: 'chat',
        id: Date.now().toString(),
        text,
      })),
      { reliable: true }
    );
  };

  return (
    <View style={styles.container}>
      <FlatList
        data={messages}
        keyExtractor={(item) => item.id}
        renderItem={({ item }) => (
          <Text>{item.sender}: {item.text}</Text>
        )}
      />
      <TextInput
        onSubmitEditing={(e) => sendMessage(e.nativeEvent.text)}
        placeholder="Type a message..."
      />
    </View>
  );
}

Reliable vs Lossy

// Reliable - retransmitted until delivered, arrives in order
await room.localParticipant.publishData(
  payload,
  { reliable: true }  // Set explicitly; defaults may differ between SDKs
);

// Lossy - lower latency, packets may be dropped or arrive out of order
await room.localParticipant.publishData(
  payload,
  { reliable: false }  // No retransmission
);

// Use lossy for:
// - Cursor positions
// - Game state updates
// - Real-time indicators
// - Anything where latest value matters more than all values
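
Because lossy packets can be dropped or arrive out of order, a receiver that only cares about the latest value needs a way to discard stale updates. One common approach is a per-sender sequence number, sketched below. The `CursorUpdate` shape, the `seq` field, and the `LatestValueTracker` class are assumptions for illustration; the sender would have to include an increasing `seq` in each payload.

```typescript
interface CursorUpdate {
  seq: number; // increases by one (or more) with every update from a sender
  x: number;
  y: number;
}

class LatestValueTracker {
  private latest = new Map<string, CursorUpdate>();

  // Returns true if the update was applied, false if it was stale or a duplicate.
  apply(senderId: string, update: CursorUpdate): boolean {
    const current = this.latest.get(senderId);
    if (current !== undefined && update.seq <= current.seq) return false;
    this.latest.set(senderId, update);
    return true;
  }

  // Most recent accepted update for a sender, if any.
  get(senderId: string): CursorUpdate | undefined {
    return this.latest.get(senderId);
  }
}
```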

Custom Topics

// Use topics to organize messages
interface TopicMessage {
  topic: string;
  payload: any;
}

const topics = {
  CHAT: 'chat',
  REACTIONS: 'reactions',
  CURSOR: 'cursor',
  WHITEBOARD: 'whiteboard',
};

// Send with topic
await room.localParticipant.publishData(
  new TextEncoder().encode(JSON.stringify({
    topic: topics.REACTIONS,
    payload: { emoji: '👍', targetMessageId: 'msg-123' },
  })),
  { reliable: true }
);

// Handle by topic
room.on('dataReceived', (payload, participant) => {
  const message: TopicMessage = JSON.parse(new TextDecoder().decode(payload));

  switch (message.topic) {
    case topics.CHAT:
      handleChatMessage(message.payload, participant);
      break;
    case topics.REACTIONS:
      handleReaction(message.payload);
      break;
    case topics.CURSOR:
      updateCursorPosition(participant.identity, message.payload);
      break;
  }
});
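
The `TopicMessage` interface above types `payload` as `any`, so the compiler cannot catch a handler reading the wrong field. A discriminated union keyed on `topic` is one way to tighten this. The payload shapes below are assumptions made for the example, not a schema defined by LiveKit.

```typescript
// Each topic carries its own payload shape (shapes are illustrative).
type TypedTopicMessage =
  | { topic: 'chat'; payload: { text: string } }
  | { topic: 'reactions'; payload: { emoji: string; targetMessageId: string } }
  | { topic: 'cursor'; payload: { x: number; y: number } };

// The switch narrows message.payload to the shape that matches each topic.
function describeMessage(message: TypedTopicMessage): string {
  switch (message.topic) {
    case 'chat':
      return `chat: ${message.payload.text}`;
    case 'reactions':
      return `reaction ${message.payload.emoji} on ${message.payload.targetMessageId}`;
    case 'cursor':
      return `cursor at ${message.payload.x},${message.payload.y}`;
    default: {
      // Compile-time exhaustiveness check: adding a topic without a case fails here.
      const unreachable: never = message;
      throw new Error(`Unhandled topic: ${JSON.stringify(unreachable)}`);
    }
  }
}
```

LiveKit's `publishData` options also accept a `topic` string, which can carry the same label outside the JSON body if you prefer not to embed it in the payload.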

Complete Example

import { Room, RoomEvent } from 'livekit-client';

interface Message {
  id: string;
  type: 'chat' | 'reaction' | 'system';
  senderId: string;
  senderName: string;
  content: any;
  timestamp: number;
}

class DataChannelManager {
  private room: Room;
  private handlers: Map<string, (msg: Message) => void> = new Map();

  constructor(room: Room) {
    this.room = room;
    this.setupListeners();
  }

  private setupListeners() {
    this.room.on(RoomEvent.DataReceived, (payload, participant, kind) => {
      try {
        const message: Message = JSON.parse(new TextDecoder().decode(payload));
        message.senderId = participant?.identity || 'system';
        message.senderName = participant?.name || 'System';

        const handler = this.handlers.get(message.type);
        if (handler) {
          handler(message);
        }
      } catch (e) {
        console.error('Data parse error:', e);
      }
    });

    // RPC methods
    this.room.localParticipant.registerRpcMethod('ping', async () => {
      return JSON.stringify({ pong: Date.now() });
    });
  }

  on(type: string, handler: (msg: Message) => void) {
    this.handlers.set(type, handler);
  }

  async broadcast(type: string, content: any, reliable = true): Promise<void> {
    const message: Message = {
      id: crypto.randomUUID(),
      type: type as any,
      senderId: this.room.localParticipant.identity,
      senderName: this.room.localParticipant.name || '',
      content,
      timestamp: Date.now(),
    };

    await this.room.localParticipant.publishData(
      new TextEncoder().encode(JSON.stringify(message)),
      { reliable }
    );
  }

  async sendTo(
    identities: string[],
    type: string,
    content: any
  ): Promise<void> {
    const message: Message = {
      id: crypto.randomUUID(),
      type: type as any,
      senderId: this.room.localParticipant.identity,
      senderName: this.room.localParticipant.name || '',
      content,
      timestamp: Date.now(),
    };

    await this.room.localParticipant.publishData(
      new TextEncoder().encode(JSON.stringify(message)),
      { reliable: true, destinationIdentities: identities }
    );
  }

  async callRpc(
    identity: string,
    method: string,
    payload: any
  ): Promise<any> {
    const result = await this.room.localParticipant.performRpc({
      destinationIdentity: identity,
      method,
      payload: JSON.stringify(payload),
    });
    // performRpc resolves to the response payload string
    return JSON.parse(result);
  }
}

// Usage
const dataChannel = new DataChannelManager(room);

dataChannel.on('chat', (msg) => {
  console.log(`Chat from ${msg.senderName}: ${msg.content}`);
});

dataChannel.on('reaction', (msg) => {
  console.log(`Reaction: ${msg.content.emoji}`);
});

await dataChannel.broadcast('chat', 'Hello everyone!');
await dataChannel.sendTo(['user-1'], 'chat', 'Private message');

Best Practices

  1. Use topics: Organize message types
  2. Reliable for important data: Chat, commands
  3. Lossy for real-time: Cursors, positions
  4. Handle parse errors: Validate incoming data
  5. RPC for request/response: Better than manual correlation
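
Practice 4 can be sketched as a type guard plus a decoder that never throws. This is a minimal example, assuming the chat payload shape used earlier (`type`, `id`, `text`, `timestamp`); `isChatPayload` and `decodeChat` are hypothetical helper names, not SDK functions.

```typescript
interface ChatPayload {
  type: 'chat';
  id: string;
  text: string;
  timestamp: number;
}

// Runtime check that an unknown value has the expected chat shape.
function isChatPayload(value: unknown): value is ChatPayload {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    v.type === 'chat' &&
    typeof v.id === 'string' &&
    typeof v.text === 'string' &&
    typeof v.timestamp === 'number'
  );
}

// Decode a received payload; returns null for non-JSON or wrongly shaped data.
function decodeChat(payload: Uint8Array): ChatPayload | null {
  try {
    const parsed: unknown = JSON.parse(new TextDecoder().decode(payload));
    return isChatPayload(parsed) ? parsed : null;
  } catch (_err) {
    return null;
  }
}
```

A receive handler can then call `decodeChat(payload)` and ignore `null` results instead of wrapping every field access in its own check.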

Deliverables

For: $ARGUMENTS

Provide:

  1. Data packet configuration
  2. Message type definitions
  3. Send/receive handlers
  4. RPC methods if needed
  5. Stream handling if needed
  6. Error handling
Install via CLI
npx skills add https://github.com/FutureAtoms/claude-skills-backup --skill livekit-data-channels