name: walkeros-understanding-sources
description: Use when working with walkerOS sources, understanding event capture, or learning about the push interface. Covers browser, dataLayer, and server source patterns.
Understanding walkerOS Sources
Overview
Sources capture events from the external world (browser DOM, dataLayer, HTTP requests, cloud functions) and feed them to the collector.
Core principle: Sources capture. They don't process or deliver; that is the job of the collector and the destinations.
Source Interface
See packages/core/src/types/source.ts for the canonical interface.
Init Function (Context Pattern)
Sources use a context-based initialization pattern:
import type { Source } from '@walkeros/core';
export const sourceMySource: Source.Init<Types> = async (context) => {
  const { config = {}, env, logger, id } = context;
  // ...
};
Context contains:
| Property | Type | Purpose |
|---|---|---|
| config | Source.Config<T> | Settings, mapping, options |
| env | Types['env'] | Environment (push, logger) |
| logger | Logger | Logging functions |
| id | string | Source identifier |
| collector | Collector.Instance | Reference to collector |
| withScope | (raw, respond, body) => Promise<R> | Bind ingest + respond to a single scope (server sources) |
Push Method
| Method | Purpose |
|---|---|
| push(input) | Receive external input, emit events |
Init Method
init?: () => void | Promise<void> — Optional eager-startup hook on the
returned Source.Instance. The factory must be side-effect-free: build the
instance and return it. The collector calls init() on every source eagerly
after all factories register, regardless of config.require. Use init for
work that previously sat in the factory body: draining a pre-init window queue
(e.g., window.elbLayer), attaching DOM listeners, opening sockets,
intercepting window.dataLayer. After init runs the collector flips
Source.Config.init to true.
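A minimal sketch of the factory/init split described above. It uses simplified stand-in types rather than the real Source.Instance from @walkeros/core; FakeWindow, SketchInstance, and createSketchSource are hypothetical names used only for illustration.

```typescript
// Hypothetical, simplified stand-ins; the canonical types live in
// packages/core/src/types/source.ts.
interface FakeWindow {
  elbLayer?: unknown[];
}

interface SketchInstance {
  type: string;
  push: (input: unknown) => void;
  init?: () => void | Promise<void>;
}

// The factory only builds and returns the instance: no listeners, no
// queue draining, no globals touched.
function createSketchSource(
  win: FakeWindow,
  forward: (input: unknown) => void,
): SketchInstance {
  const push = (input: unknown): void => forward(input);

  return {
    type: 'sketch',
    push,
    // Side effects move here. The collector calls init() eagerly after all
    // factories have registered.
    init: () => {
      const queued = win.elbLayer ?? [];
      win.elbLayer = [];
      queued.forEach((entry) => push(entry)); // drain entries queued before init
    },
  };
}
```

Calling the factory alone forwards nothing; the queued entries only reach forward once init() runs.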
queueOn Buffer
queueOn?: Array<{ type: On.Types; data: unknown }> — Optional buffer on the
Source.Instance for lifecycle events delivered before the source is
started (started ≡ config.init === true && !config.require?.length). The
collector pushes { type, data } here when it would otherwise call
source.on(type, data). Once the source becomes started, the collector replays
each entry via source.on(...) and clears the queue.
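The buffering rule can be sketched from the collector's side as follows. This is an illustration under simplifying assumptions, not the collector's actual code: On.Types is narrowed to string, and SketchSource, isStarted, deliverOn, and replayQueue are hypothetical names.

```typescript
// Hypothetical, simplified shapes; On.Types is narrowed to string here.
interface QueuedOn {
  type: string;
  data: unknown;
}

interface SketchSource {
  config: { init?: boolean; require?: string[] };
  queueOn?: QueuedOn[];
  on?: (type: string, data: unknown) => void;
}

// started ≡ config.init === true && !config.require?.length
function isStarted(source: SketchSource): boolean {
  return source.config.init === true && !source.config.require?.length;
}

// Deliver directly when started, otherwise buffer on the instance.
function deliverOn(source: SketchSource, type: string, data: unknown): void {
  if (isStarted(source)) {
    source.on?.(type, data);
    return;
  }
  (source.queueOn ??= []).push({ type, data });
}

// Replay buffered entries in order, then leave the queue empty.
function replayQueue(source: SketchSource): void {
  if (!isStarted(source)) return;
  const queued = source.queueOn ?? [];
  source.queueOn = [];
  queued.forEach(({ type, data }) => source.on?.(type, data));
}
```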
Destroy Method
destroy?: DestroyFn — Optional cleanup method. Called during
command('shutdown'). Use to close HTTP servers, timers, or connections.
Receives { id, config, env, logger }.
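As a rough illustration of the cleanup contract, the sketch below pairs a timer started in init with a destroy that stops it. The context shape is reduced to id and logger, and PollingSource and createPollingSource are hypothetical names, not part of walkerOS.

```typescript
// Hypothetical, simplified context; the real DestroyFn receives
// { id, config, env, logger }.
interface DestroyContext {
  id: string;
  logger: { info: (message: string) => void };
}

interface PollingSource {
  type: string;
  init: () => void;
  destroy: (context: DestroyContext) => void;
  isRunning: () => boolean;
}

function createPollingSource(poll: () => void, intervalMs: number): PollingSource {
  let timer: ReturnType<typeof setInterval> | undefined;

  return {
    type: 'polling',
    init: () => {
      timer = setInterval(poll, intervalMs);
    },
    // Release everything init() acquired so shutdown leaves nothing running.
    destroy: ({ id, logger }) => {
      if (timer !== undefined) clearInterval(timer);
      timer = undefined;
      logger.info(`source ${id} stopped`);
    },
    isRunning: () => timer !== undefined,
  };
}
```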
Source Lifecycle
The collector and every source agree on three lifecycle markers, all on
Source.Config:
| Field | Purpose |
|---|---|
| init?: boolean | Set by the collector to true after Instance.init() resolves. Authors do not write to it. Reflects "init has run", not "is started". |
| require?: string[] | Author-supplied timing hint. Lists collector state a source needs before its first on() delivery. Satisfied by the collector's current recorded state (level), not only by a future event, so order does not matter: the gate clears whether the required state was recorded before or after this source registered. It is not a correctness dependency: a source reacts to state correctly whether or not it declares require. |
| disabled?: boolean | Hard skip: no factory invocation, no init, no event capture. |
The flow is:
- Register: the collector invokes the factory. The factory returns a fresh Source.Instance with no side effects.
- Init pass: the collector calls Instance.init() on every registered source, then sets config.init = true.
- Lifecycle delivery: for each collector event (consent, user, session, run, …) the collector decrements every source's require in place and reconciles every still-pending source/destination against current state, so a gate also clears from state that was already recorded. If a source is started, the collector calls source.on(type, data) directly. Otherwise it pushes { type, data } into Instance.queueOn.
- Replay: when a source becomes started (require empties), the collector replays its queueOn via source.on(...) and clears the queue.
require therefore gates the timing of on() delivery, not code execution
and not correctness. Instance.init() always runs eagerly. There is no
collector.pending.sources map: per-source state lives entirely on
Source.Instance and Source.Config.
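The order-independence of require can be modeled with a small sketch. Only the require bookkeeping is shown, and RequireGate and GatedSource are hypothetical names; the real collector does considerably more.

```typescript
// Hypothetical collector fragment: only the require bookkeeping is modeled.
interface GatedSource {
  config: { init?: boolean; require?: string[] };
}

class RequireGate {
  private recorded = new Set<string>();
  private sources: GatedSource[] = [];

  register(source: GatedSource): void {
    this.sources.push(source);
    this.reconcile(); // state recorded earlier counts immediately
  }

  // Called for each collector event (consent, user, session, run, ...).
  record(type: string): void {
    this.recorded.add(type);
    this.reconcile();
  }

  private reconcile(): void {
    for (const source of this.sources) {
      const pending = source.config.require;
      if (!pending?.length) continue;
      // Drop every entry that the recorded state already satisfies.
      source.config.require = pending.filter((type) => !this.recorded.has(type));
    }
  }
}
```

A source that registers after consent was recorded and a source that registers before user is recorded both end up with an empty require, which is the behavior the table describes.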
Exactly-once state delivery is a collector guarantee
State commands (consent, user, globals, custom) are recorded by the
collector immediately, even before run, and delivered to each source's on
handler exactly once per change. The collector tracks what each subscriber has
already received, so re-running or re-registering never double-fires a state
reaction. Sources should not hand-roll their own deduplication for state
deliveries: the collector enforces exactly-once, and delivery is
order-independent (it does not depend on source init order or on whether the
state arrived before or after run).
This mirrors the destination model: Destination.Instance.init handles one-time
bootstrap, Destination.Config.init is the collector-managed "init has run"
flag, and Destination.Config.require gates event delivery the same way. See
walkeros-understanding-destinations.
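One way to picture the exactly-once guarantee for state deliveries is a per-subscriber version ledger. The sketch below is an assumption about how such bookkeeping could look, not a description of the collector's internals; StateLedger and Handler are hypothetical names.

```typescript
// Hypothetical bookkeeping; the real collector internals may differ.
type Handler = (type: string, data: unknown) => void;

class StateLedger {
  private state = new Map<string, { version: number; data: unknown }>();
  private delivered = new Map<string, Map<string, number>>();
  private subscribers = new Map<string, Handler>();

  subscribe(id: string, on: Handler): void {
    this.subscribers.set(id, on);
    this.flush();
  }

  // Record a state change immediately and bump its version.
  record(type: string, data: unknown): void {
    const version = (this.state.get(type)?.version ?? 0) + 1;
    this.state.set(type, { version, data });
    this.flush();
  }

  // Deliver each recorded change at most once per subscriber id.
  private flush(): void {
    this.subscribers.forEach((on, id) => {
      const seen = this.delivered.get(id) ?? new Map<string, number>();
      this.delivered.set(id, seen);
      this.state.forEach(({ version, data }, type) => {
        if (seen.get(type) === version) return; // already delivered
        seen.set(type, version);
        on(type, data);
      });
    });
  }
}
```

Because the ledger is keyed by subscriber id, subscribing the same id twice does not re-fire state it has already received, and state recorded before the first subscription is still delivered once.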
Push Signatures by Type
| Source Type | Signature | Example |
|---|---|---|
| Cloud Function | push(req, res) → Promise<void> | HTTP handler |
| Browser | push(event, data) → Promise<void> | DOM events |
| DataLayer | push(event, data) → Promise<void> | GTM-style |
Key insight: Source push IS the handler. No wrappers needed.
// Direct deployment
http('handler', source.push);
When a test or integration code needs to invoke a source's push through the
collector bag, collector.sources erases the per-source generic on read. Use
Source.getSource<T>(collector, id) to recover the narrow signature without a
cast. See the testing-strategy skill for the full pattern (symmetric helpers
exist for destinations, transformers, stores).
Source Paths
| Type | Path | Examples |
|---|---|---|
| Web | packages/web/sources/ | browser, dataLayer |
| Server | packages/server/sources/ | gcp |
Browser Source
The browser source captures events from the DOM using data attributes.
<button data-elb="product" data-elb-product="id:P123;name:Laptop">
  <span data-elbaction="click">Add to Cart</span>
</button>
See packages/web/sources/browser/ for implementation.
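The data-elb-product value in the markup above is a semicolon-separated list of key:value pairs. The helper below is a simplified illustration of turning such a string into an object; parseElbValue is a hypothetical name, and the actual browser source may handle more cases (quoting, escaping, type casting) than this sketch does.

```typescript
// Hypothetical helper: splits "key:value;key:value" into an object.
// Values stay strings; no quoting, escaping, or type casting is handled.
function parseElbValue(raw: string): Record<string, string> {
  const result: Record<string, string> = {};
  for (const pair of raw.split(';')) {
    const separator = pair.indexOf(':');
    if (separator === -1) continue; // skip entries without a key
    const key = pair.slice(0, separator).trim();
    const value = pair.slice(separator + 1).trim();
    if (key) result[key] = value;
  }
  return result;
}

const productData = parseElbValue('id:P123;name:Laptop');
// productData → { id: 'P123', name: 'Laptop' }
```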
DataLayer Source
Captures events from a GTM-style dataLayer array.
window.dataLayer.push({
  event: 'product view',
  product: { id: 'P123', name: 'Laptop' },
});
See packages/web/sources/dataLayer/ for implementation.
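Capturing pushes to a GTM-style array generally means wrapping its push method. The following is a generic sketch of that technique, not the implementation in packages/web/sources/dataLayer/; interceptDataLayer and DataLayerEntry are hypothetical names.

```typescript
// Hypothetical sketch of intercepting a GTM-style array; this is not the
// implementation in packages/web/sources/dataLayer/.
type DataLayerEntry = Record<string, unknown>;

function interceptDataLayer(
  dataLayer: DataLayerEntry[],
  onEntry: (entry: DataLayerEntry) => void,
): () => void {
  const originalPush = dataLayer.push;

  // Process entries that were pushed before interception started.
  dataLayer.forEach((entry) => onEntry(entry));

  dataLayer.push = (...entries: DataLayerEntry[]): number => {
    entries.forEach((entry) => onEntry(entry));
    return originalPush.apply(dataLayer, entries); // keep the array intact
  };

  // Return a restore function, e.g. for a destroy hook.
  return () => {
    dataLayer.push = originalPush;
  };
}
```

Per the Init Method section, this kind of interception belongs in init rather than in the factory body, and the returned restore function is a natural fit for destroy.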
Server Sources
Handle HTTP requests in cloud functions. Server sources use the context pattern:
import type { Source } from '@walkeros/core';
export const sourceCloudFunction: Source.Init<Types> = async (context) => {
  const { config = {}, env } = context;
  const { push: envPush } = env;
  // Apply defaults inline. flow.json is developer-controlled, so there is no
  // runtime validation. Shape checks live in ./schemas and are used by
  // `walkeros validate` and dev tooling, never at runtime.
  const userSettings = config.settings || {};
  const settings = {
    ...userSettings,
    // example default: port: userSettings.port ?? 3000,
  };
  // Simplified for brevity: a real server source wraps each invocation in
  // context.withScope (see Per-Scope Context below).
  const push = async (req: Request, res: Response): Promise<void> => {
    // Transform HTTP request → walkerOS event
    // (transformRequest stands in for source-specific parsing)
    const event = transformRequest(req);
    await envPush(event);
    res.json({ success: true });
  };
  return { type: 'cloudfunction', config: { ...config, settings }, push };
};
// Direct deployment
export const handler = source.push;
See packages/server/sources/gcp/ for implementation.
Env Pattern (Dependency Injection)
Platform dependencies go through env with fallback to globals or direct
imports. This enables testing and simulation without touching globals.
// Express source: env.express ?? express (import fallback)
const expressLib = env.express ?? express;
const app = expressLib();
// Web sources: env.window ?? window (global fallback)
const win = env.window ?? window;
const doc = env.document ?? document;
Every source's Env interface extends Source.BaseEnv with optional platform
deps:
export interface Env extends Source.BaseEnv {
  window?: Window & typeof globalThis; // web sources
  document?: Document; // web sources
  express?: typeof express; // express source
  cors?: typeof cors; // express source
}
Tests inject mocks via env instead of mocking globals. See
testing-strategy.
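A small sketch of why the env fallback makes testing easier: the dependency is resolved from env first, so a test can pass a mock without patching any global. MinimalDocument, SketchEnv, and readTitle are hypothetical names chosen so the example runs outside a browser.

```typescript
// Hypothetical, minimal shapes so the sketch runs outside a browser.
interface MinimalDocument {
  title: string;
}

interface SketchEnv {
  document?: MinimalDocument;
}

// Resolve the dependency from env first, then fall back to the global.
function readTitle(env: SketchEnv): string | undefined {
  const globalDoc = (globalThis as unknown as { document?: MinimalDocument })
    .document;
  const doc = env.document ?? globalDoc;
  return doc?.title;
}

// A test injects a mock instead of patching globalThis.document.
const mockedTitle = readTitle({ document: { title: 'Mock page' } });
```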
Transformer Wiring
Sources can wire to pre-collector transformer chains via the next property:
sources: {
  browser: {
    code: sourceBrowser,
    next: 'validate' // First transformer to run after this source
  }
}
The transformer chain runs before events reach the collector. See understanding-transformers for chain details.
Per-Scope Context (server sources)
A single source factory instance handles many concurrent invocations: Express
processes overlapping requests, Lambda reuses one handler across calls, queue
consumers loop over messages. Each logical unit of work is a scope. Server
sources MUST wrap each invocation with
context.withScope(rawScope, respond, body):
const push = async (req, res) => {
  const respond = createRespond((options) => {
    /* wire options into res */
  });
  await context.withScope(req, respond, async (env) => {
    await env.push(parsedData);
  });
};
Inside body, env.push carries that scope's ingest (extracted from
rawScope via config.ingest mapping) and respond end to end through the
pipeline. Concurrent scopes never share ingest or respond.
Browser sources skip withScope. A browser tab is a single logical scope
for its lifetime; calling env.push directly is correct.
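To make the isolation property concrete, here is a hypothetical sketch of how a withScope-style helper could bind ingest and respond per call. The real withScope is supplied on the source context; createWithScope, PipelinePush, and the extractIngest parameter are assumptions made for illustration only.

```typescript
// Hypothetical sketch of per-scope binding; the real withScope is provided
// on the source context by the collector.
type Respond = (options: { body?: unknown; status?: number }) => void;

interface ScopedEnv {
  push: (data: unknown) => Promise<void>;
  respond: Respond;
}

type PipelinePush = (
  data: unknown,
  scope: { ingest: unknown; respond: Respond },
) => Promise<void>;

function createWithScope(
  pipelinePush: PipelinePush,
  extractIngest: (raw: unknown) => unknown,
) {
  return async function withScope<R>(
    raw: unknown,
    respond: Respond,
    body: (env: ScopedEnv) => Promise<R>,
  ): Promise<R> {
    // Captured per call, so overlapping invocations never share state.
    const ingest = extractIngest(raw);
    const env: ScopedEnv = {
      respond,
      push: (data) => pipelinePush(data, { ingest, respond }),
    };
    return body(env);
  };
}
```

Each call closes over its own ingest and respond, so two overlapping requests pushing through env.push carry their own scope even though they share one source instance.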
Response Delegation (env.respond)
When a server source passes a respond to withScope, every transformer and
destination in the pipeline can call
env.respond?.({ body, status?, headers? }) to customize the HTTP response.
First call wins (createRespond is idempotent), so the source's default
response is a no-op if a step already responded.
See @walkeros/server-source-express for the reference implementation.
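The "first call wins" behavior can be illustrated with a small re-implementation. This is a sketch for understanding only, under the assumption that idempotency means later calls are silently ignored; createRespondSketch and RespondOptions are hypothetical names, and real sources should use the createRespond helper the framework provides.

```typescript
// Hypothetical re-implementation for illustration; use the createRespond
// helper the framework provides in real sources.
interface RespondOptions {
  body?: unknown;
  status?: number;
  headers?: Record<string, string>;
}

function createRespondSketch(send: (options: RespondOptions) => void) {
  let responded = false;
  return (options: RespondOptions = {}): void => {
    if (responded) return; // first call wins, later calls are no-ops
    responded = true;
    send(options);
  };
}
```

If a transformer responds with status 202 and the source then sends its default 200, only the 202 reaches the client.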
createTrigger Pattern
Every source exports a createTrigger factory from its examples (dev entry)
that follows the unified Trigger.CreateFn interface:
type CreateFn<TContent, TResult> = (
  config: Collector.InitConfig,
  options?: unknown,
) => Promise<Trigger.Instance<TContent, TResult>>;
createTrigger simulates real-world invocations from the outside — full
blackbox, no source instance access. Each package implements it differently:
| Source | Content | Trigger type | Mechanism |
|---|---|---|---|
| Browser | HTML string | click, load... | DOM injection + events |
| Express | HTTP req shape | POST, GET | Real fetch() requests |
The trigger lazily calls startFlow(config) on first invocation. Tests capture
events via spy destinations. See using-step-examples for testing patterns.
Setup (optional)
Sources can implement an optional setup() lifecycle to provision external
resources, for example registering a webhook with a third-party provider,
creating a Pub/Sub subscription, or pre-allocating queue resources. Setup is
never invoked by the runtime, push, init, or deploy. It runs only when an
operator explicitly types walkeros setup source.<name>.
The signature is
(ctx: LifecycleContext<Config<T>, Env<T>>) => Promise<unknown>, where
LifecycleContext carries { id, config, env, logger }. Idempotency is the
package's responsibility: the framework adds no opinion. Use
resolveSetup(ctx.config.setup, DEFAULTS) from @walkeros/core to normalize
the boolean | object shape into a concrete options object.
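Since idempotency is left to the package, a setup implementation typically checks before it creates. The sketch below shows that check-then-create shape with a hypothetical webhook provider; WebhookClient, SetupContext, the webhookUrl setting, and the return value are all assumptions for illustration and not part of the walkerOS API.

```typescript
// Hypothetical provider client and context; only the check-then-create
// shape matters here.
interface WebhookClient {
  list: () => Promise<string[]>;
  create: (url: string) => Promise<void>;
}

interface SetupContext {
  id: string;
  config: { settings: { webhookUrl: string } };
  env: { client: WebhookClient };
  logger: { info: (message: string) => void };
}

async function setup(ctx: SetupContext): Promise<{ created: boolean }> {
  const { webhookUrl } = ctx.config.settings;
  const existing = await ctx.env.client.list();
  if (existing.includes(webhookUrl)) {
    ctx.logger.info(`${ctx.id}: webhook already registered`);
    return { created: false }; // safe to run repeatedly
  }
  await ctx.env.client.create(webhookUrl);
  ctx.logger.info(`${ctx.id}: webhook registered`);
  return { created: true };
}
```

Running the function a second time against the same provider state finds the existing webhook and creates nothing.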
See walkeros-create-source, walkeros-understanding-destinations,
walkeros-understanding-stores, and the walkeros setup CLI documentation for
the authoring template and operator workflow.
Related Skills
- walkeros-understanding-flow - How sources fit in architecture
- walkeros-understanding-events - Events that sources emit
- walkeros-create-source - Create new source
Source Files:
- packages/core/src/types/source.ts - Interface
Package READMEs:
- packages/web/sources/browser/README.md - Browser source
- packages/web/sources/dataLayer/README.md - DataLayer source
Documentation:
- Website: Sources - Overview
- Website: Browser Source - Browser docs
- Website: Create Your Own - Guide