name: thalamus
description: "Build with @novu/thalamus, a provider-agnostic runtime for managed AI agents. Use when code imports @novu/thalamus, creates AI providers (OpenAI, Anthropic), streams agent responses, manages sessions, configures MCP servers, handles vaults/credentials, implements approval flows, or sets up durable sessions with webhooks."
@novu/thalamus
Provider-agnostic runtime for managed AI agents. One Provider interface, same messages in, same streaming events out — regardless of which provider runs the agent.
Install
npm install @novu/thalamus
# Then install the peer dependency for your provider:
npm install openai # for OpenAI
npm install @anthropic-ai/sdk # for Anthropic
Imports
// Main entry — includes both providers + all types
import { createOpenAIProvider, createAnthropicProvider, MessageRole } from '@novu/thalamus';
// Convenience namespace
import { thalamus } from '@novu/thalamus';
const openaiProvider = thalamus.openai({ ... });
const anthropicProvider = thalamus.anthropic({ ... });
// Tree-shakeable subpath imports
import { createOpenAIProvider } from '@novu/thalamus/openai';
import { createAnthropicProvider } from '@novu/thalamus/anthropic';
import { createMemoryVaultStore } from '@novu/thalamus/vault';
// Durable backends
import { redis, cloudflare } from '@novu/thalamus/durable';
// Webhook handler (for edge observer mode)
import { createWebhookHandler } from '@novu/thalamus/webhook';
// Lifecycle logging (optional, default silent)
import {
adaptPinoLogger,
createConsoleLogger,
resolveLogger,
silentLogger,
} from '@novu/thalamus';
// Errors
import { ThalamusError, ProviderRateLimitError, ProviderAuthError, AbortedError } from '@novu/thalamus';
Creating Providers
OpenAI (direct)
const provider = createOpenAIProvider({
apiKey: process.env.OPENAI_API_KEY,
model: 'gpt-4o', // optional, defaults to 'gpt-4o'
instructions: '...', // optional system instructions
promptId: '...', // optional OpenAI prompt template ID, sets runtimeId
mcpServers: [...], // optional MCP server configs
vaultStore: createMemoryVaultStore(), // optional, required for vault ops
onSessionEvents: ({ sessionId, turnId, runId }) => ({ ... }), // optional streaming callbacks
durable: redis(redisClient), // optional durability backend
logger: 'debug', // optional — false | 'silent' | 'debug' | custom adapter
});
OpenAI via AWS Bedrock (API key)
const provider = createOpenAIProvider({
awsRegion: 'us-east-1',
awsBedrockApiKey: process.env.AWS_BEDROCK_API_KEY,
model: 'anthropic.claude-sonnet-4-20250514',
});
OpenAI via AWS Bedrock (SigV4)
Requires additional peer deps: @smithy/signature-v4 and @aws-crypto/sha256-js.
const provider = createOpenAIProvider({
awsRegion: 'us-east-1',
awsCredentials: {
accessKeyId: '...',
secretAccessKey: '...',
sessionToken: '...', // optional
},
model: 'anthropic.claude-sonnet-4-20250514',
});
Anthropic Managed Agents (direct)
const provider = createAnthropicProvider({
apiKey: process.env.ANTHROPIC_API_KEY,
agentId: 'agent_01J...',
environmentId: 'env_01J...',
onSessionEvents: ({ sessionId, turnId, runId }) => ({ ... }), // optional
durable: redis(redisClient), // optional
logger: adaptPinoLogger(pino), // optional
});
Anthropic via AWS
Requires peer dependency @anthropic-ai/aws-sdk (npm install @anthropic-ai/aws-sdk).
const provider = createAnthropicProvider({
awsRegion: 'us-east-1',
awsWorkspaceId: 'wrkspc_...', // optional
apiKey: process.env.ANTHROPIC_AWS_API_KEY,
agentId: 'agent_01J...',
environmentId: 'env_01J...',
durable: cloudflare({ ... }), // optional — API key auth works with EdgeObserver
});
The Provider Interface
Every provider implements this. All downstream code is provider-agnostic.
interface Provider {
readonly provider: string; // 'openai' | 'anthropic'
readonly runtimeId: string;
send(params: RequestParams): SendResult; // or Promise<WebhookSendResult> in webhook mode
createVault(options: VaultOptions): Promise<Vault>;
getVault(vaultId: string): Promise<Vault>;
createSession(options?: SessionOptions): Promise<string>;
endSession(sessionId: string): Promise<void>;
}
interface WebhookSendResult {
sessionId: string;
runId: string;
turnId: string;
}
When durable is configured with an edge observer (Cloudflare + webhook), TypeScript narrows send() to return Promise<WebhookSendResult> instead of SendResult. The narrowed WebhookProvider also exposes createWebhookHandler({ secret }), which inherits logger and onSessionEvents from provider config.
Messages
enum MessageRole { USER = 'user', ASSISTANT = 'assistant', SYSTEM = 'system' }
interface Message {
role: MessageRole;
content: string | ContentPart[];
}
// ContentPart variants:
// { type: 'text', text: string }
// { type: 'image', data: string, mediaType: string }
// { type: 'image-url', url: string }
// { type: 'file', data: string, mediaType: string, name?: string }
Provider differences (Anthropic)
[!IMPORTANT] Anthropic Managed Agent sessions do not replay assistant/system rows as native input; history is server-side. Thalamus accepts the same messages[] as OpenAI so your code does not branch on provider, but on Anthropic, prior assistant/system rows are folded into a [Context] text block on the next user message. Treat this as a recovery workaround (dead/expired session), not normal operation.
Prefer: live sessionId + one new USER message.
Recovery only: pass saved transcript as ASSISTANT + USER rows when the session is gone. OpenAI uses native roles; Anthropic gets the packed format.
Use ASSISTANT for prior agent chat lines, SYSTEM only for extra per-send instructions (agent persona belongs on provider/agent config).
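To make the recovery path concrete, here is a minimal sketch of how a saved transcript could be folded into one user message. The function name foldTranscript and the exact [Context] layout are assumptions for illustration; this document only states that prior rows are packed into a [Context] text block on the next user message, and this sketch folds every row before the final user message.

```typescript
// Hypothetical illustration only: the real packing format used by
// @novu/thalamus is not specified in this document.
type Role = 'user' | 'assistant' | 'system';

interface TextMessage {
  role: Role;
  content: string;
}

// Fold every row before the final user message into one "[Context]" block
// and prepend it to that final user message.
function foldTranscript(messages: TextMessage[]): TextMessage {
  if (messages.length === 0) {
    throw new Error('expected at least one message');
  }
  const last = messages[messages.length - 1];
  if (last.role !== 'user') {
    throw new Error('expected the final message to be a user message');
  }
  const prior = messages.slice(0, -1);
  if (prior.length === 0) {
    return last; // live-session case: nothing to fold
  }
  const context = prior.map((m) => `${m.role}: ${m.content}`).join('\n');
  return { role: 'user', content: `[Context]\n${context}\n\n${last.content}` };
}

const folded = foldTranscript([
  { role: 'user', content: 'My name is Alice' },
  { role: 'assistant', content: 'Nice to meet you, Alice.' },
  { role: 'user', content: 'What is my name?' },
]);
console.log(folded.content);
```

With a live sessionId, the single-message branch applies and nothing is folded, which matches the preferred mode of operation.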
send() and SendResult
send() returns a SendResult — a PromiseLike<Response> that you can consume in several ways:
// Await for final response
const response = await provider.send({ messages });
// Just the text
const text = await provider.send({ messages }).text();
// Get the sessionId early (resolves before the stream finishes)
const result = provider.send({ messages });
const sessionId = await result.sessionId;
// runId is known synchronously — unique per send() invocation
const result = provider.send({ messages });
console.log(result.runId); // e.g. 'a1b2c3d4-...'
// turnId groups sends within one logical user interaction (e.g. message + tool approvals)
console.log(result.turnId); // e.g. 'e5f6g7h8-...'
// Carry it forward on approval resumes:
await provider.send({ sessionId, toolResults, turnId: result.turnId, messages: [] });
IDs
| ID | Scope | Lifecycle | Purpose |
|---|---|---|---|
| sessionId | Conversation | Many turns | Provider session identity |
| turnId | User interaction | One user message -> final answer (may span multiple send() calls for tool approvals) | Business grouping |
| runId | Webhook delivery | One send() call | Transport dedup/observability |
- runId: fresh UUID per send(). Available synchronously on SendResult and in WebhookSendResult. Passed in the onSessionEvents context object. Echoed in every webhook payload.
- turnId: stable across approval resumes. Generated automatically on fresh sends, carried forward when you pass turnId from a previous SendResult in RequestParams. Groups a logical user interaction (initial message + subsequent tool approvals) under one ID.
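The ID rules above can be sketched as a small helper. This is an illustration under stated assumptions, not the SDK's code: assignIds and SendIds are hypothetical names, and only the "fresh runId per send, turnId reused when passed" behavior comes from this document.

```typescript
import { randomUUID } from 'node:crypto';

// Hypothetical helper: shows the ID rules described above, not the SDK's code.
interface SendIds {
  runId: string;
  turnId: string;
}

function assignIds(params: { turnId?: string }): SendIds {
  return {
    runId: randomUUID(), // always fresh, one per send()
    turnId: params.turnId ?? randomUUID(), // reused when the caller carries it forward
  };
}

const firstSend = assignIds({});
const resume = assignIds({ turnId: firstSend.turnId });
console.log(resume.turnId === firstSend.turnId); // true
console.log(resume.runId === firstSend.runId); // false
```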
RequestParams
interface RequestParams {
messages: Message[];
sessionId?: string; // continue a conversation
vaultIds?: string[]; // bind vault credentials
toolResults?: ToolResult[]; // approval responses or tool outputs
providerOptions?: Record<string, unknown>; // pass-through to provider SDK
abortSignal?: AbortSignal; // cancel the request
webhookMetadata?: Record<string, string>; // forwarded in webhook payloads
turnId?: string; // carry forward from previous SendResult for turn grouping
agent?: AgentSessionConfig; // per-turn MCP/tool overrides (see below)
}
AgentSessionConfig fields:
interface AgentSessionConfig {
mcpServers?: McpServerConfig[]; // override which MCP servers are active this turn
tools?: AgentToolConfig[]; // custom tools (mapped to provider format)
providerTools?: Record<string, unknown>[]; // provider-specific tool payloads (passthrough)
}
interface AgentToolConfig {
name: string;
description: string;
inputSchema: Record<string, unknown>; // JSON Schema object
}
Sequential turns
Messages to the same session are serialized automatically. If you send() while a previous turn is still running, it queues and dispatches when the session is ready. toolResults bypass the queue (they resolve the current requires-action turn). Different sessions don't block each other.
A status-change: queued event fires when a message is waiting. In webhook mode, the edge observer manages the queue and sends queue-ready webhooks when slots open.
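Per-session serialization of this kind is commonly built as a promise chain keyed by session. The sketch below is one such pattern, not the SDK's implementation; SessionQueue and its enqueue method are hypothetical names, and it never evicts finished sessions from its map.

```typescript
// Hypothetical sketch: serialize async tasks per session with a promise chain.
class SessionQueue {
  private tails = new Map<string, Promise<unknown>>();

  // Runs `task` after every earlier task for the same sessionId has settled.
  enqueue<T>(sessionId: string, task: () => Promise<T>): Promise<T> {
    const tail = this.tails.get(sessionId) ?? Promise.resolve();
    // Run the task whether the previous one fulfilled or rejected.
    const next = tail.then(task, task);
    // Store a non-rejecting tail so one failure does not poison the chain.
    this.tails.set(sessionId, next.catch(() => undefined));
    return next;
  }
}

const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));
const order: string[] = [];
const queue = new SessionQueue();

// Two turns on session "a" run in order; session "b" is not blocked by them.
const done = Promise.all([
  queue.enqueue('a', async () => { await delay(30); order.push('a1'); }),
  queue.enqueue('a', async () => { order.push('a2'); }),
  queue.enqueue('b', async () => { order.push('b1'); }),
]).then(() => order);

done.then((result) => console.log(result.join(',')));
```

The second turn on session "a" waits for the first, while session "b" completes without waiting.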
providerOptions Pass-Through
Forward arbitrary options to the underlying SDK call:
await provider.send({
messages,
providerOptions: {
temperature: 0.5,
max_output_tokens: 4096,
reasoning: { effort: 'high' },
},
});
Streaming with Callbacks
Callbacks are attached at provider creation time via onSessionEvents, not on individual send() calls. This is intentional — the same factory is used to re-attach callbacks when recovering durable sessions after a restart.
const provider = createOpenAIProvider({
apiKey: process.env.OPENAI_API_KEY,
model: 'gpt-4o',
onSessionEvents: ({ sessionId, turnId, runId }) => ({
onTextDelta: ({ text }) => pushToClient(sessionId, text),
onToolUseDone: ({ toolName }) => console.log(`[${runId}] used ${toolName}`),
onFinish: ({ response }) => saveResponse(sessionId, response),
}),
});
// When onSessionEvents is set, send() starts consuming immediately —
// callbacks fire even if you never await the result.
provider.send({ messages });
The factory receives a SessionEventContext object containing:
- sessionId: route events to the right client connection
- turnId: group events from the same logical user interaction (stable across approval resumes)
- runId: correlate callbacks to a specific send() invocation (unique per call)
- metadata: webhook metadata from the originating send() (empty object in streaming mode)
Async callbacks and sequential dispatch
Callbacks may return void or Promise<void>. When a callback returns a promise, the SDK awaits it before dispatching the next event:
onSessionEvents: ({ sessionId, runId }) => ({
// Sync — instant, no backpressure (ideal for streaming text to a socket)
onTextDelta: ({ text }) => socket.emit('delta', text),
// Async — awaited before the next event dispatches (ideal for DB writes)
onToolUseDone: async ({ toolName, input }) => {
await db.activities.insertOne({ sessionId, runId, toolName, input });
},
}),
This applies to both streaming mode and webhook mode — same callbacks, same ordering guarantee. Sync callbacks add zero overhead (await undefined = one microtick). To fire-and-forget inside a callback, use void asyncFn() without returning the promise.
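The ordering guarantee can be pictured as a loop that awaits each callback before delivering the next event. This is a minimal sketch of that idea, assuming a simplified Part type; dispatchSequentially is a hypothetical name and not part of the SDK.

```typescript
// Hypothetical sketch of sequential dispatch: each callback result is awaited
// before the next event is delivered.
type Part =
  | { type: 'text-delta'; text: string }
  | { type: 'tool-use-done'; toolName: string };
type Handler = (part: Part) => void | Promise<void>;

async function dispatchSequentially(parts: Part[], handler: Handler): Promise<void> {
  for (const part of parts) {
    // Awaiting a sync handler's `undefined` costs only a microtask.
    await handler(part);
  }
}

const log: string[] = [];
const handler: Handler = async (part) => {
  if (part.type === 'tool-use-done') {
    // Simulate a slow async write; later events must wait for it.
    await new Promise<void>((resolve) => setTimeout(resolve, 20));
    log.push(`saved ${part.toolName}`);
  } else {
    log.push(`delta ${part.text}`);
  }
};

const finished = dispatchSequentially(
  [
    { type: 'tool-use-done', toolName: 'search' },
    { type: 'text-delta', text: 'hello' },
  ],
  handler,
).then(() => log);
```

Even though the first handler call takes 20 ms, the text delta is logged after it, because the loop does not advance until the returned promise settles.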
StreamPart types
| Type | Key fields |
|---|---|
| message | text (one complete assistant message; the atomic text unit across all providers) |
| text-delta | text (incremental token chunk; only emitted by providers that stream, e.g. OpenAI) |
| thinking | text |
| refusal | text |
| tool-use-start | toolName, toolUseId, source? |
| tool-use-delta | toolUseId, argumentsDelta |
| tool-use-done | toolName, toolUseId, input?, source? |
| tool-use-result | toolUseId, content (ToolResultContent[]), isError?, source? |
| mcp-tools-discovered | serverName, tools: McpToolDef[] |
| status-change | status: 'running' \| 'queued' \| 'retrying' \| 'idle' |
| stream-start | sessionId? |
| finish | response: Response |
| error | error: Error |
| provider-event | provider, event, data (escape hatch) |
message vs text-delta: message fires once per complete assistant message and is emitted by all providers; use onMessage for provider-agnostic code. text-delta is a streaming-only enhancement for live typing and is only emitted by providers that stream tokens (OpenAI); Anthropic does not emit text-delta. Response.messages collects every message of the turn.
StreamCallbacks
One callback per StreamPart type, plus onPart which fires for every part before type-specific callbacks:
interface StreamCallbacks {
onPart?: (part: StreamPart) => void;
onMessage?: ...; // one complete assistant message (all providers)
onTextDelta?: ...; // incremental tokens (OpenAI only)
onThinking?: ...;
onRefusal?: ...;
onToolUseStart?: ...;
onToolUseDelta?: ...;
onToolUseDone?: ...;
onToolUseResult?: ...;
onMcpToolsDiscovered?: ...;
onStatusChange?: ...;
onStreamStart?: ...;
onFinish?: ...;
onError?: ...;
onProviderEvent?: ...;
}
Response
interface Response {
messages: string[]; // every assistant message produced during the turn, in order
sessionId?: string;
finishReason: 'stop' | 'length' | 'error' | 'requires-action' | 'refused' | 'other';
usage?: { inputTokens?: number; outputTokens?: number; totalTokens?: number };
actionsRequired?: ActionRequired[];
}
SendResult.text() joins messages with \n\n.
Sessions
Pass sessionId from a previous response to continue a conversation:
const first = await provider.send({
messages: [{ role: MessageRole.USER, content: 'Remember: my name is Alice' }],
});
const second = await provider.send({
sessionId: first.sessionId,
messages: [{ role: MessageRole.USER, content: 'What is my name?' }],
});
With a live sessionId, send only the new user turn. After session loss, pass transcript in messages only as a recovery fallback — see Provider differences above.
Per-turn agent overrides
Pass agent on send() to override MCP servers and tools for a single turn — no session state or extra API methods required.
// Provider defaults: github, slack, linear
const provider = createOpenAIProvider({
apiKey: process.env.OPENAI_API_KEY,
model: 'gpt-4o',
mcpServers: [
{ name: 'github', url: 'https://api.githubcopilot.com/mcp/' },
{ name: 'slack', url: 'https://mcp.slack.com/sse' },
{ name: 'linear', url: 'https://mcp.linear.app/sse' },
],
});
// This subscriber only gets GitHub MCP tools for this turn
await provider.send({
sessionId,
messages: [{ role: MessageRole.USER, content: 'Summarize my open PRs' }],
agent: {
mcpServers: [
{ name: 'github', url: 'https://api.githubcopilot.com/mcp/' },
],
},
});
// Custom tools + provider-specific entries
await provider.send({
sessionId,
messages: [{ role: MessageRole.USER, content: 'Run lookup' }],
agent: {
tools: [
{
name: 'lookup_subscriber',
description: 'Look up a subscriber by ID',
inputSchema: { type: 'object', properties: { id: { type: 'string' } } },
},
],
providerTools: [
{ type: 'agent_toolset_20260401' }, // Anthropic passthrough
],
mcpServers: [{ name: 'github', url: 'https://api.githubcopilot.com/mcp/' }],
},
});
Provider behavior:
- OpenAI: resolves agent.mcpServers, maps agent.tools to function tools, and passes agent.providerTools through when building the responses.create() request.
- Anthropic: calls sessions.update() before dispatch to apply MCP/tool overrides for that turn. Existing non-MCP tools are preserved when only mcpServers is set; tools + providerTools replace non-MCP tools when provided.
In webhook/edge mode, agent is serialized in SerializedRequestParams so queued turns keep the same overrides.
AbortSignal
Pass an AbortSignal to cancel long-running agent turns:
const controller = new AbortController();
const result = provider.send({
messages,
abortSignal: controller.signal,
});
controller.abort();
try {
await result;
} catch (err) {
if (err instanceof AbortedError) {
// err.sessionId is available when known
}
}
MCP Servers
OpenAI — configure at provider creation
const provider = createOpenAIProvider({
apiKey: '...',
model: 'gpt-4o',
mcpServers: [{
name: 'github',
url: 'https://api.githubcopilot.com/mcp/',
authorization: 'Bearer ghp_xxx', // optional static auth
allowedTools: ['search_repos'], // optional tool filter
approvalPolicy: 'never', // 'always' | 'never' | { except: string[] }
}],
});
Anthropic — MCP servers are configured in the Anthropic console at the environment level.
Detecting tool source
onSessionEvents: ({ sessionId }) => ({
onToolUseDone: (part) => {
if (part.source?.type === 'mcp') {
console.log(`MCP tool from ${part.source.serverName}`);
} else if (part.source?.type === 'custom') {
console.log('Custom tool — you handle execution');
}
},
onMcpToolsDiscovered: ({ serverName, tools }) => {
console.log(`${serverName} offers: ${tools.map(t => t.name)}`);
},
}),
ToolSource is { type: 'builtin' }, { type: 'custom' }, or { type: 'mcp', serverName }.
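Because ToolSource is a discriminated union, a switch on type narrows each variant. The sketch below redeclares the union locally so it runs on its own; describeSource is a hypothetical helper, not an SDK export.

```typescript
// The ToolSource union as described above; describeSource is a hypothetical helper.
type ToolSource =
  | { type: 'builtin' }
  | { type: 'custom' }
  | { type: 'mcp'; serverName: string };

function describeSource(source: ToolSource | undefined): string {
  if (!source) return 'unknown source';
  switch (source.type) {
    case 'builtin':
      return 'built-in tool';
    case 'custom':
      return 'custom tool (caller executes it)';
    case 'mcp':
      // Narrowed: serverName is only available on the mcp variant.
      return `MCP tool from ${source.serverName}`;
  }
}

console.log(describeSource({ type: 'mcp', serverName: 'github' })); // MCP tool from github
```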
Vault & Credentials
Vaults manage credentials for MCP server authentication.
- Anthropic: proxies to Anthropic's native Vault API (server-side storage).
- OpenAI: stored in a VaultStore you provide, injected into MCP requests at call time.
// OpenAI setup — pass a VaultStore
import { createMemoryVaultStore } from '@novu/thalamus';
const provider = createOpenAIProvider({
apiKey: '...',
mcpServers: [{ name: 'github', url: '...' }],
vaultStore: createMemoryVaultStore(), // in-memory; implement VaultStore for production
});
// Create vault, add credential, use it
const vault = await provider.createVault({ name: 'Alice' });
await vault.add('github', { type: 'bearer', token: 'ghp_xxx' });
const response = await provider.send({
messages: [...],
vaultIds: [vault.id],
});
// Credential types
type Credential =
| { type: 'bearer'; token: string }
| { type: 'oauth'; accessToken: string; expiresAt?: string; refresh?: OAuthRefreshConfig };
// Vault API
vault.add(name, credential) // add credential
vault.update(name, credential) // replace credential
vault.remove(name) // remove credential
vault.list() // list credentials (CredentialInfo[], no secrets)
vault.destroy() // delete vault
Custom VaultStore (production)
Implement the VaultStore interface to persist credentials in your database:
interface VaultStore {
createVault(options: VaultOptions): Promise<VaultRecord>;
getVault(vaultId: string): Promise<VaultRecord | null>;
updateVaultMetadata(vaultId: string, metadata: Record<string, string>): Promise<void>;
removeVault(vaultId: string): Promise<void>;
set(vaultId: string, name: string, credential: Credential): Promise<void>;
get(vaultId: string, name: string): Promise<StoredCredential | null>;
getAll(vaultId: string): Promise<StoredCredential[]>;
remove(vaultId: string, name: string): Promise<void>;
}
Approval Flow (Human-in-the-Loop)
When a tool requires approval, the response finishes with finishReason: 'requires-action':
const result = provider.send({ messages });
const response = await result;
if (response.finishReason === 'requires-action') {
// ActionRequired = { type: 'tool-confirmation' | 'mcp-approval', toolUseId, toolName, input? }
for (const action of response.actionsRequired!) {
console.log(`${action.toolName} wants to run with`, action.input);
}
// Resume with approval — carry turnId to group under the same logical turn
const resumed = await provider.send({
messages: [],
sessionId: response.sessionId,
turnId: result.turnId,
toolResults: response.actionsRequired!.map(a => ({
toolUseId: a.toolUseId,
approved: true, // or false to deny
})),
});
}
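In practice the approve/deny decision is often policy-driven rather than a blanket approved: true. The following sketch maps required actions to approval results using an allowlist of tool names. The ActionRequired shape follows the comment above; ApprovalResult and decideApprovals are hypothetical names for this illustration.

```typescript
// Shapes follow the ActionRequired comment above; ApprovalResult and
// decideApprovals are hypothetical names for this sketch.
interface ActionRequired {
  type: 'tool-confirmation' | 'mcp-approval';
  toolUseId: string;
  toolName: string;
  input?: unknown;
}

interface ApprovalResult {
  toolUseId: string;
  approved: boolean;
}

// Approve only tools on the allowlist; deny everything else.
function decideApprovals(
  actions: ActionRequired[],
  allowlist: ReadonlySet<string>,
): ApprovalResult[] {
  return actions.map((action) => ({
    toolUseId: action.toolUseId,
    approved: allowlist.has(action.toolName),
  }));
}

const decisions = decideApprovals(
  [
    { type: 'mcp-approval', toolUseId: 'call_1', toolName: 'search_repos' },
    { type: 'tool-confirmation', toolUseId: 'call_2', toolName: 'delete_repo' },
  ],
  new Set(['search_repos']),
);
console.log(decisions);
```

The resulting array has the same shape as the toolResults entries used in the resume call above, so it could be passed there directly.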
Sending custom tool results
const response = await provider.send({
messages: [],
sessionId: previousResponse.sessionId,
toolResults: [{
toolUseId: 'call_xyz',
output: JSON.stringify({ result: 42 }),
isError: false,
}],
});
Durable Sessions
Agent sessions can run for minutes. The durable option on provider config enables persistence and recovery across connection drops and process restarts.
Two kinds of backends:
Checkpoint backends (Redis)
The SDK holds the SSE connection in your Node.js process. Events are checkpointed to Redis. On connection drops, it reconnects from the last checkpoint with up to 3 retries. On process restart, active sessions resume automatically.
Requires onSessionEvents to be set so callbacks can be re-attached on recovery.
import { redis } from '@novu/thalamus/durable';
import { createOpenAIProvider } from '@novu/thalamus';
import Redis from 'ioredis';
const provider = createOpenAIProvider({
apiKey: process.env.OPENAI_API_KEY,
model: 'gpt-4o',
durable: redis(new Redis()),
onSessionEvents: ({ sessionId, runId }) => ({
onTextDelta: ({ text }) => pushToClient(sessionId, text),
onFinish: ({ response }) => saveResponse(sessionId, response),
}),
});
// Custom hash key
const backend = redis(redisClient, { key: 'myapp:sessions' });
Edge observer (Cloudflare)
Instead of holding the SSE connection in your Node.js process, offload it to a Cloudflare Worker. Your app just needs a webhook endpoint — no SSE connections, no reconnection logic.
import { cloudflare } from '@novu/thalamus/durable';
import { createWebhookHandler } from '@novu/thalamus/webhook';
// Provider setup: in webhook mode, send() resolves to a WebhookSendResult (IDs only), not a Response
const provider = createAnthropicProvider({
apiKey: process.env.ANTHROPIC_API_KEY,
agentId: 'agent_01J...',
environmentId: 'env_01J...',
durable: cloudflare({
url: 'https://session-observer.your-domain.workers.dev',
apiKey: process.env.OBSERVER_API_KEY,
webhook: {
url: 'https://your-app.com/webhook',
secret: process.env.WEBHOOK_SECRET,
},
}),
});
// In webhook mode, send() returns Promise<WebhookSendResult>
const { sessionId, runId, turnId } = await provider.send({
messages: [{ role: MessageRole.USER, content: 'Hello' }],
webhookMetadata: { userId: 'u_123' }, // forwarded in webhook payloads
});
Webhook handler
Receives events from the Cloudflare edge observer. HMAC-verified.
const handler = createWebhookHandler({
secret: process.env.WEBHOOK_SECRET,
logger: adaptPinoLogger(pino), // optional — same adapter as provider
onSessionEvents: ({ sessionId, turnId, runId, metadata }) => ({
onPart(part) {
switch (part.type) {
case 'message': // complete assistant message — all providers
pushToClient(sessionId, part.text);
break;
case 'text-delta': // incremental tokens — OpenAI only
pushDeltaToClient(sessionId, part.text);
break;
case 'finish':
saveResponse(sessionId, part.response);
break;
}
},
}),
});
// Or bind to the provider (inherits logger + onSessionEvents)
const handler = provider.createWebhookHandler({
secret: process.env.WEBHOOK_SECRET,
});
// Express / Node http
app.post('/webhook', (req, res) => handler.express(req, res));
// Web standard Request/Response (Cloudflare Workers, Bun, Deno, Next.js)
export default { fetch: (req: Request) => handler.handle(req) };
Note: the webhook onSessionEvents factory receives a SessionEventContext object with sessionId, turnId, runId, and metadata. The metadata contains the webhookMetadata you passed in send(). Session and run IDs are also exposed in the request headers: X-Thalamus-Session-Id, X-Thalamus-Run-Id.
Use standalone createWebhookHandler when one webhook endpoint serves many providers (e.g. Novu). Use provider.createWebhookHandler for single-provider apps.
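The handler performs HMAC verification for you. For readers unfamiliar with the technique, here is a generic sketch using Node's crypto module. The signing scheme shown (hex-encoded HMAC-SHA256 over the raw body) and the function names are assumptions; this document does not specify the signature header or encoding Thalamus uses.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

// Hypothetical scheme: hex-encoded HMAC-SHA256 over the raw request body.
function sign(rawBody: string, secret: string): string {
  return createHmac('sha256', secret).update(rawBody, 'utf8').digest('hex');
}

function verifySignature(rawBody: string, signature: string, secret: string): boolean {
  const expected = Buffer.from(sign(rawBody, secret), 'hex');
  const received = Buffer.from(signature, 'hex');
  // timingSafeEqual throws on length mismatch, so check length first.
  if (expected.length !== received.length) return false;
  return timingSafeEqual(expected, received);
}

const body = JSON.stringify({ type: 'message', text: 'hi' });
const signature = sign(body, 'test-secret');
console.log(verifySignature(body, signature, 'test-secret')); // true
console.log(verifySignature(body + ' ', signature, 'test-secret')); // false
```

Verification must run over the raw request bytes, before any JSON parsing, and the comparison should be constant-time, as timingSafeEqual is here.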
Sequential turns in webhook mode
The edge observer queues messages per session. When you send() while a turn is active, the observer stores the request in SQLite and returns { status: "queued" }. When the active turn completes, it sends a queue-ready webhook. The WebhookHandler intercepts it and calls provider.dispatchQueued() to dispatch via SDK.
This is automatic with provider.createWebhookHandler(). For multi-provider setups using standalone createWebhookHandler(), pass onQueueReady:
const handler = createWebhookHandler({
secret,
onSessionEvents: (ctx) => handlers(ctx),
onQueueReady: async ({ sessionId, runId, turnId, request }) => {
const provider = await resolveProvider(request.webhookMetadata);
await provider.dispatchQueued(sessionId, runId, turnId, request);
},
});
Key types for the edge observer interface:
interface EdgeObserver {
enqueue(params: EdgeEnqueueParams): Promise<{ status: "active" | "queued" }>;
observe(params: EdgeObserveParams): Promise<void>;
stop(sessionId: string): Promise<void>;
}
interface WebhookProvider {
// ...existing methods...
dispatchQueued(sessionId: string, runId: string, turnId: string,
request: SerializedRequestParams): Promise<void>;
}
Custom durability backend
Implement DurabilityBackend for checkpoint-based durability with any storage:
interface DurabilityBackend {
save(checkpoint: SessionCheckpoint): Promise<void>;
remove(sessionId: string): Promise<void>;
getActive(): Promise<SessionCheckpoint[]>;
}
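As an illustration of the interface, here is a Map-backed backend suitable for tests. The SessionCheckpoint fields are not shown in this document, so the sketch assumes only a sessionId plus opaque data (lastEventId is a made-up field). MemoryBackend is a hypothetical name, and the "latest checkpoint replaces the previous one" behavior is an assumption.

```typescript
// SessionCheckpoint's real fields are not shown in this document; this sketch
// assumes only a sessionId plus opaque data.
interface SessionCheckpoint {
  sessionId: string;
  [key: string]: unknown;
}

interface DurabilityBackend {
  save(checkpoint: SessionCheckpoint): Promise<void>;
  remove(sessionId: string): Promise<void>;
  getActive(): Promise<SessionCheckpoint[]>;
}

// Map-backed backend: useful for tests, not durable across restarts.
class MemoryBackend implements DurabilityBackend {
  private checkpoints = new Map<string, SessionCheckpoint>();

  async save(checkpoint: SessionCheckpoint): Promise<void> {
    // Assumption: the latest checkpoint for a session replaces the previous one.
    this.checkpoints.set(checkpoint.sessionId, { ...checkpoint });
  }

  async remove(sessionId: string): Promise<void> {
    this.checkpoints.delete(sessionId);
  }

  async getActive(): Promise<SessionCheckpoint[]> {
    return Array.from(this.checkpoints.values());
  }
}

const backend = new MemoryBackend();
const active = (async () => {
  await backend.save({ sessionId: 's1', lastEventId: 'evt_1' });
  await backend.save({ sessionId: 's1', lastEventId: 'evt_2' });
  await backend.save({ sessionId: 's2' });
  await backend.remove('s2');
  return backend.getActive();
})();

active.then((list) => console.log(list));
```

A production backend would persist the same three operations in a database or key-value store so that getActive() still returns in-flight sessions after a restart.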
Lifecycle Logging
Opt-in structured logging for the durable pipeline (observe → dispatch → webhook → callbacks). Default is silent — no behavior change without opt-in.
SDK logs plumbing only (observe registration, dispatch, webhook ingress). Business events stay in onSessionEvents / onPart.
Provider config
import { adaptPinoLogger } from '@novu/thalamus';
const log = adaptPinoLogger(pino); // flips (msg, ctx) → Pino's (ctx, msg)
const provider = thalamus.anthropic({
agentId,
environmentId,
apiKey,
durable: cloudflare({ url, webhook }),
logger: log, // false | 'silent' | 'debug' | custom adapter
onSessionEvents: (ctx) => ({ ... }),
});
| logger value | Behavior |
|---|---|
| omitted / false / "silent" | No SDK logs |
| "debug" | Built-in console adapter ([thalamus] stage { ctx }) |
| custom / partial | Used as-is; missing methods no-op |
Helpers exported from @novu/thalamus: resolveLogger, silentLogger, createConsoleLogger, adaptPinoLogger, logErrorMessage.
Webhook handler
Pass the same adapter on standalone handlers (required when webhook is a singleton separate from provider creation):
const handler = createWebhookHandler({
secret,
logger: log,
onSessionEvents: (ctx) => ({ ... }),
});
Or use provider.createWebhookHandler({ secret }) to inherit provider logger and onSessionEvents.
Log context
Every log includes a stage key plus correlation fields when available:
type LogContext = {
stage: string;
provider?: 'anthropic' | 'openai';
sessionId?: string;
runId?: string;
turnId?: string;
sequence?: number;
eventType?: string;
mode?: 'webhook' | 'stream';
durationMs?: number;
error?: string;
};
Stage vocabulary
| Stage | Level | When |
|---|---|---|
| send.start / send.complete | info | Webhook send lifecycle |
| edge.observe.start / edge.observe.ok / edge.observe.failed | info / error | Edge observer registration |
| edge.dispatch.sent | info | Anthropic events dispatched after observe |
| dispatch.start / dispatch.sent / dispatch.input | debug / info | OpenAI background dispatch |
| dispatch.events | debug | Anthropic session events payload |
| session.create | info | New Anthropic session |
| conversation.create | debug | OpenAI conversation created |
| stream.reconnect | warn | Checkpoint SSE reconnect |
| stream.error | error | OpenAI stream-mode failure |
| recovery.failed / recovery.stream.failed | error | Durable session recovery |
| webhook.received / webhook.handled | debug | Webhook ingress + callback success |
| webhook.missing-signature / webhook.invalid-signature / webhook.invalid-payload | warn | Webhook auth/validation |
| webhook.callback.failed | error | onSessionEvents callback threw |
cloudflare() takes no logger option; the provider logs the observe lifecycle.
Cloudflare Worker Deployment
The companion edge observer worker lives in cloudflare-worker/. It uses the Cloudflare Agents SDK with a Durable Object per session that:
- Opens the SSE connection to the AI provider
- Persists every event to SQLite
- Delivers events to your webhook with at-least-once delivery
- Retries with exponential backoff, hibernates between retries
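For reference, exponential backoff doubles the wait after each failed attempt, usually up to a cap. The sketch below shows the arithmetic only; the base delay, the cap, and the function name backoffDelayMs are assumptions, since the worker's actual retry parameters are not documented here.

```typescript
// Hypothetical parameters: the worker's actual base delay, cap, and jitter
// are not documented here.
function backoffDelayMs(attempt: number, baseMs = 1000, maxMs = 60_000): number {
  // attempt is 0-based: 0 -> base, 1 -> 2x base, 2 -> 4x base, ...
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

const schedule = [0, 1, 2, 3, 10].map((attempt) => backoffDelayMs(attempt));
console.log(schedule); // [1000, 2000, 4000, 8000, 60000]
```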
Deploy
cd cloudflare-worker
pnpm install
npx wrangler secret put API_KEY # set the auth key
npx wrangler deploy
Local development
cd cloudflare-worker
pnpm install
npx wrangler dev
The worker depends on @novu/thalamus as a workspace package (linked via pnpm-workspace.yaml). Changes to the main package are immediately reflected without publishing.
API
| Method | Path | Description |
|---|---|---|
| POST | /observe | Start observing a session |
| DELETE | /observe/:sessionId | Stop observation (pending events still deliver) |
| GET | /health | Health check |
These endpoints are called automatically by the SDK when using cloudflare() as a durable backend — you don't call them directly.
Error Handling
All errors extend ThalamusError with provider and isRetryable fields.
| Error class | Retryable | When |
|---|---|---|
| ProviderAuthError | No | Invalid API key / unauthorized |
| ProviderRateLimitError | Yes | Rate limited (has retryAfterMs?) |
| ProviderUnavailableError | Yes | Provider temporarily down |
| ProviderResponseError | No | Invalid response from provider |
| SessionExpiredError | Yes | Session expired (has sessionId) |
| AbortedError | No | Request cancelled via AbortSignal (has sessionId?) |
| VaultNotFoundError | No | Vault doesn't exist (has vaultId) |
| CredentialExpiredError | No | Credential expired, no refresh config |
| McpServerError | 5xx only | MCP server error (has serverName, statusCode?) |
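import { ThalamusError, ProviderRateLimitError, AbortedError } from '@novu/thalamus';
// Small promise-based delay helper used below
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));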
try {
await provider.send({ messages });
} catch (err) {
if (err instanceof AbortedError) {
console.log('Cancelled, session:', err.sessionId);
} else if (err instanceof ProviderRateLimitError) {
await sleep(err.retryAfterMs ?? 5000);
} else if (err instanceof ThalamusError && err.isRetryable) {
// generic retry
}
}
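The "generic retry" branch can be factored into a small wrapper. The sketch below relies only on the isRetryable and retryAfterMs fields described above and uses structural stand-ins instead of importing the real error classes, so it runs on its own. retryDelayMs and withRetry are hypothetical helpers, and the fallback delay and attempt limit are assumptions.

```typescript
// Structural stand-ins for the error fields described above (isRetryable,
// retryAfterMs); retryDelayMs and withRetry are hypothetical helpers.
interface RetryableLike {
  isRetryable: boolean;
  retryAfterMs?: number;
}

function isRetryableLike(err: unknown): err is RetryableLike {
  return (
    typeof err === 'object' &&
    err !== null &&
    typeof (err as RetryableLike).isRetryable === 'boolean'
  );
}

// Returns the delay before the next attempt, or null when the error should not be retried.
function retryDelayMs(err: unknown, attempt: number, fallbackMs = 5000): number | null {
  if (!isRetryableLike(err) || !err.isRetryable) return null;
  return err.retryAfterMs ?? fallbackMs * (attempt + 1);
}

async function withRetry<T>(fn: () => Promise<T>, maxAttempts = 3): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (err) {
      const delay = retryDelayMs(err, attempt);
      // Give up on non-retryable errors or when attempts are exhausted.
      if (delay === null || attempt + 1 >= maxAttempts) throw err;
      await new Promise<void>((resolve) => setTimeout(resolve, delay));
    }
  }
}

// Simulated call that is rate limited twice, then succeeds.
let calls = 0;
const outcome = withRetry(async () => {
  calls += 1;
  if (calls < 3) {
    throw Object.assign(new Error('rate limited'), { isRetryable: true, retryAfterMs: 1 });
  }
  return 'ok';
});

outcome.then((value) => console.log(value, calls));
```

With the real SDK, fn would wrap provider.send(...), and AbortedError would fall through as non-retryable because its isRetryable is false per the table above.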
Package Subpaths
| Subpath | Contents |
|---|---|
| @novu/thalamus | Core types, errors, thalamus factory, createMemoryVaultStore, logger helpers, both providers |
| @novu/thalamus/anthropic | createAnthropicProvider |
| @novu/thalamus/openai | createOpenAIProvider |
| @novu/thalamus/vault | Vault types and VaultStore interface |
| @novu/thalamus/durable | redis(), cloudflare(), DurabilityBackend, EdgeObserver |
| @novu/thalamus/webhook | createWebhookHandler, createProviderWebhookHandler (HMAC-verified webhook receiver) |
Key Design Notes
- SendResult is PromiseLike<Response>: you can await it directly, call .text(), or read .sessionId / .runId / .turnId.
- runId is generated synchronously per send() call (UUID). Available on SendResult.runId, in WebhookSendResult, passed in the onSessionEvents context, and echoed in every webhook payload + header (X-Thalamus-Run-Id).
- turnId groups multiple send() calls within one logical user interaction (e.g. initial message + tool approval resumes). Generated automatically on fresh sends; carried forward when you pass turnId from a previous result in RequestParams.
- The onSessionEvents factory receives a SessionEventContext object ({ sessionId, turnId, runId, metadata }), which is extensible without breaking changes.
- Callbacks are set at provider creation via onSessionEvents, not per send() call. This ensures consistent callback re-attachment for durable session recovery.
- When onSessionEvents is set, send() auto-starts consumption; callbacks fire even without await.
- OpenAI sessions use the Conversations API when available and fall back to previous_response_id for Bedrock.
- Anthropic sessions are server-managed; createSession() creates a real session, endSession() is a no-op. Prior assistant rows in messages are packed into [Context] text on the next user turn, which is a recovery workaround, not native replay.
- ToolSource on tool events tells you origin: { type: 'builtin' }, { type: 'custom' }, or { type: 'mcp', serverName }.
- Zero runtime dependencies; only optional peer deps for provider SDKs.
- logger? on provider config enables opt-in lifecycle logging (default silent). Use adaptPinoLogger(pino) for Pino; pass the same adapter to standalone createWebhookHandler when the webhook is wired separately.