river-job-queue


River is a PostgreSQL-based job queue used in Chronicle for background processing. Jobs are defined as Go types implementing rivertype.JobArgs. The queue handles log parsing, reparsing, and other async tasks. Workers process jobs from named queues with configurable priorities, retry policies, and deduplication.

By Emyrk. Updated 2/17/2026


River Job Queue

Chronicle uses River for background job processing, storing jobs in PostgreSQL.

Architecture

┌─────────────────────────────────────────────────────────────┐
│                        riverqueue.Queues                     │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────┐  │
│  │   Workers   │  │   Queues    │  │      River UI       │  │
│  │  (per job)  │  │  (configs)  │  │   /river endpoint   │  │
│  └─────────────┘  └─────────────┘  └─────────────────────┘  │
└────────────────────────────┬────────────────────────────────┘
                             │
                   ┌─────────▼──────────┐
                   │     PostgreSQL     │
                   │  (river_* tables)  │
                   └────────────────────┘

Key Files

| File | Purpose |
| --- | --- |
| chronicle/riverqueue/riverqueue.go | Queue initialization, web UI setup |
| chronicle/riverqueue/riveractions.go | Job action methods (delete, etc.) |
| chronicle/riverqueue/panicmw.go | Panic recovery middleware |
| chronicle/logparse.go | Log parsing job (ArgsLogParse) |
| chronicle/reparse.go | Re-parsing job (ArgsLogReparse) |
| chronicle/chronicle.go | Service that enqueues jobs |

Creating a New Job

1. Define Job Arguments

Job args must implement rivertype.JobArgs by providing Kind(). Adding the optional InsertOpts() method sets per-kind defaults (queue, priority, max attempts, uniqueness):

// In chronicle/myjob.go
package chronicle

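import (
    "github.com/Emyrk/chronicle/chronicle/riverqueue"
    "github.com/google/uuid" // assumed source of uuid.UUID; use whichever uuid package Chronicle imports
    "github.com/riverqueue/river"
    "github.com/riverqueue/river/rivertype"
)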

const KindMyJob = "my-job"

type ArgsMyJob struct {
    SomeID uuid.UUID `json:"some_id"`
}

func (ArgsMyJob) Kind() string { return KindMyJob }

func (ArgsMyJob) InsertOpts() river.InsertOpts {
    return river.InsertOpts{
        Queue:       riverqueue.QueueLogParsing,  // or QueueDefault
        Priority:    riverqueue.PriorityDefault,   // 1=highest, 4=low
        MaxAttempts: 5,
        UniqueOpts: river.UniqueOpts{
            ByArgs: true,  // Dedupe by args
            ByState: []rivertype.JobState{
                rivertype.JobStateScheduled,
                rivertype.JobStatePending,
                rivertype.JobStateAvailable,
                rivertype.JobStateRunning,
                rivertype.JobStateRetryable,
            },
        },
    }
}
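River persists job args as JSON, so the struct's json tags decide the keys that uniqueness by args and any later args-based queries see. The following is a minimal, standard-library-only sketch of that encoding. SomeID is a plain string here purely to avoid third-party imports, and encodeArgs is a hypothetical helper, not part of River or Chronicle.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ArgsMyJob mirrors the args struct above. SomeID is a string in this sketch
// (instead of uuid.UUID) only to keep it free of third-party imports.
type ArgsMyJob struct {
	SomeID string `json:"some_id"`
}

// Kind returns the stable identifier stored alongside the encoded args.
func (ArgsMyJob) Kind() string { return "my-job" }

// encodeArgs (hypothetical helper) returns the JSON document for the args.
func encodeArgs(args ArgsMyJob) (string, error) {
	b, err := json.Marshal(args)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	encoded, err := encodeArgs(ArgsMyJob{SomeID: "abc"})
	if err != nil {
		panic(err)
	}
	fmt.Println(ArgsMyJob{}.Kind(), encoded)
}
```

Renaming a json tag changes the stored key, so treat tags on args structs as part of the job's contract.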

2. Implement the Worker

type WorkerMyJob struct {
    parent *Chronicle
    river.WorkerDefaults[ArgsMyJob]  // Provides default timeout, etc.
}

func (c *Chronicle) NewWorkerMyJob() river.Worker[ArgsMyJob] {
    return &WorkerMyJob{parent: c}
}

func (w *WorkerMyJob) Work(ctx context.Context, job *river.Job[ArgsMyJob]) error {
    // Access job args via job.Args
    id := job.Args.SomeID

    // Access services via w.parent
    db := w.parent.Zed

    // Placeholder so the skeleton compiles; replace with real work using id and db.
    _, _ = id, db

    // Return nil for success
    // Return error to retry (unless MaxAttempts exhausted)
    // Return river.JobCancel(err) to permanently cancel
    return nil
}

// Optional: custom retry timing (here, River's default backoff plus one minute)
func (w *WorkerMyJob) NextRetry(job *river.Job[ArgsMyJob]) time.Time {
    return (&river.DefaultClientRetryPolicy{}).NextRetry(job.JobRow).Add(time.Minute)
}
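To get a feel for what the custom NextRetry above produces, here is a rough sketch of the resulting delays. It assumes River's default policy is approximately attempt^4 seconds and leaves out the random jitter River applies, so treat the numbers as estimates. The function names are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// baseRetryDelay approximates the default backoff as attempt^4 seconds.
// Assumption: jitter is omitted, so real delays differ slightly.
func baseRetryDelay(attempt int) time.Duration {
	a := int64(attempt)
	return time.Duration(a*a*a*a) * time.Second
}

// customRetryDelay mirrors the NextRetry override: default backoff plus one minute.
func customRetryDelay(attempt int) time.Duration {
	return baseRetryDelay(attempt) + time.Minute
}

func main() {
	for attempt := 1; attempt <= 3; attempt++ {
		fmt.Printf("attempt %d: retry in about %s\n", attempt, customRetryDelay(attempt))
	}
}
```

With MaxAttempts set to 5, the later retries are already minutes apart, so adding a flat minute mostly affects the first one or two attempts.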

3. Register the Worker

In cmd/chronicled/main.go (or wherever the queue is initialized):

// Add worker to the queue before Start().
// NewWorkerMyJob is a method, so `chronicle` here is the *Chronicle instance, not the package.
riverqueue.AddWorker(queues, chronicle.NewWorkerMyJob())

4. Enqueue Jobs

// ownerID is passed in so it can be attached as a filtering tag.
func (c *Chronicle) EnqueueMyJob(ctx context.Context, id, ownerID uuid.UUID) (*rivertype.JobInsertResult, error) {
    return c.queue.Insert(ctx, ArgsMyJob{
        SomeID: id,
    }, &river.InsertOpts{
        Tags: []string{
            fmt.Sprintf("owner_%s", ownerID.String()),  // Optional filtering tags
        },
    })
}

Queue Configuration

Queues

const (
    QueueLogParsing = "log-parsing"  // Dedicated queue for log parsing
    // river.QueueDefault is also available
)

Priorities

const (
    PriorityHighest = 1
    PriorityHigh    = 2
    PriorityDefault = 3
    PriorityLow     = 4
)
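The effect of these priority values can be pictured with a small ordering sketch. It assumes available jobs in a queue are picked up by ascending priority number and then by earliest scheduled time. pendingJob and fetchOrder are hypothetical stand-ins, not River types.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// pendingJob is a hypothetical, trimmed-down stand-in for a queued job.
type pendingJob struct {
	Name        string
	Priority    int // 1 = highest, 4 = lowest
	ScheduledAt time.Time
}

// fetchOrder returns job names in the order this sketch assumes they are
// picked up: ascending priority first, then earliest scheduled time.
func fetchOrder(jobs []pendingJob) []string {
	sorted := append([]pendingJob(nil), jobs...) // copy so the input is untouched
	sort.SliceStable(sorted, func(i, j int) bool {
		if sorted[i].Priority != sorted[j].Priority {
			return sorted[i].Priority < sorted[j].Priority
		}
		return sorted[i].ScheduledAt.Before(sorted[j].ScheduledAt)
	})
	names := make([]string, len(sorted))
	for i, job := range sorted {
		names[i] = job.Name
	}
	return names
}

// sampleJobs builds three jobs enqueued one second apart, lowest priority first.
func sampleJobs() []pendingJob {
	t0 := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
	return []pendingJob{
		{Name: "low-cleanup", Priority: 4, ScheduledAt: t0},
		{Name: "default-parse", Priority: 3, ScheduledAt: t0.Add(time.Second)},
		{Name: "highest-reparse", Priority: 1, ScheduledAt: t0.Add(2 * time.Second)},
	}
}

func main() {
	fmt.Println(fetchOrder(sampleJobs()))
}
```

Even though the highest-priority job was enqueued last, it sorts to the front, which is why PriorityHighest should be reserved for work that genuinely needs to jump the line.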

Queue Worker Limits

Set in riverqueue.New() or via AddQueue():

q.AddQueue("my-queue", river.QueueConfig{MaxWorkers: 10})
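MaxWorkers caps how many jobs from a queue run at the same time in one process. The sketch below illustrates that effect with a buffered-channel semaphore. It is not how River implements the limit, and runWithLimit is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runWithLimit starts `jobs` simulated jobs but lets at most maxWorkers run
// concurrently. It returns the highest concurrency observed.
func runWithLimit(maxWorkers, jobs int) int {
	sem := make(chan struct{}, maxWorkers)
	var wg sync.WaitGroup
	var mu sync.Mutex
	running, peak := 0, 0

	for i := 0; i < jobs; i++ {
		wg.Add(1)
		sem <- struct{}{} // blocks while maxWorkers jobs are in flight
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // free the slot after the job is done

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			time.Sleep(time.Millisecond) // simulated work

			mu.Lock()
			running--
			mu.Unlock()
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println("peak concurrency:", runWithLimit(3, 20))
}
```

The observed peak never exceeds the limit, which is the property to rely on when sizing MaxWorkers against database connections or memory.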

Job Lifecycle

Available → Running → Completed
              │
              ├─(error)→ Retryable → Available (retry)
              │
              ├─(error, MaxAttempts exhausted)→ Discarded
              │
              └─(JobCancel)→ Cancelled
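The transitions out of Running can be written down as a small decision function. This is a sketch for reasoning about the lifecycle, not River's implementation. It also covers the case where an ordinary error arrives on the final attempt, after which River stops retrying and marks the job discarded. The outcome type and stateAfterRun are hypothetical names.

```go
package main

import "fmt"

// outcome describes how a worker's Work method finished.
type outcome int

const (
	outcomeSuccess outcome = iota // Work returned nil
	outcomeError                  // Work returned an ordinary error
	outcomeCancel                 // Work returned river.JobCancel(err)
)

// stateAfterRun returns the state a running job moves to, given how Work
// finished and which attempt (1-based) just ran.
func stateAfterRun(o outcome, attempt, maxAttempts int) string {
	switch o {
	case outcomeSuccess:
		return "completed"
	case outcomeCancel:
		return "cancelled"
	default:
		if attempt >= maxAttempts {
			return "discarded" // no attempts left, so no further retries
		}
		return "retryable" // becomes available again once the retry time arrives
	}
}

func main() {
	fmt.Println(stateAfterRun(outcomeError, 1, 5))
	fmt.Println(stateAfterRun(outcomeError, 5, 5))
	fmt.Println(stateAfterRun(outcomeCancel, 1, 5))
}
```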

Handling Failures

func (w *WorkerMyJob) Work(ctx context.Context, job *river.Job[ArgsMyJob]) error {
    // Permanent failure - won't retry
    if someUnrecoverableCondition {
        return river.JobCancel(fmt.Errorf("cannot process: %v", reason))
    }
    
    // Temporary failure - will retry per MaxAttempts
    if err := doWork(); err != nil {
        return fmt.Errorf("work failed: %w", err)
    }
    
    return nil
}
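One way to decide between cancelling and retrying is to wrap errors with %w and test for known permanent failures with errors.Is. The sketch below uses only the standard library. errMalformedLog, errStorageTimeout, parse, work, and shouldCancel are all hypothetical names; in a real worker, a true result from shouldCancel is where river.JobCancel would be returned.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinels: one failure that can never succeed, one that might.
var (
	errMalformedLog   = errors.New("malformed log")
	errStorageTimeout = errors.New("storage timeout")
)

// parse is a hypothetical unit of work that wraps the sentinel with context.
func parse(line string) error {
	if line == "" {
		return fmt.Errorf("parse line: %w", errMalformedLog)
	}
	return nil
}

// work adds another layer of context, as the worker example above does.
func work(line string) error {
	if err := parse(line); err != nil {
		return fmt.Errorf("work failed: %w", err)
	}
	return nil
}

// shouldCancel reports whether retrying is pointless. errors.Is still finds
// the sentinel because every layer wrapped it with %w rather than %v.
func shouldCancel(err error) bool {
	return errors.Is(err, errMalformedLog)
}

func main() {
	fmt.Println(shouldCancel(work("")))          // permanent: cancel
	fmt.Println(shouldCancel(errStorageTimeout)) // transient: let River retry
}
```

Using %v instead of %w anywhere in the chain would flatten the error to text and make this check silently return false.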

Recording Output

Jobs can store structured output that is visible in the River UI:

func (w *WorkerMyJob) Work(ctx context.Context, job *river.Job[ArgsMyJob]) error {
    output := map[string]any{
        "processed_count": 42,
        "duration_ms":     1234,
    }
    _ = river.RecordOutput(ctx, output)
    return nil
}

Querying Jobs

// List jobs for a specific log group
func (c *Chronicle) ListLogGroupJobs(ctx context.Context, groupID uuid.UUID) (*river.JobListResult, error) {
    opts := river.NewJobListParams().
        Where(`args->>'log_group_id' = @group_id`, map[string]any{
            "group_id": groupID.String(),
        }).
        Queues(riverqueue.QueueLogParsing).
        Kinds(KindLogParse, KindLogReparse).
        OrderBy(river.JobListOrderByScheduledAt, river.SortOrderDesc)
    
    return c.queue.JobList(ctx, opts)
}

// Cancel a job
_, err := c.queue.JobCancel(ctx, jobID)

// Delete a job
_, err := c.queue.JobDelete(ctx, jobID)

River UI

River UI (the separate riverui package) is mounted at /river:

// In riverqueue.go
func webUI(ctx context.Context, logger *slog.Logger, client *river.Client[pgx.Tx]) (http.Handler, error) {
    opts := &riverui.HandlerOpts{
        Prefix: "/river",
        // ...
    }
    srv, err := riverui.NewHandler(opts)
    // ...
}

The UI requires authentication (enforced in the handler wrapper).

Panic Recovery

The workerPanicMW middleware logs panics with stack traces before re-panicking:

func (w workerPanicMW) Work(ctx context.Context, job *rivertype.JobRow, doInner func(context.Context) error) error {
    defer func() {
        if r := recover(); r != nil && r != http.ErrAbortHandler {
            w.logger.Warn("panic serving with job (recovered)",
                slog.String("job_kind", job.Kind),
                slog.Any("panic", r),
                slog.String("stack", string(debug.Stack())),
            )
            panic(r)  // Re-panic for River to handle
        }
    }()
    return doInner(ctx)
}
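The recover, log, re-panic pattern can be tried in isolation with only the standard library. In this sketch, withPanicLogging plays the role of the middleware and runAndCapture plays the role of the caller that ultimately handles the panic (River, in Chronicle). All names here are hypothetical.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"log/slog"
	"runtime/debug"
	"strings"
)

// withPanicLogging logs any panic raised by doInner with a stack trace, then
// re-panics so the caller still sees the failure.
func withPanicLogging(ctx context.Context, logger *slog.Logger, kind string, doInner func(context.Context) error) error {
	defer func() {
		if r := recover(); r != nil {
			logger.Warn("panic in job",
				slog.String("job_kind", kind),
				slog.Any("panic", r),
				slog.String("stack", string(debug.Stack())),
			)
			panic(r) // hand the panic back to the caller
		}
	}()
	return doInner(ctx)
}

// runAndCapture runs doInner through the wrapper and reports whether a log
// line mentioning the job kind was written and what panic value propagated.
func runAndCapture(kind string, doInner func(context.Context) error) (logged bool, recovered any) {
	var buf bytes.Buffer
	logger := slog.New(slog.NewTextHandler(&buf, nil))
	defer func() {
		recovered = recover()
		logged = strings.Contains(buf.String(), "job_kind="+kind)
	}()
	_ = withPanicLogging(context.Background(), logger, kind, doInner)
	return false, nil
}

func panickingJob(context.Context) error { panic("boom") }

func okJob(context.Context) error { return nil }

func main() {
	logged, recovered := runAndCapture("my-job", panickingJob)
	fmt.Println(logged, recovered)
}
```

Re-panicking matters: swallowing the panic in the middleware would make a crashed job look like a success to whatever runs it.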

Configuration Options

river.Config{
    Queues:  queues,
    Workers: workers,
    Middleware: []rivertype.Middleware{
        NewWorkerPanicMW(logger),
    },
    CompletedJobRetentionPeriod: -1,          // Keep completed jobs forever
    RescueStuckJobsAfter:        time.Hour,   // Rescue stuck jobs after 1 hour
    JobTimeout:                  time.Minute * 30,  // Default job timeout
}

Insert-Only Mode

For processes that should only enqueue jobs (not process them):

opts := riverqueue.Options{
    InsertOnly: true,  // Disables worker processing
}

Example: Log Parsing Flow

1. User uploads logs → chronicle.UploadLogs()
2. Files saved to object storage
3. Database records created in transaction
4. Job enqueued: EnqueueParseLog()
   └─ ArgsLogParse{LogID: groupID}
5. Worker picks up job
   └─ WorkerLogParse.Work()
       ├─ Download files from storage
       ├─ Sort and merge log lines
       ├─ Parse encounters
       └─ Insert parsed data to DB
6. Job marked complete with output