name: add-telegram
description: Add Telegram as a messaging channel to the Corsair agent. Use when the user wants to chat with their agent via a Telegram bot instead of or alongside WhatsApp.
Corsair Telegram Setup
Run all steps automatically. Pause only when the user must take a manual action (creating a bot, providing a token). Always run commands directly — never tell the user to run something you can do yourself.
The agent runs in Docker — use docker compose exec agent <command> for commands that need to run inside the container. Exception: docker compose commands themselves run on the host.
1. Verify the agent container is running
docker compose ps
The agent service should be running. If not:
docker compose up -d
2. Create the Telegram bot (BotFather)
grammy ships pre-installed in Corsair's Docker image, so no package installation step is needed.
If the user doesn't already have a bot token, tell them:
I need you to create a Telegram bot — it only takes 30 seconds:
- Open Telegram and search for @BotFather
- Send /newbot
- Choose a display name (e.g. "My Assistant")
- Choose a username, which must end in bot (e.g. my_assistant_bot)
- Copy the token it gives you (looks like 123456789:ABC-DEF1234ghIkl-zyx57W2v1u123ew11)
Paste the token here when you have it.
Wait for the token before continuing.
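Before writing a token to .env, a quick shape check can catch copy-paste mistakes such as a truncated value. The sketch below is only a loose heuristic based on the example token above (numeric bot ID, a colon, then a secret made of letters, digits, underscores and hyphens); looksLikeBotToken and maskToken are hypothetical helper names, and only Telegram itself can say whether a token is actually valid.

```typescript
// Hypothetical helpers: a loose shape check, not an official validation rule.
// Assumes tokens look like "<numeric bot id>:<secret>", as in the example above.
const TOKEN_SHAPE = /^\d+:[A-Za-z0-9_-]+$/;

function looksLikeBotToken(value: string): boolean {
  return TOKEN_SHAPE.test(value.trim());
}

// Keep only the bot ID when logging, so the secret part never appears in output.
function maskToken(value: string): string {
  const botId = value.trim().split(':')[0];
  return `${botId}:***`;
}
```

A value that fails this check is almost certainly incomplete; a value that passes still needs to be confirmed against Telegram.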
4. Configure environment variables
Read .env (in the project root). Add these entries if not present:
# Required: enable the Telegram listener
TELEGRAM_ENABLED=true
# Bot token from BotFather
TELEGRAM_BOT_TOKEN=<their-token>
# Optional: bot username used as trigger in group chats (default: corsair)
BOT_NAME=corsair
Follow the Credentials convention: never ask for the token in chat. Instead, tell the user to run:
echo 'TELEGRAM_BOT_TOKEN=YOUR_TOKEN_HERE' >> /path/to/project/.env
Then set TELEGRAM_ENABLED=true yourself by editing .env.
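When editing .env, it helps to replace an existing entry rather than append a duplicate key. A minimal sketch of that idea on the file's text, assuming simple KEY=value lines; upsertEnvVar is a hypothetical helper, not part of Corsair:

```typescript
// Hypothetical helper: set KEY=value in .env text, replacing an existing
// entry for the same key or appending a new line if there is none.
function upsertEnvVar(envText: string, key: string, value: string): string {
  const entry = `${key}=${value}`;
  // Drop trailing newlines so the split does not produce empty tail entries.
  const trimmed = envText.replace(/\n+$/, '');
  const lines = trimmed === '' ? [] : trimmed.split('\n');
  const index = lines.findIndex((line) => line.startsWith(`${key}=`));
  if (index === -1) {
    lines.push(entry);
  } else {
    lines[index] = entry;
  }
  // Always end the file with exactly one newline.
  return lines.join('\n') + '\n';
}
```

Running it twice with the same arguments yields the same text, so it is safe to apply whether or not the entry is already present.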
5. Update the database schema
Open server/db/schema.ts. Make two changes:
5a. Add 'telegram' to the threads source enum
Find the threads table definition. The source column currently lists ['web', 'whatsapp']. Add 'telegram':
source: text('source', { enum: ['web', 'whatsapp', 'telegram'] })
.notNull()
.default('web'),
5b. Add Telegram tables at the end of the file
// ── Telegram tables ────────────────────────────────────────────────────────────
export const telegramMessages = pgTable('telegram_messages', {
id: uuid('id').primaryKey().defaultRandom(),
/** Numeric Telegram chat ID */
chatId: text('chat_id').notNull(),
/** Numeric sender ID */
senderId: text('sender_id').notNull(),
/** Display name of the sender */
senderName: text('sender_name'),
content: text('content').notNull(),
/** When Telegram says the message was sent */
sentAt: timestamp('sent_at').notNull(),
/** True if this is a group or supergroup chat */
isGroup: boolean('is_group').notNull().default(false),
/** False = not yet handled by the agent poller */
processed: boolean('processed').notNull().default(false),
createdAt: timestamp('created_at').notNull().defaultNow(),
});
export const telegramChats = pgTable('telegram_chats', {
chatId: text('chat_id').primaryKey(),
name: text('name'),
type: text('type', { enum: ['dm', 'group'] }).notNull(),
createdAt: timestamp('created_at').notNull().defaultNow(),
updatedAt: timestamp('updated_at').notNull().defaultNow(),
});
6. Create the Telegram channel files
Create a new directory server/telegram/ with three files, mirroring the WhatsApp pattern.
6a. server/telegram/connection.ts
import { Bot } from 'grammy';
export interface InboundTelegramMessage {
chatId: number;
senderId: number;
senderName: string | null;
content: string;
isGroup: boolean;
sentAt: Date;
}
export class TelegramConnection {
private bot: Bot;
constructor(
token: string,
private onMessage: (msg: InboundTelegramMessage) => Promise<void>,
) {
this.bot = new Bot(token);
this.setupHandlers();
}
private setupHandlers(): void {
// /chatid command — lets users discover their chat ID
this.bot.command('chatid', async (ctx) => {
await ctx.reply(`Chat ID: ${ctx.chat.id}`);
});
this.bot.on('message:text', async (ctx) => {
const msg = ctx.message;
const from = ctx.from;
const chat = ctx.chat;
// Skip messages the bot sent itself
if (from?.is_bot) return;
const firstName = from?.first_name ?? '';
const lastName = from?.last_name ? ` ${from.last_name}` : '';
const senderName = firstName + lastName || null;
await this.onMessage({
chatId: chat.id,
senderId: from?.id ?? 0,
senderName,
content: msg.text,
// Channel posts arrive as channel_post updates, never as message updates
isGroup: chat.type === 'group' || chat.type === 'supergroup',
sentAt: new Date(msg.date * 1000),
});
});
}
async start(): Promise<void> {
// start() launches long polling — non-blocking, runs in background
this.bot.start().catch((err) => {
console.error('[telegram] Bot error:', err);
});
console.log('[telegram] Bot started (long polling)');
}
async stop(): Promise<void> {
await this.bot.stop();
console.log('[telegram] Bot stopped');
}
async sendMessage(chatId: number, text: string): Promise<void> {
await this.bot.api.sendMessage(chatId, text);
}
async setTyping(chatId: number): Promise<void> {
await this.bot.api.sendChatAction(chatId, 'typing').catch(() => {});
}
}
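The name and chat-type handling inside the message handler can be pulled out into pure functions, which makes it testable without a live bot. A minimal sketch, where formatSenderName, isGroupChat and toSentAt are hypothetical helper names and only the chat types that arrive as regular messages are covered:

```typescript
// Chat types considered here; channel posts are left out of this sketch.
type ChatType = 'private' | 'group' | 'supergroup';

// Join first and last name; return null when both are missing or empty.
function formatSenderName(firstName?: string, lastName?: string): string | null {
  const full = [firstName, lastName].filter(Boolean).join(' ');
  return full || null;
}

// Groups and supergroups are both treated as group chats.
function isGroupChat(type: ChatType): boolean {
  return type === 'group' || type === 'supergroup';
}

// Telegram reports message dates as Unix seconds; Date expects milliseconds.
function toSentAt(unixSeconds: number): Date {
  return new Date(unixSeconds * 1000);
}
```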
6b. server/telegram/poller.ts
import type { ModelMessage, ToolModelMessage } from 'ai';
import { asc, desc, eq } from 'drizzle-orm';
import { runAgent } from '../agent';
import { db, telegramMessages, threadMessages, threads } from '../db';
const POLL_INTERVAL_MS = 2000;
/** tg:<chatId> — consistent JID format used in threads table */
function toJid(chatId: number | string): string {
return `tg:${chatId}`;
}
function getBotMentionPattern(): RegExp {
const botName = process.env.BOT_NAME || 'corsair';
return new RegExp(`@${botName}`, 'i');
}
function buildResumeMessages(
storedMessages: ModelMessage[],
toolCallId: string,
toolName: string,
answer: string,
): ModelMessage[] {
return [
...storedMessages,
{
role: 'tool',
content: [
{
type: 'tool-result',
toolCallId,
toolName,
output: { type: 'text', value: answer },
},
],
} satisfies ToolModelMessage,
];
}
async function getOrCreateThread(jid: string): Promise<string> {
const [existing] = await db
.select({ id: threads.id })
.from(threads)
.where(eq(threads.jid, jid))
.limit(1);
if (existing) return existing.id;
const [created] = await db
.insert(threads)
.values({ source: 'telegram', jid })
.returning({ id: threads.id });
return created!.id;
}
async function pollOnce(
sendMessage: (chatId: number, text: string) => Promise<void>,
setTyping: (chatId: number) => Promise<void>,
): Promise<void> {
const unprocessed = await db
.select()
.from(telegramMessages)
.where(eq(telegramMessages.processed, false))
.orderBy(asc(telegramMessages.createdAt));
for (const msg of unprocessed) {
// Groups: only trigger when @botname is mentioned
if (msg.isGroup) {
const mentionPattern = getBotMentionPattern();
if (!mentionPattern.test(msg.content)) {
await db
.update(telegramMessages)
.set({ processed: true })
.where(eq(telegramMessages.id, msg.id));
continue;
}
}
// Mark processed before calling the agent — prevents duplicate processing
await db
.update(telegramMessages)
.set({ processed: true })
.where(eq(telegramMessages.id, msg.id));
const jid = toJid(msg.chatId);
const chatIdNum = Number(msg.chatId);
const threadId = await getOrCreateThread(jid);
await db.insert(threadMessages).values({
threadId,
role: 'user',
text: msg.content,
});
const recent = await db
.select()
.from(threadMessages)
.where(eq(threadMessages.threadId, threadId))
.orderBy(desc(threadMessages.createdAt))
.limit(10);
const pendingAssistant = recent.find(
(m) => m.role === 'assistant' && m.pendingToolCallId,
);
let agentMessages: ModelMessage[];
if (
pendingAssistant?.pendingMessages &&
pendingAssistant.pendingToolCallId &&
pendingAssistant.pendingToolName
) {
agentMessages = buildResumeMessages(
pendingAssistant.pendingMessages as ModelMessage[],
pendingAssistant.pendingToolCallId,
pendingAssistant.pendingToolName,
msg.content,
);
await db
.update(threadMessages)
.set({
pendingMessages: null,
pendingToolCallId: null,
pendingToolName: null,
})
.where(eq(threadMessages.id, pendingAssistant.id));
} else {
// Load the five most recent messages, then restore chronological order
const history = (
await db
.select()
.from(threadMessages)
.where(eq(threadMessages.threadId, threadId))
.orderBy(desc(threadMessages.createdAt))
.limit(5)
).reverse();
agentMessages = history.map((m) => ({
role: m.role as 'user' | 'assistant',
content: m.text || '',
}));
}
try {
await setTyping(chatIdNum);
const output = await runAgent(agentMessages, { jid });
let replyText = '';
if (output.type === 'needs_input') {
replyText = output.question;
const pendingMsgs: ModelMessage[] = [
...agentMessages,
...output.pendingMessages.slice(agentMessages.length),
];
await db.insert(threadMessages).values({
threadId,
role: 'assistant',
text: replyText,
pendingMessages: pendingMsgs,
pendingToolCallId: output.toolCallId,
pendingToolName: output.toolName,
});
} else if (output.type === 'message') {
replyText = output.text;
await db.insert(threadMessages).values({
threadId,
role: 'assistant',
text: replyText,
});
} else if (output.type === 'script') {
replyText = output.error
? `Error: ${output.error}`
: output.message || output.output?.trim() || 'Done.';
await db.insert(threadMessages).values({
threadId,
role: 'assistant',
text: replyText,
});
} else if (output.type === 'workflow') {
replyText = output.message
? output.message
: output.cronSchedule
? `Workflow scheduled: ${output.cronSchedule}`
: output.webhookTrigger
? `Webhook workflow registered for ${output.webhookTrigger.plugin}.${output.webhookTrigger.action}`
: 'Workflow stored.';
await db.insert(threadMessages).values({
threadId,
role: 'assistant',
text: replyText,
});
}
if (replyText) {
await sendMessage(chatIdNum, replyText);
}
await db
.update(threads)
.set({ updatedAt: new Date() })
.where(eq(threads.id, threadId));
} catch (err) {
console.error('[telegram] Agent error for message', msg.id, ':', err);
await sendMessage(
chatIdNum,
'Sorry, something went wrong. Please try again.',
).catch(() => {});
}
}
}
export function startPoller(
sendMessage: (chatId: number, text: string) => Promise<void>,
setTyping: (chatId: number) => Promise<void>,
): () => void {
let running = true;
async function loop(): Promise<void> {
while (running) {
try {
await pollOnce(sendMessage, setTyping);
} catch (err) {
console.error('[telegram] Poller error:', err);
}
await new Promise<void>((resolve) =>
setTimeout(resolve, POLL_INTERVAL_MS),
);
}
}
loop().catch(console.error);
console.log('[telegram] Poller started (2s interval)');
return () => {
running = false;
};
}
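The group trigger builds a regular expression from BOT_NAME. If that value ever contains characters with special meaning in a regex, or is a prefix of another handle, the match can misfire. One possible hardening is sketched below; escapeRegExp and buildMentionPattern are hypothetical names, and the trailing lookahead is an added assumption about what should count as a mention:

```typescript
// Escape characters that have special meaning inside a regular expression.
function escapeRegExp(text: string): string {
  return text.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
}

// Match "@name" case-insensitively, but not when it is only the start of a
// longer handle such as "@name_backup".
function buildMentionPattern(botName: string): RegExp {
  return new RegExp(`@${escapeRegExp(botName)}(?![A-Za-z0-9_])`, 'i');
}
```

The pattern has no global flag, so calling test() repeatedly on the same RegExp does not carry state between messages.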
6c. server/telegram/index.ts
import { db, telegramChats, telegramMessages } from '../db';
import type { InboundTelegramMessage } from './connection';
import { TelegramConnection } from './connection';
import { startPoller } from './poller';
/**
* Start the Telegram integration:
* 1. Connect bot using TELEGRAM_BOT_TOKEN
* 2. Store every inbound message to Postgres
* 3. Start the 2-second poller that triggers the corsair agent
*
* Returns an async shutdown function.
*/
export async function startTelegram(): Promise<() => Promise<void>> {
const token = process.env.TELEGRAM_BOT_TOKEN;
if (!token) {
console.error(
'[telegram] TELEGRAM_BOT_TOKEN is not set. Add it to .env and restart.',
);
return async () => {};
}
const connection = new TelegramConnection(token, handleInbound);
async function handleInbound(msg: InboundTelegramMessage): Promise<void> {
const chatId = String(msg.chatId);
// Insert the chat record if it does not exist yet
await db
.insert(telegramChats)
.values({
chatId,
name: msg.senderName,
type: msg.isGroup ? 'group' : 'dm',
})
.onConflictDoNothing();
// Store for poller
await db.insert(telegramMessages).values({
chatId,
senderId: String(msg.senderId),
senderName: msg.senderName,
content: msg.content,
sentAt: msg.sentAt,
isGroup: msg.isGroup,
processed: false,
});
console.log(
`[telegram] Stored message from ${msg.senderName ?? msg.senderId} in chat ${chatId}`,
);
}
await connection.start();
const stopPoller = startPoller(
(chatId, text) => connection.sendMessage(chatId, text),
(chatId) => connection.setTyping(chatId),
);
return async () => {
console.log('[telegram] Shutting down...');
stopPoller();
await connection.stop();
};
}
7. Update server/db/index.ts (re-export new tables)
Open server/db/index.ts. Add telegramMessages and telegramChats to the exports from ./schema. If the file uses export * from './schema', no change is needed — they'll be exported automatically. If it has named exports, add the two new table names.
8. Wire Telegram into server/index.ts
Open server/index.ts and make three changes:
8a. Add import at the top
import { startTelegram } from './telegram/index';
Also add telegramMessages to the destructured import from './db':
import {
db,
permissions,
telegramMessages,
threadMessages,
threads,
whatsappMessages,
workflows,
} from './db';
8b. Handle permission resume for Telegram threads
Find the block starting at if (thread?.source === 'whatsapp' && thread.jid) in the permission resolve handler. Extend it to also handle Telegram:
if (
(thread?.source === 'whatsapp' || thread?.source === 'telegram') &&
thread.jid
) {
if (thread.source === 'whatsapp') {
// Insert a synthetic WhatsApp message so the poller picks it up
await db.insert(whatsappMessages).values({
jid: thread.jid,
senderJid: 'system',
senderName: 'Permission System',
content: answer,
sentAt: new Date(),
isGroup: false,
isBot: false,
processed: false,
});
} else {
// Insert a synthetic Telegram message so the poller picks it up
const chatId = thread.jid.replace(/^tg:/, '');
await db.insert(telegramMessages).values({
chatId,
senderId: 'system',
senderName: 'Permission System',
content: answer,
sentAt: new Date(),
isGroup: false,
processed: false,
});
}
} else {
// Web thread: resume agent and save result to the thread
// ... existing web resume code ...
}
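The permission handler recovers the chat ID by stripping the tg: prefix that the poller's toJid() adds. A minimal sketch of that round trip with a guarded inverse; chatIdFromJid is a hypothetical helper name, and returning null for anything that is not a Telegram JID is an assumption of this sketch:

```typescript
const TELEGRAM_JID_PREFIX = 'tg:';

// Same format the poller uses for the threads table: tg:<chatId>.
function toJid(chatId: number | string): string {
  return `${TELEGRAM_JID_PREFIX}${chatId}`;
}

// Hypothetical inverse: returns the numeric chat ID, or null when the JID
// does not carry the Telegram prefix or does not hold an integer.
function chatIdFromJid(jid: string): number | null {
  if (!jid.startsWith(TELEGRAM_JID_PREFIX)) return null;
  const raw = jid.slice(TELEGRAM_JID_PREFIX.length);
  const id = Number(raw);
  return raw !== '' && Number.isSafeInteger(id) ? id : null;
}
```

Group and supergroup chat IDs are negative numbers, so the parser has to accept a leading minus sign, which Number() does.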
8c. Start Telegram after the server listens
Find the WhatsApp startup block at the bottom of main():
if (process.env.WHATSAPP_ENABLED === 'true') {
console.log('[server] Starting WhatsApp listener...');
startWhatsApp().catch((err) => {
console.error('[server] WhatsApp startup failed:', err);
});
}
Add immediately after it:
if (process.env.TELEGRAM_ENABLED === 'true') {
console.log('[server] Starting Telegram listener...');
startTelegram().catch((err) => {
console.error('[server] Telegram startup failed:', err);
});
}
9. Push database migrations
The new tables need to exist in Postgres. Migrations run automatically on container restart, but trigger one now:
docker compose up -d agent
Check the logs to confirm the migration ran:
docker compose logs agent | grep -E 'db:push|telegram|error' | head -20
10. Restart and verify
docker compose up -d agent
docker compose logs -f agent
Check for:
- [telegram] Bot started (long polling): bot connected
- [telegram] Poller started (2s interval): ready to receive messages
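If the log output is captured as text, the check for both startup lines can be automated. A small sketch, assuming the log strings are exactly the ones printed by the code in step 6; missingStartupLines is a hypothetical helper name:

```typescript
// The two lines the Telegram integration prints once it is up.
const EXPECTED_LOG_LINES = [
  '[telegram] Bot started (long polling)',
  '[telegram] Poller started (2s interval)',
];

// Return the expected lines that do not appear anywhere in the log text.
// Substring matching tolerates prefixes that the log viewer may add.
function missingStartupLines(logText: string): string[] {
  return EXPECTED_LOG_LINES.filter((line) => !logText.includes(line));
}
```

An empty result means both lines were found; otherwise the returned array names what is still missing.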
If you see TELEGRAM_BOT_TOKEN is not set, the env var didn't make it into the container. Run:
docker compose up -d agent # (not restart — restart preserves old env)
11. Test the connection
Tell the user:
- Open Telegram and search for your bot's username (e.g. @my_assistant_bot)
- Tap Start or send any message
- The agent should reply within a few seconds
For groups: add the bot to a group, then send @corsair <your message> (using your BOT_NAME). To find your chat ID, send /chatid to the bot.
12. Groups: disable privacy mode (optional)
If the user wants the bot to see all group messages without being @mentioned, tell them:
By default Telegram bots only see @mentions and commands in groups. To let the bot see all messages:
- Open @BotFather → /mybots → select your bot
- Bot Settings → Group Privacy → Turn off
- Remove and re-add the bot to any existing groups (required for the change to take effect)
How it works
Telegram message received
→ grammy long-polling receives it
→ Stored in postgres (telegram_messages, processed=false)
→ Poller queries every 2s
→ DMs: always trigger agent
→ Groups: only if @corsair (BOT_NAME) is in the message
→ runAgent() called → uses Corsair plugins to complete the task
→ Response sent back via Telegram
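The flow above can be modelled without Telegram or Postgres to make the trigger rules concrete. The sketch below is a simplified, synchronous stand-in: an array replaces the telegram_messages table, agent replaces runAgent(), and pollOnceInMemory is a hypothetical name, so it illustrates the rules rather than the real poller.

```typescript
interface QueuedMessage {
  id: number;
  chatId: string;
  content: string;
  isGroup: boolean;
  processed: boolean;
}

// One polling pass over the in-memory queue.
function pollOnceInMemory(
  queue: QueuedMessage[],
  botName: string,
  agent: (content: string) => string,
  send: (chatId: string, text: string) => void,
): void {
  const mention = `@${botName}`.toLowerCase();
  for (const msg of queue.filter((m) => !m.processed)) {
    // Mark the message first so a later pass never handles it twice.
    msg.processed = true;
    // Groups: skip unless the bot is mentioned. DMs always trigger the agent.
    if (msg.isGroup && !msg.content.toLowerCase().includes(mention)) continue;
    send(msg.chatId, agent(msg.content));
  }
}
```

Every message ends up marked processed, including skipped group messages, which is what keeps the queue from growing with chatter the bot ignores.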
Troubleshooting
Bot not responding to messages:
- Check TELEGRAM_ENABLED=true in .env
- Check TELEGRAM_BOT_TOKEN is set and correct
- Ensure WHATSAPP_ENABLED=false in .env
- Check [telegram] Poller started in logs: docker compose logs agent | grep telegram
- For groups: message must include @corsair (or BOT_NAME)
TELEGRAM_BOT_TOKEN is not set error:
- Run docker compose up -d agent (not docker compose restart) to pick up new .env values
grammy not found / module resolution error:
- grammy is pre-installed in the image. If you're seeing this, the named volume may contain a stale node_modules from before grammy was added. Clear it and rebuild:
docker compose down && docker volume rm corsair-2_agent_node_modules && docker compose up --build -d
(Adjust the volume name prefix if your project directory has a different name; check with docker volume ls.)
ERR_PNPM_UNEXPECTED_STORE or similar pnpm errors:
- Never run pnpm add inside a running container, because the pnpm store paths conflict. Always edit package.json and do the volume-rm + rebuild sequence above.
Verify bot token:
curl "https://api.telegram.org/bot<YOUR_TOKEN>/getMe"
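The JSON body returned by getMe carries an ok flag, plus either a result object describing the bot or a description of the error. A minimal sketch of interpreting that body once it has been parsed; describeGetMe is a hypothetical helper, and the interface lists only the fields used here:

```typescript
// Subset of the getMe response used by this sketch.
interface GetMeResponse {
  ok: boolean;
  error_code?: number;
  description?: string;
  result?: {
    id: number;
    is_bot: boolean;
    first_name: string;
    username?: string;
  };
}

// Turn the parsed response into a one-line, human-readable verdict.
function describeGetMe(body: GetMeResponse): string {
  if (!body.ok || !body.result) {
    return `Token rejected: ${body.description ?? 'unknown error'}`;
  }
  const handle = body.result.username ?? body.result.first_name;
  return `Token OK: @${handle} (id ${body.result.id})`;
}
```

For example, a response with ok set to false and a description of Unauthorized indicates the token is wrong or has been revoked.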
Re-authenticate (new bot): Update TELEGRAM_BOT_TOKEN in .env, then run docker compose up -d agent (not docker compose restart) so the container picks up the new value.