From typescript-services
Enforces RxJS patterns for reactive programming including observable creation, operators, memory management, error handling, and testing. Use when working with observables, streams, async operations, or reactive programming patterns.
npx claudepluginhub andercore-labs/claudes-kitchen --plugin typescript-servicesThis skill uses the workspace's default tool permissions.
```typescript
Implements structured self-debugging workflow for AI agent failures: capture errors, diagnose patterns like loops or context overflow, apply contained recoveries, and generate introspection reports.
Monitors deployed URLs for regressions in HTTP status, console errors, performance metrics, content, network, and APIs after deploys, merges, or upgrades.
Provides React and Next.js patterns for component composition, compound components, state management, data fetching, performance optimization, forms, routing, and accessible UIs.
of(1, 2, 3)
from(promise)
defer(() => operation)
interval(1000)
fromEvent(element, 'click')
pipe(
map(x => x * 2),
filter(x => x > 0),
switchMap(x => http.get(x)),
catchError(err => of(null)),
takeUntil(destroy$)
)
subject$ = new Subject<T>()
behavior$ = new BehaviorSubject<T>(initial)
replay$ = new ReplaySubject<T>(bufferSize)
pipe(takeUntil(destroy$))
subscription.unsubscribe()
| Trigger | Pattern |
|---|---|
| Async operations | Observable creation + operators |
| Stream transformation | map, filter, scan |
| HTTP requests | switchMap, mergeMap, concatMap |
| Event handling | fromEvent + operators |
| State management | BehaviorSubject + scan |
| Testing async | TestScheduler + marble syntax |
| Pattern | Use Case | Example |
|---|---|---|
of(...values) | Static values | of(1, 2, 3) |
from(array|promise) | Convert existing | from(Promise.resolve(data)) |
defer(() => obs) | Lazy creation | defer(() => http.get(url)) |
interval(ms) | Periodic events | interval(1000) |
fromEvent(target, event) | DOM events | fromEvent(button, 'click') |
throwError(() => error) | Error emission | throwError(() => new Error('fail')) |
animationFrames() | Animation timing | animationFrames().pipe(map(({elapsed}) => elapsed)) |
Promise conversion:
| Pattern | Use Case | Example |
|---|---|---|
firstValueFrom(obs) | Observable → Promise (first) | await firstValueFrom(http.get(url)) |
lastValueFrom(obs) | Observable → Promise (last) | await lastValueFrom(events$) |
Cold vs Hot:
cold$ = defer(() => http.get(url))
hot$ = cold$.pipe(share())
hot$ = cold$.pipe(share({
resetOnError: true,
resetOnComplete: false,
resetOnRefCountZero: true
}))
hot$ = cold$.pipe(
connect(shared$ => merge(
shared$.pipe(map(x => x)),
shared$.pipe(filter(x => x > 0))
))
)
| Operator | Action | Example |
|---|---|---|
map(fn) | Transform each value | map(x => x * 2) |
map(fn) | Extract property | map(x => x.name) |
scan(fn, seed) | Accumulate state | scan((acc, x) => acc + x, 0) |
switchMap(fn) | Cancel previous, emit latest | switchMap(id => http.get(id)) |
mergeMap(fn) | Run all concurrently | mergeMap(id => http.get(id)) |
concatMap(fn) | Queue sequentially | concatMap(id => http.post(id)) |
exhaustMap(fn) | Ignore while running | exhaustMap(() => http.post()) |
| Operator | Action | Example |
|---|---|---|
filter(pred) | Conditional pass | filter(x => x > 0) |
take(n) | First n values | take(1) |
takeUntil(notifier) | Until notifier emits | takeUntil(destroy$) |
takeWhile(pred) | While condition true | takeWhile(x => x < 100) |
distinctUntilChanged() | Skip duplicates | distinctUntilChanged() |
debounceTime(ms) | Wait for silence | debounceTime(300) |
throttleTime(ms) | Rate limit | throttleTime(1000) |
| Operator | Action | Example |
|---|---|---|
combineLatest([a, b]) | Latest from each | combineLatest([obs1$, obs2$]) |
merge(a, b) | Merge emissions | merge(click$, hover$) |
concat(a, b) | Sequential | concat(first$, second$) |
zip([a, b]) | Pair by index | zip([obs1$, obs2$]) |
| Operator | Action | Example |
|---|---|---|
tap(fn) | Side effect | tap(x => console.log(x)) |
delay(ms) | Delay emission | delay(1000) |
finalize(fn) | Run on complete/error | finalize(() => cleanup()) |
share(config?) | Multicast | share() or share({ resetOnError: true }) |
shareReplay(n) | Replay n values | shareReplay(1) |
connect(fn) | Multicast with selector | connect(shared$ => merge(...)) |
Pattern: takeUntil + destroy Subject
destroy$ = new Subject<void>()
obs$.pipe(
takeUntil(this.destroy$)
).subscribe()
ngOnDestroy() {
this.destroy$.next()
this.destroy$.complete()
}
Manual unsubscribe:
subscription = obs$.subscribe()
subscription.unsubscribe()
Automatic cleanup:
obs$.pipe(
take(1),
first(),
takeWhile(x => x)
).subscribe()
| Pattern | Action | Example |
|---|---|---|
catchError(fn) | Recover from error | catchError(() => of(fallback)) |
retry(n) | Retry n times | retry(3) |
retryWhen(fn) | Conditional retry | retryWhen(errors => errors.pipe(delay(1000))) |
throwError(fn) | Emit error | throwError(() => new Error()) |
Error recovery:
http.get(url).pipe(
retry(3),
catchError(err => {
console.error(err)
return of(null)
})
)
Error propagation:
obs$.pipe(
switchMap(x =>
x > 0
? of(x)
: throwError(() => new Error('Invalid'))
),
catchError(err => of(-1))
)
| Type | Behavior | Use Case |
|---|---|---|
Subject<T> | No initial value, no replay | Event bus |
BehaviorSubject<T>(init) | Has current value, replays 1 | State store |
ReplaySubject<T>(n) | Replays last n values | Message history |
AsyncSubject<T> | Emits last value on complete | Single async result |
State management:
state$ = new BehaviorSubject<State>(initial)
state$.subscribe(state => render(state))
state$.next(newState)
const current = state$.value
| Scenario | Operator | Reason |
|---|---|---|
| HTTP search (cancel previous) | switchMap | Cancels outdated requests |
| HTTP parallel requests | mergeMap | Runs all concurrently |
| HTTP sequential writes | concatMap | Maintains order |
| Button click (ignore while pending) | exhaustMap | Prevents double-submit |
| Transform value | map | Synchronous transform |
| Accumulate state | scan | Maintains accumulated value |
Decision tree:
Need to flatten inner observable?
├─ Cancel previous? → switchMap
├─ Run all parallel? → mergeMap
├─ Run sequential? → concatMap
└─ Ignore while active? → exhaustMap
Simple transform? → map
Need state accumulation? → scan
Marble syntax:
'-' : 1ms time progression
'|' : Completion
'#' : Error
'a' : Emission (value defined in values object)
'()' : Synchronous grouping
'^' : Subscription point
TestScheduler:
testScheduler.run(({ cold, hot, expectObservable }) => {
const input$ = cold(' -a-b-c|', { a: 1, b: 2, c: 3 })
const expected = ' -x-y-z|'
const values = { x: 2, y: 4, z: 6 }
const result$ = input$.pipe(map(x => x * 2))
expectObservable(result$).toBe(expected, values)
})
Hot observable:
const hot$ = hot('--a--b--c|', { a: 1, b: 2, c: 3 })
const expected = '--x--y--z|'
Subscription timing:
const input$ = cold(' -a-b-c|')
const subscription = '^----!'
const expected = ' -a-b-'
expectObservable(input$, subscription).toBe(expected)
See testing-guide.md for advanced marble patterns.
| Anti-Pattern | Issue | Fix |
|---|---|---|
Nested .subscribe() | ✗ Callback hell + no cancellation | ✓ switchMap | mergeMap |
| No cleanup | ✗ Memory leak | ✓ takeUntil(destroy$) |
| Missing error handler | ✗ Uncaught errors | ✓ catchError |
| Values outside stream | ✗ Breaks reactive flow | ✓ combineLatest | withLatestFrom |
| Blocking/imperative code | ✗ Breaks composition | ✓ Keep reactive |
| Early subscription | ✗ Returns void → no composition | ✓ Return Observable |
| Late subscription | ✗ Values arrive before subscribe | ✓ shareReplay() |
| Subject abuse | ✗ Manual .next() vs operators | ✓ Use shareReplay() |
| Mixing async/await | ✗ Loses Observable capabilities | ✓ firstValueFrom() sparingly |
| Manual error handling | ✗ Using try/catch | ✓ catchError() |
| Exposing Subjects | ✗ External .next() access | ✓ private subject$ + public obs$ |
| Complex chains | ✗ 10+ operators → hard debug | ✓ Break into named intermediates |
| No completion | ✗ Infinite streams | ✓ take | takeUntil |
See examples.md for detailed anti-pattern examples and solutions.
MANDATORY: Run after generating RxJS code.
| Check | Evidence Required | Pass | Fail |
|---|---|---|---|
| Observable creation | Recipe pattern used | of(), from(), defer() | new Observable() |
| Subscription cleanup | Cleanup present | takeUntil(destroy$), .unsubscribe() | No cleanup mechanism |
| Error handling | catchError present | catchError() in pipe | try/catch |
| Operator choice | Correct flattening operator | switchMap for HTTP cancel | Nested .subscribe() |
| Memory leak | No leaked subscriptions | All subs cleaned up | Orphaned subscriptions |
| Reactive flow | No side-channel values | combineLatest/withLatestFrom | let value; obs$.subscribe(x => value = x) |
| Composition | Returns Observable | return obs$ | subscribe() in function returning void |
| Subject encapsulation | Private subjects | private subject$ | public subject$ |
| Stream completion | Proper completion | take/takeUntil | Infinite streams without completion |
| Anti-patterns | None present | No violations | Any anti-pattern from table |
Validation method:
1. Read generated code
2. Search for anti-patterns:
- Nested .subscribe()
- Missing takeUntil/unsubscribe
- Missing catchError / using try/catch
- Wrong operator choice
- Values passed outside stream (let x; obs$.subscribe(v => x = v))
- Blocking/imperative code in pipes
- Early subscription (subscribe in void function)
- Subject abuse (manual .next() instead of operators)
- Exposed Subjects (public subject$)
- Missing completion (infinite streams)
3. Cite line numbers for violations
4. Report Pass/Fail for each check
Pass example:
✓ Observable creation: `from(promise)` [line 15]
✓ Cleanup: `takeUntil(destroy$)` [line 23]
✓ Error handling: `catchError` [line 18]
✓ Operator: `switchMap` for HTTP [line 20]
✓ No leaks: All cleaned [lines 23, 45, 67]
✓ Anti-patterns: None
RESULT: PASS
Fail example:
✓ Creation: `of()` [line 10]
✗ Cleanup missing [line 30] → Add takeUntil(destroy$)
✗ Nested subscribe [lines 35-40] → Use switchMap
✗ No catchError [line 50] → Add catchError(() => of(null))
✗ Values outside stream [line 25] → Use combineLatest
✗ Public subject$ [line 15] → Make private, expose asObservable()
VIOLATIONS (5): Fix → re-validate
Creation: of, from, defer, interval, fromEvent
Transform: map, scan, switchMap, mergeMap, concatMap
Filter: filter, take, takeUntil, debounceTime
Error: catchError, retry, retryWhen
Cleanup: takeUntil, unsubscribe, finalize
State: BehaviorSubject + scan
Test: TestScheduler + marble syntax
HTTP → switchMap (cancel) | mergeMap (parallel) | concatMap (sequential)
Events → fromEvent + debounceTime/throttleTime
Cleanup → takeUntil(destroy$) OR subscription.unsubscribe()
Error → catchError(err => of(fallback))
See supporting files for detailed patterns: