worker-development

star 304

Create and manage background workers, BullMQ queues, Kafka consumers, and scheduled jobs. Use when adding new workers, creating queues, defining job types, building scheduled tasks, or working with async processing.

By ChatbotXIO · Updated 6/14/2026

name: worker-development
description: >-
  Create and manage background workers, BullMQ queues, Kafka consumers, and scheduled jobs. Use when adding new workers, creating queues, defining job types, building scheduled tasks, or working with async processing.

Worker Development

Architecture

Workers run as separate Node processes in apps/worker/. They consume jobs from BullMQ queues (Redis-backed) or Kafka topics.

Shared config lives in packages/worker-config/ (@chatbotx.io/worker-config).

Existing Workers

| Worker | Queue/Topic | Entry |
| --- | --- | --- |
| integration | integration | src/integration/worker.ts |
| chat | chat | src/chat/worker.ts |
| ai-agent | aiAgent | src/ai-agent/worker.ts |
| default | default | src/default/worker.ts |
| trigger | trigger | src/trigger/worker.ts |
| webhook | webhook | src/webhook/worker.ts |
| schedule | (cron) | src/schedule/worker.ts |
| sequence-scheduler | Kafka | src/sequence-scheduler/worker*.ts |

Creating a New Queue

1. Define Queue Name

Add to packages/worker-config/src/lib/types.ts:

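// The later snippets read this value as `queueNames.enum.myQueue`;
// follow the name and shape of the existing export in types.ts.
export const queueName = {
  // ...existing
  myQueue: "myQueue",
} as const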

2. Define Job Types

Create packages/worker-config/src/queues/<name>/index.ts:

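import { Queue } from "bullmq"
import { getRedisConnection, defaultJobOptions, fakeQueue } from "../../lib/connection"
// Needed for queueNames.enum.myQueue below (path assumes the types.ts file from step 1)
import { queueNames } from "../../lib/types"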

export enum MyQueueJobAction {
  processItem = "processItem",
  syncData = "syncData",
}

type ProcessItemJob = {
  type: MyQueueJobAction.processItem
  data: { itemId: string; workspaceId: string }
}

type SyncDataJob = {
  type: MyQueueJobAction.syncData
  data: { source: string }
}

export type MyQueueJobData = ProcessItemJob | SyncDataJob

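// During the Next.js production build there is no Redis to connect to,
// so export the fakeQueue stub instead of a real Queue.
const NEXT_PHASE = process.env.NEXT_PHASE
export const myQueue =
  NEXT_PHASE === "phase-production-build"
    ? fakeQueue
    : new Queue(queueNames.enum.myQueue, {
        connection: getRedisConnection(),
        defaultJobOptions,
      })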

3. Export from Package

Add to packages/worker-config/src/index.ts:

export * from "./queues/<name>"

Creating a New Worker

Create apps/worker/src/<domain>/worker.ts:

import { Worker, type Job } from "bullmq"
import {
  queueNames,
  getRedisConnection,
  defaultWorkerOptions,
  MyQueueJobAction,
  type MyQueueJobData,
} from "@chatbotx.io/worker-config"
import { ensureBootstrapped } from "../lib/bootstrap"
import { logger } from "@chatbotx.io/logger"
// handleProcessItem and handleSyncData are this worker's own handlers;
// define or import them alongside this file.

const startMyWorker = async () => {
  try {
    await ensureBootstrapped()
    logger.info("My worker bootstrapped successfully")
  } catch (err) {
    logger.error(err, "Failed to bootstrap my worker")
    process.exit(1)
  }

  const worker = new Worker(
    queueNames.enum.myQueue,
    async (job: Job<MyQueueJobData>) => {
      logger.info(job.data, "Worker received job")

      switch (job.data.type) {
        case MyQueueJobAction.processItem:
          return await handleProcessItem(job.data.data)
        case MyQueueJobAction.syncData:
          return await handleSyncData(job.data.data)
        default:
          return
      }
    },
    {
      connection: getRedisConnection(),
      ...defaultWorkerOptions,
    },
  )

  worker.on("failed", (job, err) => {
    logger.error({ err, jobId: job?.id }, "Job failed")
  })

  worker.on("completed", (job) => {
    logger.info({ jobId: job.id }, "Job completed")
  })
}

startMyWorker()
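
The `default: return` branch in the processor above completes a job silently when its type is not recognised. A minimal sketch of an exhaustive dispatch follows, assuming the job union from the queue definition (redeclared here so the snippet stands alone). `assertNever` and `dispatch` are hypothetical names, not part of BullMQ or the repo, and the handler bodies are placeholders. Real handlers would be async.

```typescript
// Local copies of the job types so the sketch is self-contained.
enum MyQueueJobAction {
  processItem = "processItem",
  syncData = "syncData",
}

type MyQueueJobData =
  | { type: MyQueueJobAction.processItem; data: { itemId: string; workspaceId: string } }
  | { type: MyQueueJobAction.syncData; data: { source: string } }

// Hypothetical helper: the `never` parameter makes the compiler reject an
// unhandled union member, and the throw fails the job if an unknown type
// arrives at runtime anyway.
const assertNever = (value: never): never => {
  throw new Error(`Unhandled job type: ${JSON.stringify(value)}`)
}

// Hypothetical dispatcher; each branch stands in for a real handler call.
const dispatch = (job: MyQueueJobData): string => {
  switch (job.type) {
    case MyQueueJobAction.processItem:
      return `processed ${job.data.itemId}`
    case MyQueueJobAction.syncData:
      return `synced ${job.data.source}`
    default:
      return assertNever(job)
  }
}
```

Throwing inside a processor marks the job as failed, so an unknown type shows up in the `failed` handler's logs instead of disappearing. Whether that is preferable to a silent return is a design choice for the queue in question.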

Register Build Entry

Add to apps/worker/tsdown.config.ts:

entry: [
  // ...existing
  "src/<domain>/worker.ts",
]

Add Dev Script

Add to apps/worker/package.json:

{
  "scripts": {
    "worker:<domain>": "dotenv -e ../../.env -- tsx --watch src/<domain>/worker.ts"
  }
}

If the worker should start together with the rest of the stack, also add it to the concurrently list in the dev script.

Enqueuing Jobs (Producer Side)

From builder or other apps:

import { myQueue, MyQueueJobAction } from "@chatbotx.io/worker-config"

await myQueue.add("processItem", {
  type: MyQueueJobAction.processItem,
  data: { itemId: "123", workspaceId: "456" },
})

// Bulk enqueue
await myQueue.addBulk([
  {
    name: "syncData",
    data: { type: MyQueueJobAction.syncData, data: { source: "api" } },
  },
])
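
In the calls above, the job name string and the `type` enum value are written separately and can drift apart. A small sketch of deriving the name from the payload, assuming the job union from the queue definition (redeclared locally); `toBulkEntries` is a hypothetical helper, not something the repo is known to provide.

```typescript
// Local copies of the job types so the sketch is self-contained.
enum MyQueueJobAction {
  processItem = "processItem",
  syncData = "syncData",
}

type MyQueueJobData =
  | { type: MyQueueJobAction.processItem; data: { itemId: string; workspaceId: string } }
  | { type: MyQueueJobAction.syncData; data: { source: string } }

// One { name, data } entry per job, the shape used by the addBulk call above.
type BulkEntry = { name: string; data: MyQueueJobData }

// Hypothetical helper: reuse the payload's own `type` as the job name so the
// two can never disagree.
const toBulkEntries = (jobs: MyQueueJobData[]): BulkEntry[] =>
  jobs.map((job) => ({ name: job.type, data: job }))

const entries = toBulkEntries([
  { type: MyQueueJobAction.processItem, data: { itemId: "123", workspaceId: "456" } },
  { type: MyQueueJobAction.syncData, data: { source: "api" } },
])
```

The resulting array could then be passed to `myQueue.addBulk(entries)`.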

Custom Job IDs & Deduplication

A custom jobId deduplicates work: a second add with an existing jobId returns the existing job instead of creating a new one (while that job is still present in the queue).
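
The rule can be pictured with a toy in-memory model. This is not BullMQ and makes no claim about its internals; `ToyDedupQueue` is a hypothetical class that only mirrors the behaviour described above: an add with an id that is still present returns the existing job, and the id becomes usable again once the job is gone.

```typescript
// Toy model of jobId deduplication. NOT BullMQ; for illustration only.
type ToyJob<T> = { id: string; data: T }

class ToyDedupQueue<T> {
  private jobs = new Map<string, ToyJob<T>>()

  // Returns the existing job when the id is already present in the queue.
  add(data: T, jobId: string): ToyJob<T> {
    const existing = this.jobs.get(jobId)
    if (existing) return existing
    const job: ToyJob<T> = { id: jobId, data }
    this.jobs.set(jobId, job)
    return job
  }

  // Models the job leaving the queue, e.g. being removed after completion.
  remove(jobId: string): void {
    this.jobs.delete(jobId)
  }
}

const queue = new ToyDedupQueue<{ attempt: number }>()
const first = queue.add({ attempt: 1 }, "send-42")
const second = queue.add({ attempt: 2 }, "send-42") // same id still present: deduplicated
queue.remove("send-42")
const third = queue.add({ attempt: 3 }, "send-42") // id is free again: new job
```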

CRITICAL: a custom jobId must NOT contain ":". BullMQ uses ":" as its internal Redis key delimiter, so a custom id containing it throws at runtime with Error: Custom Id cannot contain :. Build ids from "-" or "_" plus the entity ids:

// ✅ correct
const jobId = `broadcast-send-contact-${broadcastId}-${contactId}-${type}`
// ❌ runtime crash: "Custom Id cannot contain :"
const badJobId = `bcast:${broadcastId}:${contactId}:${type}`

Keep ids deterministic (so retries / re-picks collapse onto the same job) and free of whitespace. The repo convention is hyphen-separated (e.g. schedule-prepare-broadcast-<id>).
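
One way to keep these constraints in a single place is a small builder. The sketch below is an assumption about how that might look; `buildJobId` is a hypothetical helper, not an existing function in the repo or in BullMQ.

```typescript
// Hypothetical helper: joins the parts with "-" and rejects the characters
// the rules above rule out (":" and whitespace).
const buildJobId = (...parts: Array<string | number>): string => {
  if (parts.length === 0) throw new Error("jobId needs at least one part")
  const jobId = parts.map(String).join("-")
  if (jobId.includes(":")) throw new Error(`jobId must not contain ":": ${jobId}`)
  if (/\s/.test(jobId)) throw new Error(`jobId must not contain whitespace: ${jobId}`)
  return jobId
}

// Same inputs always produce the same id, so re-adds collapse onto one job.
const exampleJobId = buildJobId("broadcast-send-contact", "b1", "c2", "email")
```

Because the check runs before the id reaches the queue, it also fails in unit tests that mock `queue.add`.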

Retention vs re-drive — two opposite removeOnComplete policies

  • Dedup job (one-shot per entity, e.g. per-contact send): use removeOnComplete: { age, count } (TTL retention) so the id survives long enough to dedup concurrent / retry / crash-window re-adds. Never removeOnComplete: true — it removes the id immediately and reopens the duplicate window.
  • Re-driveable job (a cron re-adds the same id every tick): use removeOnComplete: true + removeOnFail: true so the id frees after each run and the next tick can re-add it. Retention here would dedup the next tick away and stall the entity.
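
The two policies can be written down as option objects. This is a sketch under stated assumptions: the types are local stand-ins for the relevant subset of BullMQ's job options, and the age and count values are placeholders, not numbers taken from the repo.

```typescript
// Local stand-ins for the subset of job options discussed above.
type KeepJobs = { age?: number; count?: number }
type RetentionOptions = {
  removeOnComplete: boolean | KeepJobs
  removeOnFail?: boolean | KeepJobs
}

// Dedup job: keep the finished job around so re-adds with the same id
// collapse onto it. Placeholder values; age is in seconds.
const dedupJobOptions: RetentionOptions = {
  removeOnComplete: { age: 60 * 60, count: 10_000 },
}

// Re-driveable job: free the id as soon as the run ends so the next cron
// tick can add the same id again.
const redriveJobOptions: RetentionOptions = {
  removeOnComplete: true,
  removeOnFail: true,
}
```

Either object would be spread into the third argument of `add`, next to the `jobId`, for example `{ jobId, ...dedupJobOptions }`.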

Never reuse an active job's jobId for a self-requeue created inside that same job — BullMQ drops the add (id already active) and the chain dies. Drive continuation from a separate cron instead.

Testing job IDs (mock pitfall)

Unit tests usually mock queue.add as vi.fn(), which accepts any args — it does not validate the jobId like real BullMQ. A wrong id format (e.g. containing :) passes a self-consistent unit test (code and test agree on the bad string) but crashes at runtime. Always assert the produced jobId against the constraint:

const jobIds = addSpy.mock.calls.map((c) => c[2].jobId)
for (const jobId of jobIds) expect(jobId).not.toContain(":")

For at least one enqueue path, prefer an integration test that runs BullMQ itself, backed by real Redis (or ioredis-mock), to catch library-level input validation that mocks miss.
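
The assertion above can be factored into a framework-free helper that works on the recorded call arguments. A minimal sketch, assuming the spy records calls as `[name, data, opts]` tuples; `findInvalidJobIds` is a hypothetical name.

```typescript
// One recorded call to add(name, data, opts), as a spy would store it.
type AddCall = [name: string, data: unknown, opts?: { jobId?: string }]

// Returns every custom jobId that contains ":" (rejected by BullMQ) or
// whitespace (against the convention described earlier).
const findInvalidJobIds = (calls: AddCall[]): string[] =>
  calls
    .map((call) => call[2]?.jobId)
    .filter((id): id is string => typeof id === "string")
    .filter((id) => id.includes(":") || /\s/.test(id))

const recorded: AddCall[] = [
  ["processItem", {}, { jobId: "broadcast-send-contact-1-2-email" }],
  ["processItem", {}, { jobId: "bcast:1:2:email" }],
  ["syncData", {}], // no custom id, nothing to check
]

const invalid = findInvalidJobIds(recorded)
```

In a Vitest test this could be used as `expect(findInvalidJobIds(addSpy.mock.calls)).toEqual([])`.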

Scheduled Jobs (Cron)

The schedule worker (src/schedule/worker.ts) runs periodic tasks. Add cron-style scheduled work there or use BullMQ's repeatable jobs:

await myQueue.add(
  "syncData",
  { type: MyQueueJobAction.syncData, data: { source: "cron" } },
  { repeat: { pattern: "0 */6 * * *" } }, // every 6 hours
)
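
To make the pattern `0 */6 * * *` concrete, the sketch below expands the minute and hour fields into the values they match. It is a deliberately tiny illustration that handles only `*`, `N`, and `*/N`; `expandCronField` is a hypothetical helper and is not how BullMQ parses patterns.

```typescript
// Expands one cron field into the values it matches within [min, max].
// Only "*", a single number, and "*/N" are supported in this sketch.
const expandCronField = (field: string, min: number, max: number): number[] => {
  const all = Array.from({ length: max - min + 1 }, (_, i) => min + i)
  if (field === "*") return all

  const step = /^\*\/(\d+)$/.exec(field)
  if (step) {
    const n = Number(step[1])
    if (!Number.isInteger(n) || n <= 0) throw new Error(`invalid step in "${field}"`)
    return all.filter((value) => (value - min) % n === 0)
  }

  if (!/^\d+$/.test(field)) throw new Error(`unsupported cron field "${field}"`)
  const value = Number(field)
  if (value < min || value > max) throw new Error(`value out of range in "${field}"`)
  return [value]
}

const [minuteField = "*", hourField = "*"] = "0 */6 * * *".split(" ")
const minutes = expandCronField(minuteField, 0, 59) // [0]
const hours = expandCronField(hourField, 0, 23) // [0, 6, 12, 18]
```

So the job fires at minute 0 of hours 0, 6, 12, and 18, four times a day.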

Kafka (Sequence Scheduler Pattern)

For high-throughput scenarios, the project uses Kafka:

  • Producer: createProducer from @chatbotx.io/kafka
  • Consumer: createConsumer from @chatbotx.io/kafka
  • Topics defined as constants
  • JSON serialization for payloads

Kafka is currently used only for sequence dispatch. Prefer BullMQ for standard job queues.
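
The last two points, topics as constants and JSON payloads, might look like the sketch below. Everything named here is hypothetical: the topic string, the payload fields, and the helper functions are illustrations, and the `createProducer` / `createConsumer` signatures are not shown because the source does not document them.

```typescript
// Hypothetical topic constant; the real topic names live in the repo.
const SEQUENCE_DISPATCH_TOPIC = "sequence-dispatch" as const

// Hypothetical payload shape, for illustration only.
type SequenceDispatchPayload = { sequenceId: string; contactId: string; runAt: string }

// Producer side: serialize the payload to a JSON string message value.
const encodePayload = (payload: SequenceDispatchPayload): string => JSON.stringify(payload)

// Consumer side: parse and validate before trusting the message.
const decodePayload = (raw: string): SequenceDispatchPayload => {
  const parsed: unknown = JSON.parse(raw)
  if (typeof parsed !== "object" || parsed === null) {
    throw new Error("payload must be a JSON object")
  }
  const { sequenceId, contactId, runAt } = parsed as Record<string, unknown>
  if (typeof sequenceId !== "string" || typeof contactId !== "string" || typeof runAt !== "string") {
    throw new Error("payload is missing required string fields")
  }
  return { sequenceId, contactId, runAt }
}

const wire = encodePayload({ sequenceId: "s1", contactId: "c1", runAt: "2026-06-14T00:00:00Z" })
const roundTripped = decodePayload(wire)
```

Validating on the consumer side keeps a malformed message from reaching handler code with the wrong shape.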

Worker Imports

| What | Import from |
| --- | --- |
| Queue names, job types | @chatbotx.io/worker-config |
| Redis connection, options | @chatbotx.io/worker-config |
| Database | @chatbotx.io/database/client |
| Integration handlers | ../services/integrations (within worker) |
| Logger | @chatbotx.io/logger |
| SDK types | @chatbotx.io/sdk |

Logging

Import the child logger from ../../lib/logger inside apps/worker, or @chatbotx.io/logger for shared packages.

Always use err (not error) as the key for Error objects.

pino's built-in serializer is keyed on err. Using any other key (e.g. error) skips serialization, so the stack trace and error message are lost from structured logs.

// ✅ correct — stack trace preserved
logger.error({ err: error, conversationId }, "Failed to emit analytics event")

// ❌ wrong: no serializer runs for the error key, so the message and stack trace are lost
logger.error({ error, conversationId }, "Failed to emit analytics event")
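
The reason a serializer is needed at all can be shown without a logger. An Error keeps `message` and `stack` as non-enumerable properties, so plain JSON serialization drops them. In the sketch below, `serializeErr` is a hypothetical stand-in for what an err serializer does, not pino's actual implementation.

```typescript
const boom = new Error("boom")

// Plain JSON serialization sees no enumerable properties on the Error.
const plain = JSON.stringify({ error: boom })

// Hypothetical stand-in for an err serializer: copy the useful fields
// explicitly so they survive serialization.
const serializeErr = (err: Error) => ({
  type: err.name,
  message: err.message,
  stack: err.stack,
})

const serialized = JSON.stringify({ err: serializeErr(boom) })
```

Here `plain` is `{"error":{}}`, while `serialized` carries the type, message, and stack.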

For fire-and-forget .catch() handlers on analytics/event-bus emissions, always log the error rather than swallowing it:

emit("analytics:dashboard", payload).catch((err) => {
  logger.error({ err, conversationId }, "[handler] Failed to emit analytics event")
})

Environment

  • Workers load env via dotenv -e ../../.env (root .env file)
  • Redis URL configured in packages/worker-config/src/keys.ts
  • fakeQueue stubs are used during the Next.js production build so that the build does not need a Redis connection
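
The source does not show what fakeQueue looks like. As a rough sketch of the idea only, a build-time stub could expose the same method names producers call while doing nothing. `QueueLike` and `makeFakeQueue` are hypothetical names, and the real stub in packages/worker-config may differ.

```typescript
// Hypothetical minimal surface that producers rely on.
type QueueLike<T> = {
  add: (name: string, data: T, opts?: object) => Promise<{ id: string | undefined }>
  addBulk: (
    jobs: Array<{ name: string; data: T; opts?: object }>,
  ) => Promise<Array<{ id: string | undefined }>>
}

// Hypothetical stub factory: no Redis connection, no side effects.
const makeFakeQueue = <T>(): QueueLike<T> => ({
  add: async () => ({ id: undefined }),
  addBulk: async (jobs) => jobs.map(() => ({ id: undefined })),
})

const stubQueue = makeFakeQueue<{ source: string }>()
```

Code that imports the queue at module load time can then be evaluated during the build without opening a connection.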

Install via CLI

npx skills add https://github.com/ChatbotXIO/ChatbotX --skill worker-development

Repository Details
Forks: 33
Branch: main
Path: SKILL.md