walkeros-understanding-sources

Use when working with walkerOS sources, understanding event capture, or learning about the push interface. Covers browser, dataLayer, and server source patterns.

By elbwalker. Updated 6/2/2026.

Understanding walkerOS Sources

Overview

Sources capture events from the external world (browser DOM, dataLayer, HTTP requests, cloud functions) and feed them to the collector.

Core principle: Sources capture. They don't process or deliver; that is the job of the collector and the destinations.

Source Interface

See packages/core/src/types/source.ts for canonical interface.

Init Function (Context Pattern)

Sources use a context-based initialization pattern:

import type { Source } from '@walkeros/core';

export const sourceMySource: Source.Init<Types> = async (context) => {
  const { config = {}, env, logger, id } = context;
  // ...
};

Context contains:

| Property | Type | Purpose |
| --- | --- | --- |
| config | Source.Config<T> | Settings, mapping, options |
| env | Types['env'] | Environment (push, logger) |
| logger | Logger | Logging functions |
| id | string | Source identifier |
| collector | Collector.Instance | Reference to collector |
| withScope | (raw, respond, body) => Promise<R> | Bind ingest + respond to a single scope (server sources) |
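To make the context properties concrete, here is a minimal sketch of a factory that destructures them and returns an instance. The `SketchContext` and `SketchInstance` types are simplified, hypothetical stand-ins defined locally; they are not the real `@walkeros/core` types, and the `prefix` setting is invented for illustration.

```typescript
// Hypothetical, simplified stand-ins for the real @walkeros/core types.
interface SketchLogger { info: (message: string) => void }
interface SketchContext {
  config?: { settings?: { prefix?: string } };
  env: { push: (event: { name: string }) => Promise<void> };
  logger: SketchLogger;
  id: string;
}
interface SketchInstance {
  type: string;
  config: { settings: { prefix: string } };
  push: (action: string) => Promise<void>;
}

const sourceSketch = async (context: SketchContext): Promise<SketchInstance> => {
  const { config = {}, env, logger, id } = context;
  // Apply defaults inline; `prefix` is an invented example setting.
  const settings = { prefix: config.settings?.prefix ?? 'page' };

  const push = async (action: string): Promise<void> => {
    logger.info(`${id}: forwarding ${action}`);
    await env.push({ name: `${settings.prefix} ${action}` });
  };

  return { type: 'sketch', config: { settings }, push };
};
```

The factory only reads from the context and builds closures, which keeps it easy to call from a test with hand-written `env` and `logger` objects.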

Push Method

| Method | Purpose |
| --- | --- |
| push(input) | Receive external input, emit events |

Init Method

init?: () => void | Promise<void> — Optional eager-startup hook on the returned Source.Instance. The factory must be side-effect-free: build the instance and return it. The collector calls init() on every source eagerly after all factories register, regardless of config.require. Use init for work that previously sat in the factory body: draining a pre-init window queue (e.g., window.elbLayer), attaching DOM listeners, opening sockets, intercepting window.dataLayer. After init runs the collector flips Source.Config.init to true.
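The split between a side-effect-free factory and an eager init hook can be sketched as follows. This is an illustration under assumptions: `createQueueSource`, `Queued`, and the `forward` callback are hypothetical names, and the array passed in merely stands in for a pre-init queue such as window.elbLayer.

```typescript
// Hypothetical shapes: not the real Source.Instance.
type Queued = { name: string };
interface InitSketchInstance {
  type: string;
  init: () => void;
  push: (event: Queued) => void;
}

function createQueueSource(
  preInitQueue: Queued[],           // stand-in for something like window.elbLayer
  forward: (event: Queued) => void, // stand-in for env.push
): InitSketchInstance {
  // Factory body: only builds closures, touches nothing.
  const push = (event: Queued): void => forward(event);

  const init = (): void => {
    // Side effects live here: drain whatever was queued before startup.
    while (preInitQueue.length > 0) {
      const event = preInitQueue.shift();
      if (event) push(event);
    }
  };

  return { type: 'queue-sketch', init, push };
}

const received: string[] = [];
const layer: Queued[] = [{ name: 'page view' }, { name: 'product add' }];
const queueSource = createQueueSource(layer, (e) => { received.push(e.name); });
const beforeInit = received.length; // the factory alone forwarded nothing
queueSource.init();                 // the collector would make this call
```

Keeping the drain inside init means constructing the instance twice (for example in tests) never consumes the queue by accident.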

queueOn Buffer

queueOn?: Array<{ type: On.Types; data: unknown }> — Optional buffer on the Source.Instance for lifecycle events delivered before the source is started (started ≡ config.init === true && !config.require?.length). The collector pushes { type, data } here when it would otherwise call source.on(type, data). Once the source becomes started, the collector replays each entry via source.on(...) and clears the queue.
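The buffering and replay rule can be sketched from the collector's side. The helper names `deliver` and `replay`, the `OnType` subset, and the `QueueOnSource` shape are assumptions made for this example; only the started condition is taken from the text above.

```typescript
type OnType = 'consent' | 'user' | 'session' | 'run'; // hypothetical subset of On.Types
interface QueueOnSource {
  config: { init?: boolean; require?: string[] };
  queueOn?: Array<{ type: OnType; data: unknown }>;
  on: (type: OnType, data: unknown) => void;
}

// started ≡ config.init === true && !config.require?.length
const isStarted = (s: QueueOnSource): boolean =>
  s.config.init === true && !s.config.require?.length;

// Deliver now if started, otherwise buffer on the instance.
function deliver(s: QueueOnSource, type: OnType, data: unknown): void {
  if (isStarted(s)) {
    s.on(type, data);
  } else {
    (s.queueOn ??= []).push({ type, data });
  }
}

// Replay buffered entries in order, then leave the queue empty.
function replay(s: QueueOnSource): void {
  if (!isStarted(s) || !s.queueOn) return;
  const pending = s.queueOn;
  s.queueOn = [];
  for (const entry of pending) s.on(entry.type, entry.data);
}

const seen: string[] = [];
const src: QueueOnSource = {
  config: { init: true, require: ['consent'] },
  on: (type) => { seen.push(type); },
};
deliver(src, 'user', { id: 'u1' }); // buffered: require is not empty yet
const bufferedCount = src.queueOn?.length ?? 0;
src.config.require = [];            // gate clears
replay(src);
deliver(src, 'session', {});        // started: delivered directly
```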

Destroy Method

destroy?: DestroyFn — Optional cleanup method. Called during command('shutdown'). Use to close HTTP servers, timers, or connections. Receives { id, config, env, logger }.
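As an illustration of what a destroy hook typically cleans up, the sketch below starts a timer in init and clears it in destroy. `createPollingSource` and `DestroyContextSketch` are hypothetical; the context is reduced to id and logger, whereas the text above says the real one also carries config and env.

```typescript
// Simplified: the real destroy context also carries config and env.
interface DestroyContextSketch { id: string; logger: { info: (message: string) => void } }
interface PollingSource {
  init: () => void;
  destroy: (ctx: DestroyContextSketch) => void;
  isRunning: () => boolean;
}

function createPollingSource(poll: () => void, intervalMs: number): PollingSource {
  let timer: ReturnType<typeof setInterval> | undefined;
  return {
    init: () => { timer = setInterval(poll, intervalMs); },
    destroy: ({ id, logger }) => {
      // Safe to call more than once: a cleared timer is simply skipped.
      if (timer !== undefined) clearInterval(timer);
      timer = undefined;
      logger.info(`${id}: stopped polling`);
    },
    isRunning: () => timer !== undefined,
  };
}

const destroyLogs: string[] = [];
const polling = createPollingSource(() => {}, 1000);
polling.init();
const runningAfterInit = polling.isRunning();
polling.destroy({ id: 'poller', logger: { info: (m) => { destroyLogs.push(m); } } });
```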

Source Lifecycle

The collector and every source agree on three lifecycle markers, all on Source.Config:

| Field | Purpose |
| --- | --- |
| init?: boolean | Set by the collector to true after Instance.init() resolves. Authors do not write to it. Reflects "init has run", not "is started". |
| require?: string[] | Author-supplied timing hint. Lists collector state a source needs before its first on() delivery. Satisfied by the collector's current recorded state (level), not only by a future event, so order does not matter: the gate clears whether the required state was recorded before or after this source registered. It is not a correctness dependency: a source reacts to state correctly whether or not it declares require. |
| disabled?: boolean | Hard skip: no factory invocation, no init, no event capture. |

The flow is:

  1. Register — collector invokes the factory. Factory returns a fresh Source.Instance with no side effects.
  2. Init pass — collector calls Instance.init() on every registered source, then sets config.init = true.
  3. Lifecycle delivery — for each collector event (consent, user, session, run, …) the collector decrements every source's require in place AND reconciles every still-pending source/destination against current state, so a gate also clears from state that was already recorded. If a source is started, it calls source.on(type, data) directly. Otherwise it pushes { type, data } into Instance.queueOn.
  4. Replay — when a source becomes started (require empties), the collector replays its queueOn via source.on(...) and clears the queue.

require therefore gates the timing of on() delivery, not code execution and not correctness. Instance.init() always runs eagerly. There is no collector.pending.sources map: per-source state lives entirely on Source.Instance and Source.Config.
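The order independence of require can be sketched as a reconciliation against recorded state. This is an assumption about one way to model it, not the collector's actual code: `recorded`, `record`, and `reconcile` are hypothetical names, and the require array is reassigned here rather than decremented in place, purely for brevity.

```typescript
interface GatedSource { config: { init?: boolean; require?: string[] } }

// Hypothetical collector-side record of which state kinds have been seen.
const recorded = new Set<string>();

// Drop every requirement that recorded state already satisfies.
function reconcile(source: GatedSource): void {
  if (!source.config.require) return;
  source.config.require = source.config.require.filter((key) => !recorded.has(key));
}

// Record a state kind, then reconcile every still-pending source.
function record(kind: string, sources: GatedSource[]): void {
  recorded.add(kind);
  sources.forEach(reconcile);
}

const started = (s: GatedSource): boolean =>
  s.config.init === true && !s.config.require?.length;

const early: GatedSource = { config: { init: true, require: ['consent'] } };
record('consent', [early]); // state arrives after this source registered

const late: GatedSource = { config: { init: true, require: ['consent'] } };
reconcile(late);            // state was recorded before this source registered
```

Both sources end up started, which is the point of reconciling against current state instead of waiting for a future event.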

Exactly-once state delivery is a collector guarantee

State commands (consent, user, globals, custom) are recorded by the collector immediately, even before run, and delivered to each source's on handler exactly once per change. The collector tracks what each subscriber has already received, so re-running or re-registering never double-fires a state reaction. Sources should not hand-roll their own deduplication for state deliveries: the collector enforces exactly-once, and delivery is order-independent (it does not depend on source init order or on whether the state arrived before or after run).

This mirrors the destination model: Destination.Instance.init handles one-time bootstrap, Destination.Config.init is the collector-managed "init has run" flag, and Destination.Config.require gates event delivery the same way. See walkeros-understanding-destinations.
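To show why sources need no deduplication of their own, here is one possible way a collector could track per-subscriber delivery for exactly-once state reactions. The `StateLedger` class and its version counters are assumptions for illustration; the text above only says that the collector tracks what each subscriber has already received.

```typescript
type StateKind = 'consent' | 'user' | 'globals' | 'custom';
interface Subscriber { id: string; on: (kind: StateKind, data: unknown) => void }

// Hypothetical bookkeeping: one version counter per state kind,
// plus the last version each subscriber has received.
class StateLedger {
  private versions: Partial<Record<StateKind, number>> = {};
  private values: Partial<Record<StateKind, unknown>> = {};
  private delivered: Record<string, number> = {};

  record(kind: StateKind, data: unknown): void {
    this.values[kind] = data;
    this.versions[kind] = (this.versions[kind] ?? 0) + 1;
  }

  // Safe to call any number of times; each change reaches a subscriber once.
  sync(sub: Subscriber): void {
    (Object.keys(this.versions) as StateKind[]).forEach((kind) => {
      const version = this.versions[kind];
      const key = `${sub.id}:${kind}`;
      if (version === undefined || this.delivered[key] === version) return;
      this.delivered[key] = version;
      sub.on(kind, this.values[kind]);
    });
  }
}

const ledger = new StateLedger();
const calls: string[] = [];
const sub: Subscriber = { id: 'browser', on: (kind) => { calls.push(kind); } };
ledger.record('consent', { marketing: true });  // recorded before the subscriber syncs
ledger.sync(sub);
ledger.sync(sub);                               // re-run: nothing new to deliver
ledger.record('consent', { marketing: false }); // a real change
ledger.sync(sub);
```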

Push Signatures by Type

| Source Type | Signature | Example |
| --- | --- | --- |
| Cloud Function | push(req, res) → Promise<void> | HTTP handler |
| Browser | push(event, data) → Promise<void> | DOM events |
| DataLayer | push(event, data) → Promise<void> | GTM-style |

Key insight: Source push IS the handler. No wrappers needed.

// Direct deployment
http('handler', source.push);

When a test or integration code needs to invoke a source's push through the collector bag, collector.sources erases the per-source generic on read. Use Source.getSource<T>(collector, id) to recover the narrow signature without a cast. See the testing-strategy skill for the full pattern (symmetric helpers exist for destinations, transformers, stores).

Source Paths

| Type | Path | Examples |
| --- | --- | --- |
| Web | packages/web/sources/ | browser, dataLayer |
| Server | packages/server/sources/ | gcp |

Browser Source

The browser source captures events from DOM using data attributes.

<button data-elb="product" data-elb-product="id:P123;name:Laptop">
  <span data-elbaction="click">Add to Cart</span>
</button>

See packages/web/sources/browser/ for implementation.
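For intuition about the attribute format in the markup above, the following sketch parses a `key:value;key:value` string into an object. `parseElbAttribute` is a hypothetical helper written for this example; it is deliberately naive, and the real browser source may handle quoting, type casting, and other cases that this does not.

```typescript
// Naive parser for the "key:value;key:value" attribute format shown above.
function parseElbAttribute(value: string): Record<string, string> {
  const result: Record<string, string> = {};
  for (const pair of value.split(';')) {
    const separator = pair.indexOf(':');
    if (separator === -1) continue; // skip fragments without a key
    const key = pair.slice(0, separator).trim();
    if (key) result[key] = pair.slice(separator + 1).trim();
  }
  return result;
}

const productData = parseElbAttribute('id:P123;name:Laptop');
```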

DataLayer Source

Captures events from a GTM-style dataLayer array.

window.dataLayer.push({
  event: 'product view',
  product: { id: 'P123', name: 'Laptop' },
});

See packages/web/sources/dataLayer/ for implementation.
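The general interception technique can be sketched by wrapping the array's push method. This is a simplified illustration, not the package's implementation: `interceptDataLayer` and `forward` are hypothetical names, and the mapping from a dataLayer entry to a walkerOS event is left out.

```typescript
type DataLayerEntry = Record<string, unknown>;

function interceptDataLayer(
  dataLayer: DataLayerEntry[],
  forward: (entry: DataLayerEntry) => void, // stand-in for env.push
): void {
  // Process entries that were pushed before the source started.
  dataLayer.forEach((entry) => forward(entry));

  // Wrap push so later entries are forwarded and still stored in the array.
  const originalPush = dataLayer.push.bind(dataLayer);
  dataLayer.push = (...entries: DataLayerEntry[]): number => {
    entries.forEach((entry) => forward(entry));
    return originalPush(...entries);
  };
}

const dataLayer: DataLayerEntry[] = [{ event: 'page view' }];
const captured: unknown[] = [];
interceptDataLayer(dataLayer, (entry) => { captured.push(entry.event); });
dataLayer.push({ event: 'product view', product: { id: 'P123', name: 'Laptop' } });
```

Calling the original push after forwarding keeps the array usable by other consumers, such as a tag manager reading the same dataLayer.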

Server Sources

Handle HTTP requests in cloud functions. Server sources use the context pattern:

import type { Source } from '@walkeros/core';

export const sourceCloudFunction: Source.Init<Types> = async (context) => {
  const { config = {}, env } = context;
  const { push: envPush } = env;

  // Apply defaults inline — flow.json is developer-controlled, so no
  // runtime validation. Shape checks live in ./schemas and are used by
  // `walkeros validate` and dev tooling, never at runtime.
  const userSettings = config.settings || {};
  const settings = {
    ...userSettings,
    // example default: port: userSettings.port ?? 3000,
  };

  const push = async (req: Request, res: Response): Promise<void> => {
    // Transform HTTP request → walkerOS event
    const event = transformRequest(req);
    await envPush(event);
    res.json({ success: true });
  };

  return { type: 'cloudfunction', config: { ...config, settings }, push };
};

// Direct deployment: `source` is the instance returned by sourceCloudFunction
export const handler = source.push;

See packages/server/sources/gcp/ for implementation.

Env Pattern (Dependency Injection)

Platform dependencies go through env with fallback to globals or direct imports. This enables testing and simulation without touching globals.

// Express source: env.express ?? express (import fallback)
const expressLib = env.express ?? express;
const app = expressLib();

// Web sources: env.window ?? window (global fallback)
const win = env.window ?? window;
const doc = env.document ?? document;

Every source's Env interface extends Source.BaseEnv with optional platform deps:

export interface Env extends Source.BaseEnv {
  window?: Window & typeof globalThis; // web sources
  document?: Document; // web sources
  express?: typeof express; // express source
  cors?: typeof cors; // express source
}

Tests inject mocks via env instead of mocking globals. See testing-strategy.
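A small sketch of the injection side: the function below prefers the dependency from env and only then looks at the global. `SketchEnv`, `WindowLike`, and `currentUrl` are hypothetical and reduced to the one field the example reads; the real Env extends Source.BaseEnv.

```typescript
// Hypothetical minimal shapes: only what this example needs.
interface LocationLike { href: string }
interface WindowLike { location: LocationLike }
interface SketchEnv { window?: WindowLike }

function currentUrl(env: SketchEnv): string {
  // Prefer the injected dependency, fall back to the global when present.
  const globalWindow = (globalThis as unknown as { window?: WindowLike }).window;
  const win = env.window ?? globalWindow;
  return win ? win.location.href : '';
}

// In a test, pass a mock instead of touching the real global.
const mockWindow: WindowLike = { location: { href: 'https://example.com/shop' } };
const urlFromMock = currentUrl({ window: mockWindow });
```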

Transformer Wiring

Sources can wire to pre-collector transformer chains via the next property:

sources: {
  browser: {
    code: sourceBrowser,
    next: 'validate'  // First transformer to run after this source
  }
}

The transformer chain runs before events reach the collector. See understanding-transformers for chain details.
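To illustrate how a next reference could be followed, here is a sketch that walks a chain of named steps. It rests on assumptions not stated above: that each transformer may itself name a next step, and that returning false drops the event. `SketchTransformer` and `runChain` are hypothetical; see understanding-transformers for the actual chain semantics.

```typescript
type SketchEvent = { name: string; data: Record<string, unknown> };
interface SketchTransformer {
  run: (event: SketchEvent) => SketchEvent | false; // false drops the event (assumed)
  next?: string;                                    // assumed: steps can chain onward
}

function runChain(
  transformers: Record<string, SketchTransformer>,
  first: string | undefined,
  event: SketchEvent,
): SketchEvent | false {
  let current = event;
  let id = first;
  const visited = new Set<string>(); // guards against cycles
  while (id !== undefined && !visited.has(id)) {
    visited.add(id);
    const step = transformers[id];
    if (!step) break; // unknown id: stop the chain
    const result = step.run(current);
    if (result === false) return false;
    current = result;
    id = step.next;
  }
  return current;
}

const chain: Record<string, SketchTransformer> = {
  validate: { run: (e) => (e.name.indexOf(' ') !== -1 ? e : false), next: 'enrich' },
  enrich: { run: (e) => ({ ...e, data: { ...e.data, enriched: true } }) },
};
const passed = runChain(chain, 'validate', { name: 'product view', data: {} });
const dropped = runChain(chain, 'validate', { name: 'invalid', data: {} });
```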

Per-Scope Context (server sources)

A single source factory instance handles many concurrent invocations: Express processes overlapping requests, Lambda reuses one handler across calls, queue consumers loop over messages. Each logical unit of work is a scope. Server sources MUST wrap each invocation with context.withScope(rawScope, respond, body):

const push = async (req, res) => {
  const respond = createRespond((options) => {
    /* wire options into res */
  });

  await context.withScope(req, respond, async (env) => {
    await env.push(parsedData);
  });
};

Inside body, env.push carries that scope's ingest (extracted from rawScope via config.ingest mapping) and respond end to end through the pipeline. Concurrent scopes never share ingest or respond.

Browser sources skip withScope. A browser tab is a single logical scope for its lifetime; calling env.push directly is correct.
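The isolation guarantee for server sources can be sketched with a hand-written withScope that builds a fresh env per invocation. Everything here is a hypothetical stand-in: `createWithScope`, the `deliver` callback, and the use of a url field as the ingest value replace the real config.ingest mapping and pipeline.

```typescript
interface ScopedEnv { push: (data: string) => Promise<void> }
type RespondFn = (body: string) => void;
interface ScopedDelivery { data: string; ingest: string; respond: RespondFn }

// Hypothetical withScope: binds one scope's ingest and respond into a fresh env.
function createWithScope(deliver: (d: ScopedDelivery) => Promise<void>) {
  return async function withScope<R>(
    rawScope: { url: string },
    respond: RespondFn,
    body: (env: ScopedEnv) => Promise<R>,
  ): Promise<R> {
    const ingest = rawScope.url; // stand-in for the config.ingest mapping
    const env: ScopedEnv = { push: (data) => deliver({ data, ingest, respond }) };
    return body(env);
  };
}

const responses: string[] = [];
const withScope = createWithScope(async ({ data, ingest, respond }) => {
  // Simulate pipeline latency so the two scopes overlap in time.
  await new Promise<void>((resolve) => setTimeout(resolve, data === 'slow' ? 20 : 0));
  respond(`${data} from ${ingest}`);
});

const bothDone = Promise.all([
  withScope({ url: '/a' }, (b) => { responses.push(`A:${b}`); }, (env) => env.push('slow')),
  withScope({ url: '/b' }, (b) => { responses.push(`B:${b}`); }, (env) => env.push('fast')),
]);
```

Because ingest and respond live in each call's closure rather than on the shared instance, the slower request cannot pick up the faster one's response handle.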

Response Delegation (env.respond)

When a server source passes a respond to withScope, every transformer and destination in the pipeline can call env.respond?.({ body, status?, headers? }) to customize the HTTP response. First call wins (createRespond is idempotent), so the source's default response is a no-op if a step already responded.
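The first-call-wins behaviour can be sketched with a small wrapper. `createRespondSketch` is a hypothetical stand-in written for this example and is not the real createRespond; it only demonstrates the idempotency described above.

```typescript
interface RespondOptions {
  body: unknown;
  status?: number;
  headers?: Record<string, string>;
}

// Hypothetical stand-in: only the first call reaches the sender.
function createRespondSketch(
  send: (options: RespondOptions) => void,
): (options: RespondOptions) => void {
  let responded = false;
  return (options) => {
    if (responded) return; // later calls are no-ops
    responded = true;
    send(options);
  };
}

const sent: RespondOptions[] = [];
const respond = createRespondSketch((options) => { sent.push(options); });
respond({ body: { redirect: '/thanks' }, status: 302 }); // e.g. a destination responds first
respond({ body: { success: true } });                     // the source default is ignored
```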

See @walkeros/server-source-express for the reference implementation.

createTrigger Pattern

Every source exports a createTrigger factory from its examples (dev entry) that follows the unified Trigger.CreateFn interface:

type CreateFn<TContent, TResult> = (
  config: Collector.InitConfig,
  options?: unknown,
) => Promise<Trigger.Instance<TContent, TResult>>;

createTrigger simulates real-world invocations from the outside — full blackbox, no source instance access. Each package implements it differently:

| Source | Content | Trigger type | Mechanism |
| --- | --- | --- | --- |
| Browser | HTML string | click, load... | DOM injection + events |
| Express | HTTP req shape | POST, GET | Real fetch() requests |

The trigger lazily calls startFlow(config) on first invocation. Tests capture events via spy destinations. See using-step-examples for testing patterns.
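Lazy startup on first invocation can be sketched by caching the start promise. `createLazyTrigger`, `FlowSketch`, and `startFlowSketch` are hypothetical names; `startFlowSketch` merely stands in for startFlow(config), and the real Trigger.Instance shape is not modelled here.

```typescript
interface FlowSketch { push: (content: string) => Promise<string> }

// Hypothetical: startFlowSketch stands in for startFlow(config).
function createLazyTrigger(startFlowSketch: () => Promise<FlowSketch>) {
  let flow: Promise<FlowSketch> | undefined;
  return async (content: string): Promise<string> => {
    flow ??= startFlowSketch(); // started once, on the first invocation
    return (await flow).push(content);
  };
}

let starts = 0;
const trigger = createLazyTrigger(async () => {
  starts += 1;
  return { push: async (content) => `handled ${content}` };
});
```

Caching the promise rather than the resolved flow means two overlapping first calls share a single startup.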

Setup (optional)

Sources can implement an optional setup() lifecycle to provision external resources, for example registering a webhook with a third-party provider, creating a Pub/Sub subscription, or pre-allocating queue resources. Setup is never invoked by the runtime, push, init, or deploy. It runs only when an operator explicitly runs walkeros setup source.<name>.

The signature is (ctx: LifecycleContext<Config<T>, Env<T>>) => Promise<unknown>, where LifecycleContext carries { id, config, env, logger }. Idempotency is the package's responsibility: the framework adds no opinion. Use resolveSetup(ctx.config.setup, DEFAULTS) from @walkeros/core to normalize the boolean | object shape into a concrete options object.
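Since idempotency is left to the package, one common approach is to check for the resource before creating it. The sketch below assumes a hypothetical `WebhookProvider` injected through env and a hypothetical `webhookUrl` setting; neither is part of walkerOS, and the context is reduced to the fields the example uses.

```typescript
// Hypothetical third-party client, injected through env for testability.
interface WebhookProvider {
  list: () => Promise<string[]>;
  create: (url: string) => Promise<void>;
}
interface SetupContextSketch {
  id: string;
  config: { settings: { webhookUrl: string } };
  env: { provider: WebhookProvider };
}

async function setupSketch(ctx: SetupContextSketch): Promise<{ created: boolean }> {
  const { webhookUrl } = ctx.config.settings;
  const existing = await ctx.env.provider.list();
  // Already provisioned: running setup again changes nothing.
  if (existing.indexOf(webhookUrl) !== -1) return { created: false };
  await ctx.env.provider.create(webhookUrl);
  return { created: true };
}

// In-memory provider for demonstration.
const registered: string[] = [];
const memoryProvider: WebhookProvider = {
  list: async () => registered.slice(),
  create: async (url) => { registered.push(url); },
};
const setupCtx: SetupContextSketch = {
  id: 'webhook',
  config: { settings: { webhookUrl: 'https://example.com/hook' } },
  env: { provider: memoryProvider },
};
```

Running `setupSketch(setupCtx)` twice should register the webhook once and report `created: false` on the second run.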

See walkeros-create-source, walkeros-understanding-destinations, walkeros-understanding-stores, and the walkeros setup CLI documentation for the authoring template and operator workflow.

Install via CLI
npx skills add https://github.com/elbwalker/walkerOS --skill walkeros-understanding-sources