From walkeros
Explains walkerOS Source→Collector→Destination event flow, composability, push interfaces, startFlow, and ingest context for designing modular pipelines.
npx claudepluginhub elbwalker/walkeros
This skill uses the workspace's default tool permissions.
walkerOS follows a Source → Collector → Destination(s) architecture for composable, modular event processing.
Core principle: Separation of concerns. Each component has one job. Components are composable and replaceable.
[Source.before] → Sources → [Source.next] → Collector → [Dest.before] → Destinations → [Dest.next]
(Preprocessing) (Capture) (Pre-chain) (Processing) (Post-chain) (Delivery) (Post-push)
Consent-exempt: Post-consent:
- Decode - Validation - Event creation - Validation - Push - Audit logging
- Validate format - Enrichment - Enrichment - Enrichment - Send - Response parsing
- Authenticate - Redaction - Consent check - Routing - Store - Webhooks
A Flow combines components. You can:
See packages/core/src/types/flow.ts for the canonical interface.
// Conceptual structure (see source for full type)
interface Flow {
stores?: Record<string, Store>;
sources?: Record<string, Source>;
transformers?: Record<string, Transformer>;
destinations?: Record<string, Destination>;
collector?: Collector.InitConfig;
}
All components communicate via push functions:
| Component | Push Signature | Purpose |
|---|---|---|
| Source | push(input) → events | Capture external data |
| Collector | push(event) → void | Process and route |
| Destination | push(event, context) → void | Transform and deliver |
The elb() function is an alias for collector.push, used for component
wiring.
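To make the three push signatures concrete, here is a minimal sketch of how they could compose. The types and the `createCollector` and `createSource` helpers are hypothetical simplifications for illustration, not the real walkerOS interfaces; only the shape of each push call follows the table above.

```typescript
// Hypothetical, simplified types: not the real walkerOS interfaces.
type WalkerEvent = { name: string; data: Record<string, unknown> };
type DestinationPush = (event: WalkerEvent, context: { id: string }) => void;

// Collector: push(event) -> void. Fans each event out to every destination.
function createCollector(destinations: Record<string, DestinationPush>) {
  const push = (event: WalkerEvent): void => {
    for (const id of Object.keys(destinations)) {
      destinations[id](event, { id });
    }
  };
  return { push, elb: push }; // elb is the same function as push
}

// Source: push(input) -> events. Converts raw input, then hands events to elb.
function createSource(elb: (event: WalkerEvent) => void) {
  return {
    push(input: { path: string }): WalkerEvent[] {
      const events: WalkerEvent[] = [
        { name: 'page view', data: { path: input.path } },
      ];
      events.forEach((event) => elb(event));
      return events;
    },
  };
}

// Wire a source to one logging destination through the collector.
const deliveredLog: string[] = [];
const demoCollector = createCollector({
  log: (event, context) => {
    deliveredLog.push(context.id + ':' + event.name);
  },
});
const demoSource = createSource(demoCollector.elb);
demoSource.push({ path: '/docs' });
```

The source only knows about `elb`, and the destination only sees an event plus its context, so either side can be swapped without touching the other.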
See packages/collector/src/flow.ts for
the startFlow function.
import { startFlow } from '@walkeros/collector';
const { collector, elb } = await startFlow({
stores: {
/* key-value storage, init first, destroy last */
},
sources: {
/* ... */
},
transformers: {
/* ... */
},
destinations: {
/* ... */
},
});
walkerOS uses a two-layer data model:
Any step can read and write arbitrary keys on ingest. The runtime manages
_meta:
- _meta.hops: increments per step (safety valve at 256)
- _meta.path: ordered list of step IDs visited (path[0] = source ID)
// In a transformer
push: async (event, context) => {
context.ingest.botScore = 0.95; // Write freely
context.ingest.geo = { country: 'DE' };
console.log(context.ingest._meta.path); // ['express', 'validate', 'enrich']
return { event };
};
Ingest is cloned per destination to prevent cross-contamination in parallel
processing. After a destination push, the response is available at
ingest._response.
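The bookkeeping described above can be sketched in a few lines. This is an illustration under assumptions: `visitStep` and `cloneIngest` are hypothetical helpers, the JSON-based deep copy is a stand-in for whatever cloning the runtime actually uses, and whether the source itself counts as a hop is assumed here.

```typescript
// Hypothetical shapes for illustration only.
type IngestMeta = { hops: number; path: string[] };
type Ingest = { _meta: IngestMeta; [key: string]: unknown };

const MAX_HOPS = 256; // safety valve mentioned in the text

// Record one step: bump the hop counter and append the step ID to the path.
function visitStep(ingest: Ingest, stepId: string): void {
  if (ingest._meta.hops >= MAX_HOPS) {
    throw new Error('Hop limit reached at step "' + stepId + '"');
  }
  ingest._meta.hops += 1;
  ingest._meta.path.push(stepId);
}

// Deep-copy ingest so each destination works on its own copy.
// (JSON round-trip is an assumption; it only handles JSON-safe values.)
function cloneIngest(ingest: Ingest): Ingest {
  return JSON.parse(JSON.stringify(ingest)) as Ingest;
}

const sharedIngest: Ingest = { _meta: { hops: 0, path: [] } };
visitStep(sharedIngest, 'express');
visitStep(sharedIngest, 'validate');

// One clone per destination: a response written for one is invisible to the other.
const ingestForGtag = cloneIngest(sharedIngest);
const ingestForApi = cloneIngest(sharedIngest);
ingestForGtag._response = { status: 200 };
```

Because each destination receives its own copy, writing `_response` for one destination cannot leak into another destination's post-push chain.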
Stores are the 4th component type — passive key-value infrastructure that other
components consume via env.
- Stores are referenced as $store:storeId in env values (bundled mode) or passed
  directly as store instances (integrated mode)
- Stores have no next or before. They don't participate in
  the event pipeline. Components access them through their env.
- Available packages: @walkeros/store-memory (sync, LRU),
  @walkeros/server-store-fs (async, filesystem), @walkeros/server-store-s3
  (async, S3-compatible)
{
"stores": {
"data": { "package": "@walkeros/store-memory" }
},
"transformers": {
"fingerprint": {
"package": "@walkeros/server-transformer-fingerprint",
"env": { "store": "$store:data" }
}
}
}
See walkeros-understanding-stores for the full store interface and lifecycle.
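As an illustration of the `$store:storeId` wiring, the sketch below shows how such references might be swapped for store instances before a component receives its env. `resolveEnv`, `createMemoryStore`, and the `KeyValueStore` shape are hypothetical; the real bundler and store interface may differ.

```typescript
// Hypothetical minimal store interface, not the real walkerOS Store type.
type KeyValueStore = {
  get(key: string): unknown;
  set(key: string, value: unknown): void;
};

function createMemoryStore(): KeyValueStore {
  const data: Record<string, unknown> = {};
  return {
    get: (key) => data[key],
    set: (key, value) => {
      data[key] = value;
    },
  };
}

const STORE_PREFIX = '$store:';

// Replace every "$store:<id>" string in env with the matching store instance.
function resolveEnv(
  env: Record<string, unknown>,
  stores: Record<string, KeyValueStore>,
): Record<string, unknown> {
  const resolved: Record<string, unknown> = {};
  for (const key of Object.keys(env)) {
    const value = env[key];
    if (typeof value === 'string' && value.indexOf(STORE_PREFIX) === 0) {
      const storeId = value.slice(STORE_PREFIX.length);
      const store = stores[storeId];
      if (!store) {
        throw new Error('Unknown store "' + storeId + '" in env.' + key);
      }
      resolved[key] = store;
    } else {
      resolved[key] = value; // plain values pass through untouched
    }
  }
  return resolved;
}

const availableStores = { data: createMemoryStore() };
const fingerprintEnv = resolveEnv(
  { store: '$store:data', salt: 'abc' },
  availableStores,
);
```

Failing fast on an unknown store ID is a design choice of this sketch; it surfaces a misspelled reference at startup rather than at the first event.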
| Concern | Handled By | NOT Handled By |
|---|---|---|
| Event capture | Sources | Collector, Destinations |
| Event structure | Event model | Components |
| Consent checking | Collector | Sources, Destinations |
| Transformation | Mapping system | Raw push calls |
| Delivery | Destinations | Sources, Collector |
Transformers run at two points in the pipeline, configured via next and
before:
Pre-collector chain (source.next): runs after the source captures an event, before collector processing:
Bundled mode (flow.json):
{
"sources": {
"browser": {
"package": "@walkeros/web-source-browser",
"next": "validate"
}
},
"transformers": {
"validate": {
"package": "@walkeros/transformer-validator",
"next": "enrich"
},
"enrich": {
"package": "@walkeros/transformer-enricher"
}
}
}
Integrated mode (TypeScript):
sources: {
browser: {
code: sourceBrowser,
next: 'validate'
}
},
transformers: {
validate: {
code: transformerValidator,
config: { next: 'enrich' }
},
enrich: {
code: transformerEnrich
}
}
Note: In flow.json, next is set at the reference level. The CLI bundler
automatically moves it into config.next for runtime, so you don't need to handle
this yourself.
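The relocation described in the note can be pictured as a small normalization step. The sketch below is an assumption about what that step amounts to; `toRuntimeTransformer` and both types are hypothetical and are not taken from the CLI bundler.

```typescript
// Hypothetical shapes: a flow.json transformer reference and its runtime form.
type ChainRef = string | string[];
type BundledTransformer = {
  package: string;
  next?: ChainRef;
  config?: Record<string, unknown>;
};
type RuntimeTransformer = {
  package: string;
  config: Record<string, unknown>;
};

// Move a reference-level `next` into `config.next`, keeping other config keys.
function toRuntimeTransformer(ref: BundledTransformer): RuntimeTransformer {
  const config: Record<string, unknown> = { ...(ref.config || {}) };
  if (ref.next !== undefined) {
    config.next = ref.next;
  }
  return { package: ref.package, config };
}

const runtimeValidate = toRuntimeTransformer({
  package: '@walkeros/transformer-validator',
  next: 'enrich',
});
const runtimeEnrich = toRuntimeTransformer({
  package: '@walkeros/transformer-enricher',
});
```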
Each transformer can have its own before chain that runs before its push
function:
{
"transformers": {
"enrich": {
"before": "lookup",
"package": "@walkeros/transformer-enricher"
},
"lookup": {
"package": "@walkeros/transformer-lookup"
}
}
}
Post-collector chain (destination.before): runs after collector enrichment, before the destination receives the event:
Bundled mode (flow.json):
{
"destinations": {
"gtag": {
"package": "@walkeros/web-destination-gtag",
"before": "redact"
}
},
"transformers": {
"redact": {
"package": "@walkeros/transformer-redact"
}
}
}
Integrated mode (TypeScript):
destinations: {
gtag: {
code: destinationGtag,
before: 'redact'
}
},
transformers: {
redact: {
code: transformerRedact
}
}
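For a sense of what a transformer such as `redact` might do inside a `destination.before` chain, here is a minimal synchronous sketch. It is not the `@walkeros/transformer-redact` implementation: the key list, the `redactPush` name, and the use of `false` to drop a malformed event are assumptions, and the real push is asynchronous and also receives a context.

```typescript
// Hypothetical, simplified types for illustration.
type WalkerEvent = { name: string; data: Record<string, unknown> };
type TransformerResult = { event: WalkerEvent } | false;

// Assumed list of keys to strip before an event reaches a destination.
const SENSITIVE_KEYS = ['email', 'phone'];

// Return a copy of the event without sensitive keys, or false to drop it.
function redactPush(event: WalkerEvent): TransformerResult {
  if (!event.name) return false; // assumption: nameless events are dropped
  const data: Record<string, unknown> = {};
  for (const key of Object.keys(event.data)) {
    if (SENSITIVE_KEYS.indexOf(key) === -1) {
      data[key] = event.data[key];
    }
  }
  return { event: { name: event.name, data } };
}

const redactedResult = redactPush({
  name: 'order complete',
  data: { id: 'ORD-123', email: 'jane@example.com' },
});
```

Building a new event instead of mutating the input keeps other destinations, which may not use this chain, unaffected.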
Post-push chain (destination.next): runs after the destination push completes. The push response is available at
context.ingest._response:
Bundled mode (flow.json):
{
"destinations": {
"api": {
"package": "@walkeros/server-destination-api",
"next": "auditLog"
}
},
"transformers": {
"auditLog": {
"package": "@walkeros/transformer-audit"
}
}
}
Integrated mode (TypeScript):
destinations: {
api: {
code: destinationApi,
next: 'auditLog'
}
},
transformers: {
auditLog: {
code: transformerAudit
}
}
- source.before → consent-exempt preprocessing chain
- source.next → starts pre-collector chain
- transformer.before → pre-transform enrichment chain
- transformer.next (flow.json) or transformer.config.next (runtime) → links
  transformers
- destination.before → starts post-collector chain per destination
- destination.next → post-push processing chain
Each step in a flow (source, transformer, destination) can ship step
examples: structured { in, out } pairs that define expected input/output
behavior.
Steps sit at boundaries between arbitrary formats and walkerOS events:
- Source: arbitrary in (HTTP request, DOM event) → walkerOS event out
- Transformer: walkerOS event in → walkerOS event out (or false)
- Destination: walkerOS event in → arbitrary out (vendor API call)
See using-step-examples for the full ASCII diagram and detailed explanation.
{
"destinations": {
"gtag": {
"package": "@walkeros/web-destination-gtag",
"config": { "measurementId": "G-XXXXXX" },
"examples": {
"purchase": {
"in": {
"name": "order complete",
"data": { "id": "ORD-123", "total": 149.97 }
},
"out": [
"event",
"purchase",
{ "transaction_id": "ORD-123", "value": 149.97 }
]
}
}
}
}
}
Step examples enable it.each testing, CLI simulation with --example, and
deep validation with --deep. See
using-step-examples for the complete
lifecycle.
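To show how `{ in, out }` pairs can drive tests, the sketch below runs every example through a step and reports mismatches, which is roughly what a table-driven `it.each` test would do. `toGtagCall` is a hypothetical stand-in for a destination's transformation, not the real gtag destination, and `findFailingExamples` is likewise invented for illustration.

```typescript
// Hypothetical, simplified types for illustration.
type WalkerEvent = { name: string; data: Record<string, unknown> };
type StepExample = { in: WalkerEvent; out: unknown };

// Stand-in for a destination step: walkerOS event in, vendor call arguments out.
function toGtagCall(event: WalkerEvent): unknown[] {
  if (event.name === 'order complete') {
    return [
      'event',
      'purchase',
      { transaction_id: event.data.id, value: event.data.total },
    ];
  }
  return ['event', event.name, event.data];
}

// Same example as in the flow.json snippet above.
const stepExamples: Record<string, StepExample> = {
  purchase: {
    in: { name: 'order complete', data: { id: 'ORD-123', total: 149.97 } },
    out: ['event', 'purchase', { transaction_id: 'ORD-123', value: 149.97 }],
  },
};

// Run each example and collect the names of those whose output differs.
function findFailingExamples(
  examples: Record<string, StepExample>,
  step: (event: WalkerEvent) => unknown,
): string[] {
  const failing: string[] = [];
  for (const name of Object.keys(examples)) {
    const example = examples[name];
    if (!example) continue;
    const actual = JSON.stringify(step(example.in));
    if (actual !== JSON.stringify(example.out)) failing.push(name);
  }
  return failing;
}

const failingExamples = findFailingExamples(stepExamples, toGtagCall);
```

Comparing serialized output is a shortcut that depends on key order; a test framework's deep-equality matcher would be the more robust choice.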
This section defines which components can connect to which, and how chains are resolved at runtime. Use it as the canonical reference for building flow graphs, validating configurations, and rendering UI visualizations.
| From | To | Via Field | Valid? |
|---|---|---|---|
| Source | Transformer | source.before | Yes (consent-exempt) |
| Source | Transformer | source.next | Yes (pre-collector) |
| Source | Collector | (implicit, no next) | Yes |
| Source | Source | — | No |
| Source | Destination | — | No |
| Transformer | Transformer | transformer.before | Yes (pre-transform) |
| Transformer | Transformer | transformer.next | Yes (chain continues) |
| Transformer | Collector | (implicit, pre-chain ends) | Yes |
| Transformer | Destination | (implicit, post-chain ends) | Yes |
| Collector | Destination | (implicit, no before) | Yes |
| Collector | Transformer | destination.before | Yes (post-chain) |
| Destination | Transformer | destination.next | Yes (post-push) |
| Collector | Source | — | No |
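The connection table can be encoded as a lookup for configuration validation or graph rendering. The sketch below is one possible encoding; `canConnect` is a hypothetical helper, and any pair the table does not list (for example destination to destination) is assumed to be invalid.

```typescript
type ComponentKind = 'source' | 'transformer' | 'collector' | 'destination';

// Allowed targets per origin, taken from the rows marked "Yes" in the table.
// Pairs not listed in the table are treated as invalid (assumption).
const VALID_CONNECTIONS: Record<ComponentKind, ComponentKind[]> = {
  source: ['transformer', 'collector'],
  transformer: ['transformer', 'collector', 'destination'],
  collector: ['transformer', 'destination'],
  destination: ['transformer'],
};

// True when an edge from `from` to `to` is allowed by the table.
function canConnect(from: ComponentKind, to: ComponentKind): boolean {
  return VALID_CONNECTIONS[from].indexOf(to) !== -1;
}

const sourceToTransformer = canConnect('source', 'transformer');
const sourceToDestination = canConnect('source', 'destination');
const collectorToSource = canConnect('collector', 'source');
```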
Pre-collector chains (source.next):
- source.next: "transformerId" or source.next: ["t1", "t2"]
- transformer.next: "nextId" walks forward; array stops walking
- No next = source connects directly to collector
Post-collector chains (destination.before):
- destination.before: "transformerId" or destination.before: ["t1", "t2"]
- Same transformer.next chain-walking logic as pre-chains
- No before = collector connects directly to destination
Chain resolution (walkChain):
See packages/collector/src/transformer.ts for the implementation.
- Follows transformer.next links until chain ends
- Array next inside chain: appends array elements and stops walking
The next and before properties support conditional routing via Route[],
an array of { match, next } objects evaluated against ingest data:
"next": [
{ "match": { "key": "ingest.path", "operator": "prefix", "value": "/api" }, "next": "api-handler" },
{ "match": "*", "next": "default" }
]
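A sketch of how such a route list might be evaluated follows. It is not the `compileNext()` / `resolveNext()` implementation: `pickNext` and `readPath` are hypothetical, only the `prefix` operator and the `"*"` wildcard from the example are handled, and first-match-wins ordering is an assumption.

```typescript
// Only the match shapes visible in the example are modeled here.
type RouteMatch = '*' | { key: string; operator: 'prefix'; value: string };
type Route = { match: RouteMatch; next: string };

// Read a dotted key such as "ingest.path" from a nested object.
function readPath(root: Record<string, unknown>, dottedKey: string): unknown {
  let current: unknown = root;
  for (const part of dottedKey.split('.')) {
    if (typeof current !== 'object' || current === null) return undefined;
    current = (current as Record<string, unknown>)[part];
  }
  return current;
}

// Return the `next` of the first matching route (first match wins: assumption).
function pickNext(
  routes: Route[],
  scope: Record<string, unknown>,
): string | undefined {
  for (const route of routes) {
    const match = route.match;
    if (match === '*') return route.next;
    const actual = readPath(scope, match.key);
    if (typeof actual === 'string' && actual.indexOf(match.value) === 0) {
      return route.next;
    }
  }
  return undefined;
}

const demoRoutes: Route[] = [
  {
    match: { key: 'ingest.path', operator: 'prefix', value: '/api' },
    next: 'api-handler',
  },
  { match: '*', next: 'default' },
];
const apiTarget = pickNext(demoRoutes, { ingest: { path: '/api/users' } });
const fallbackTarget = pickNext(demoRoutes, { ingest: { path: '/home' } });
```

Placing the wildcard last makes it a fallback; under the first-match assumption, putting it first would shadow every other route.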
- Supported on source.before, source.next,
  transformer.before, transformer.next, destination.before, and
  destination.next
- Resolved by compileNext() and resolveNext()
A single transformer can appear in both pre-chains (source.next) and
post-chains (destination.before). The same transformer pool is shared; role
depends on which chain references it.
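The chain-walking rules in this section (a string start follows `transformer.next` links, while an array is taken as an explicit list and stops the walk) can be sketched as below. `walkChainSketch` is a hypothetical re-implementation for illustration, not the code in packages/collector/src/transformer.ts; the cycle guard and the handling of unknown IDs are assumptions.

```typescript
type ChainStart = string | string[] | undefined;
type TransformerPool = Record<string, { next?: string | string[] }>;

// Resolve a start reference into an ordered list of transformer IDs.
function walkChainSketch(start: ChainStart, pool: TransformerPool): string[] {
  if (start === undefined) return []; // no chain configured
  if (Array.isArray(start)) return start.slice(); // explicit list, no walking

  const chain: string[] = [];
  let current: string | undefined = start;
  while (current !== undefined) {
    const step = pool[current];
    if (step === undefined) break; // unknown ID ends the chain (assumption)
    if (chain.indexOf(current) !== -1) break; // cycle guard (assumption)
    chain.push(current);

    const next: string | string[] | undefined = step.next;
    if (Array.isArray(next)) {
      // Array next inside a chain: append its elements and stop walking.
      for (const id of next) chain.push(id);
      break;
    }
    current = next;
  }
  return chain;
}

const demoPool: TransformerPool = {
  validate: { next: 'enrich' },
  enrich: {},
  fanout: { next: ['a', 'b'] },
};
const walkedFromString = walkChainSketch('validate', demoPool);
const walkedFromArray = walkChainSketch(['t1', 't2'], demoPool);
const walkedIntoArray = walkChainSketch('fanout', demoPool);
```

Since the pool is shared, the same function would serve both `source.next` and `destination.before`; only the start reference differs.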
Deferred activation (require):
- source.config.require: ["consent"] defers the source until the "consent" event
  fires
- destination.config.require: ["user"] defers the destination until the "user"
  event fires
Mapping and consent:
- source.config.mapping and source.config.consent: applied before pre-chain;
  blocks event entirely
- destination.config.mapping and destination.config.consent: applied after
  post-chain; skips only that destination, queues denied events
- Same transformer in multiple destination.before chains: intentional (e.g.,
  shared validator for monitoring)
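The `require` behavior can be pictured as a gate that opens once every listed event has fired. The sketch below illustrates that idea only; `createDeferredComponent` is a hypothetical helper and says nothing about how the collector actually tracks or queues events.

```typescript
// Gate a component until all required event names have been seen.
function createDeferredComponent(required: string[], activate: () => void) {
  const pending = required.slice();
  let active = false;

  const tryActivate = (): void => {
    if (!active && pending.length === 0) {
      active = true;
      activate();
    }
  };
  tryActivate(); // nothing required: active immediately

  return {
    isActive: (): boolean => active,
    // Call for every event name that fires, e.g. "consent" or "user".
    onEvent(eventName: string): void {
      const index = pending.indexOf(eventName);
      if (index !== -1) pending.splice(index, 1);
      tryActivate();
    },
  };
}

let activationCount = 0;
const deferredDestination = createDeferredComponent(['user'], () => {
  activationCount += 1;
});
deferredDestination.onEvent('consent'); // unrelated event: still deferred
const activeBeforeUser = deferredDestination.isActive();
deferredDestination.onEvent('user'); // required event: activates once
deferredDestination.onEvent('user'); // repeat: no second activation
```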