Explains walkerOS transformers for event validation, enrichment, redaction, and chaining in pipelines. Covers TypeScript interface, init context, push method, and integration.
Transformers are middleware for validating, enriching, and redacting events in the walkerOS pipeline. They run in chains at configurable points between sources, collector, and destinations.
Core principle: Transformers transform events. They don't capture (sources) or deliver (destinations)—they modify events in-flight.
| Use Case | Purpose | Example |
|---|---|---|
| Validate | Ensure events match schema contracts | JSON Schema validation |
| Enrich | Add server-side data to events | User segments, geo data |
| Redact | Remove sensitive data before destinations | Strip PII, anonymize IPs |
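As an illustration of the redact use case, the sketch below removes one PII field and anonymizes an IPv4 address. The event shape, the `email` and `clientIp` field names, and the `anonymizeIpv4` helper are assumptions made for this example; they are not part of walkerOS.

```typescript
// Hypothetical, simplified event shape for illustration only.
interface RedactSketchEvent {
  name: string;
  data: Record<string, unknown>;
}

// Zero the last octet of an IPv4 address; any other input is returned unchanged.
function anonymizeIpv4(ip: string): string {
  const parts = ip.split('.');
  if (parts.length !== 4) return ip;
  parts[3] = '0';
  return parts.join('.');
}

// Redaction step shaped like a transformer push: it returns { event }.
function redactPush(event: RedactSketchEvent): { event: RedactSketchEvent } {
  const data: Record<string, unknown> = { ...event.data };
  delete data.email; // drop the assumed PII field
  if (typeof data.clientIp === 'string') {
    data.clientIp = anonymizeIpv4(data.clientIp);
  }
  // Return a copy so the incoming event object is not mutated.
  return { event: { ...event, data } };
}
```

Returning a copy rather than mutating the input keeps the sketch easy to test; a real transformer may choose either approach.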
See `packages/core/src/types/transformer.ts` for the canonical interface.
Transformers use a context-based initialization pattern:
import type { Transformer } from '@walkeros/core';

export const transformerMyTransformer: Transformer.Init<Types> = (context) => {
  const { config = {}, env, logger, id } = context;

  // Apply defaults inline: flow.json is developer-controlled, so no
  // runtime validation. Shape checks live in ./schemas and are used by
  // `walkeros validate` and dev tooling, never at runtime.
  const userSettings = config.settings || {};
  const settings = {
    ...userSettings,
    // example default: threshold: userSettings.threshold ?? 100,
  };

  return {
    push(event, pushContext) {
      // Process event
      return { event };
    },
  };
};
Init Context contains:
| Property | Type | Purpose |
|---|---|---|
| `config` | `Transformer.Config` | Settings, mapping, next chain |
| `env` | `Types['env']` | Environment deps (stores via `$store:`) |
| `logger` | `Logger` | Logging functions |
| `id` | `string` | Transformer identifier |
| `collector` | `Collector.Instance` | Reference to collector |
| `ingest` | `Ingest` (optional) | Request metadata from source |
| Method | Purpose | Required |
|---|---|---|
| `push` | Process event, return modified/false | Required |
| `init` | One-time initialization | Optional |
| `destroy` | Cleanup resources | Optional |
The push function controls event flow:
| Return | Behavior |
|---|---|
| `{ event }` | Continue chain with modified event |
| `void` | Continue chain, event unchanged |
| `false` | Stop chain, event dropped |
| `{ event, next }` | Redirect chain to a different transformer (fan-out) |
| `{ event, respond }` | Continue chain with wrapped respond function |
push(event, context) {
  if (!event.data?.id) {
    context.logger.error('Missing required id');
    return false; // Stop chain
  }
  event.data.enrichedAt = Date.now();
  return { event }; // Continue with modified event
}
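The return-value rules above can be modeled with a small chain runner. This is a simplified sketch, not the walkerOS implementation: `runChain`, `ChainSketchEvent`, and `ChainPushFn` are hypothetical names, and the `next` and `respond` variants are left out.

```typescript
// Hypothetical, simplified shapes for illustration only.
interface ChainSketchEvent {
  name: string;
  data: Record<string, unknown>;
}
type ChainPushResult = { event: ChainSketchEvent } | false | void;
type ChainPushFn = (event: ChainSketchEvent) => ChainPushResult;

// Run each push in order and apply the three basic return rules.
function runChain(
  event: ChainSketchEvent,
  chain: ChainPushFn[],
): ChainSketchEvent | null {
  let current = event;
  for (const push of chain) {
    const result = push(current);
    if (result === false) return null; // false: stop the chain, drop the event
    if (result) current = result.event; // { event }: continue with the modified event
    // void: continue with the event unchanged
  }
  return current;
}
```

Here `null` stands in for a dropped event so that callers can tell "dropped" apart from "passed through unchanged".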
For simple transformations without external packages, use inline code with the
$code: string prefix in JSON configs. The $code: prefix tells the CLI
bundler to parse the following string as executable JavaScript:
{
  "transformers": {
    "enrich": {
      "code": {
        "push": "$code:(event) => { event.data.enrichedAt = Date.now(); return { event }; }"
      },
      "next": "validate"
    }
  }
}
Inline code structure:
| Property | Purpose |
|---|---|
| `code.init` | Code run once during initialization |
| `code.push` | Code run for each event |
Push code has access to:
- `event` - The event being processed
- `context` - Push context with logger, config, etc.

Return values in push code:

- `{ event }` to continue chain with modified event
- `undefined` to pass event unchanged
- `false` to drop event from chain

Example: Filtering internal events
{
  "transformers": {
    "filter": {
      "code": {
        "push": "$code:(event) => { if (event.name.startsWith('internal_')) return false; return { event }; }"
      }
    }
  }
}
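To make the meaning of the `$code:` prefix concrete, the sketch below turns a prefixed string into a function at runtime with the standard `Function` constructor. It is illustrative only: the text above says the CLI bundler handles this, and `resolveCode` is a hypothetical helper, not a walkerOS API.

```typescript
const CODE_PREFIX = '$code:';

// Hypothetical helper: evaluate a "$code:"-prefixed string as a JavaScript
// expression; strings without the prefix are returned unchanged.
function resolveCode(value: string): unknown {
  if (!value.startsWith(CODE_PREFIX)) return value;
  const source = value.slice(CODE_PREFIX.length);
  // Only for trusted, developer-authored config; never pass user input here.
  return new Function(`return (${source});`)();
}

// Simplified shapes assumed for this example.
type CodeSketchEvent = { name: string; data: Record<string, unknown> };
type CodeSketchPush = (
  event: CodeSketchEvent,
) => { event: CodeSketchEvent } | false;

// The filter from the config above, resolved into a callable push function.
const filterPush = resolveCode(
  "$code:(event) => { if (event.name.startsWith('internal_')) return false; return { event }; }",
) as CodeSketchPush;
```

Calling `filterPush` with an event named `internal_ping` returns `false`, while any other event name comes back wrapped as `{ event }`.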
Mixing inline and package transformers:
{
  "transformers": {
    "addTimestamp": {
      "code": {
        "push": "$code:(event) => { event.data.processedAt = new Date().toISOString(); return { event }; }"
      },
      "next": "validate"
    },
    "validate": {
      "package": "@walkeros/transformer-validator"
    }
  }
}
Transformers run at two points in the pipeline:
Source → [Pre-Transformers] → Collector → [Post-Transformers] → Destination
         (source.next)                    (destination.before)
Pre-transformers run after the source captures an event and before collector enrichment:
sources: {
  browser: {
    code: sourceBrowser,
    next: 'validate' // First transformer in pre-chain
  }
}
Post-transformers run after collector enrichment and before the destination receives the event:
destinations: {
  gtag: {
    code: destinationGtag,
    before: 'redact' // First transformer in post-chain
  }
}
Transformers link together via next:
transformers: {
  validate: {
    code: transformerValidator,
    config: { next: 'enrich' } // Chain to next transformer
  },
  enrich: {
    code: transformerEnrich,
    config: { next: 'redact' }
  },
  redact: {
    code: transformerRedact
    // No next = end of chain
  }
}
Transformers can redirect events to different chains using the branch()
factory from @walkeros/core:
import { branch } from '@walkeros/core';

push(event, context) {
  // Single target:
  return branch(event, 'parser');
  // Or fan out to multiple targets instead:
  // return branch(event, ['a', 'b']);
}
Conditional routing is built into the `next`/`before` properties using `Route[]`, so
no separate router transformer is needed:
"next": [
  { "match": { "key": "ingest.path", "operator": "prefix", "value": "/api" }, "next": "api-handler" },
  { "match": "*", "next": "default" }
]
Routes are evaluated in order — first match wins. If no route matches and there's no wildcard, the event passes through unchanged.
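The first-match-wins rule can be sketched as a small resolver. This is a minimal model under stated assumptions: only the `prefix` operator and the `"*"` wildcard shown above are covered, the key is resolved as a dot path against a plain object that holds an `ingest` property, and `resolveRoute`, `getPath`, and the `SketchRoute` type are hypothetical names.

```typescript
// Hypothetical route shapes modeled on the JSON example above.
type SketchMatch = '*' | { key: string; operator: 'prefix'; value: string };
interface SketchRoute {
  match: SketchMatch;
  next: string;
}

// Read a dot-separated path such as "ingest.path" from a nested object.
function getPath(source: unknown, path: string): unknown {
  let current: unknown = source;
  for (const part of path.split('.')) {
    if (typeof current !== 'object' || current === null) return undefined;
    current = (current as Record<string, unknown>)[part];
  }
  return current;
}

// Return the `next` of the first matching route, or undefined if none match.
function resolveRoute(routes: SketchRoute[], input: unknown): string | undefined {
  for (const route of routes) {
    if (route.match === '*') return route.next; // wildcard matches everything
    const actual = getPath(input, route.match.key);
    if (typeof actual === 'string' && actual.startsWith(route.match.value)) {
      return route.next;
    }
  }
  return undefined; // no match and no wildcard: caller passes the event through
}
```

Because routes are checked in order, placing the wildcard last makes it act as a default branch.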
walkChain() uses a visited set to detect circular references. If a cycle is
found, the loop is silently broken and the chain ends. If next points to a
non-existent transformer, the chain also ends without error.
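The cycle handling described above can be sketched with a visited set. This is a hypothetical reimplementation for illustration, not the actual `walkChain()` source; the flat `{ next?: string }` config shape and the name `walkChainSketch` are assumptions.

```typescript
// Hypothetical minimal config: each transformer may name a single next id.
type ChainSketchConfig = Record<string, { next?: string }>;

// Resolve the ordered list of transformer ids reachable from `start`.
function walkChainSketch(start: string, transformers: ChainSketchConfig): string[] {
  const visited = new Set<string>();
  const order: string[] = [];
  let current: string | undefined = start;

  // Stop at the end of the chain or when an id repeats (a cycle).
  while (current !== undefined && !visited.has(current)) {
    const entry: { next?: string } | undefined = transformers[current];
    if (!entry) break; // non-existent transformer: end the chain without error
    visited.add(current);
    order.push(current);
    current = entry.next;
  }
  return order;
}
```

With a config where `a` points to `b` and `b` points back to `a`, the loop stops when `a` comes up a second time, so the result contains each id once.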
See walkeros-understanding-flow for the full connection rules between sources, transformers, and destinations.
The push function receives a context with event metadata:
| Property | Purpose |
|---|---|
| `config` | Transformer configuration |
| `env` | Environment dependencies |
| `logger` | Scoped logger for output |
| `id` | Transformer identifier |
| `collector` | Access to collector instance |
| `ingest` | Request metadata from source |
push(event, context) {
  const { logger, id, ingest } = context;
  logger.debug('Processing', { transformer: id, event: event.name });

  // Access request metadata if available
  if (ingest?.ip) {
    event.data = { ...event.data, clientIp: ingest.ip };
  }
  return { event };
}
Transformers can customize HTTP responses by calling
context.env.respond?.({ body, status?, headers? }). This is useful for
validation transformers that reject events with custom error responses, or
transformers that short-circuit the pipeline. First call wins (idempotent). The
respond function is optional — only present when the source provides one.
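The "first call wins" behavior and the optional call can be sketched as follows. The types and the `createRespond` and `validatePush` names are hypothetical stand-ins for this example; only the `{ body, status?, headers? }` argument shape and the `context.env.respond?.(...)` call come from the text above.

```typescript
// Assumed response shape, based on { body, status?, headers? }.
interface SketchResponse {
  body: unknown;
  status?: number;
  headers?: Record<string, string>;
}
type SketchRespond = (response: SketchResponse) => void;

// Wrap a sender so that only the first call is forwarded.
function createRespond(send: (response: SketchResponse) => void): SketchRespond {
  let responded = false;
  return (response) => {
    if (responded) return; // later calls are ignored
    responded = true;
    send(response);
  };
}

// Validation-style push: reject with a custom error response, then drop the event.
function validatePush(
  event: { data?: { id?: string } },
  context: { env: { respond?: SketchRespond } },
): false | void {
  if (!event.data?.id) {
    // respond may be absent when the source does not provide one
    context.env.respond?.({ body: { error: 'Missing required id' }, status: 400 });
    return false;
  }
  return;
}
```

The optional call means the same transformer works unchanged for sources that do not expose a respond function.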
| Path | Description |
|---|---|
| `packages/transformers/` | Transformer packages |
| `packages/transformers/validator/` | Validator transformer |