From ng
RxJS patterns in Angular — operators, signals interop, cleanup, and common recipes. Auto-invoked when using observables in Angular files.
npx claudepluginhub mayeedwin/angular-plugin --plugin ngThis skill uses the workspace's default tool permissions.
**Reference**: https://angular.dev/ecosystem/rxjs-interop
Searches, retrieves, and installs Agent Skills from prompts.chat registry using MCP tools like search_skills and get_skill. Activates for finding skills, browsing catalogs, or extending Claude.
Searches prompts.chat for AI prompt templates by keyword or category, retrieves by ID with variable handling, and improves prompts via AI. Use for discovering or enhancing prompts.
Guides MCP server integration in Claude Code plugins via .mcp.json or plugin.json configs for stdio, SSE, HTTP types, enabling external services as tools.
Reference: https://angular.dev/ecosystem/rxjs-interop
| Use case | Operator |
|---|---|
| Cancel previous on new emit (search, autocomplete) | switchMap |
| Sequential, ordered (form saves, queue) | concatMap |
| Parallel, all results needed (batch requests) | mergeMap |
| Ignore new while in-flight (login button) | exhaustMap |
| Combine latest from multiple streams | combineLatest |
| One-shot parallel requests | forkJoin |
| Transform each value | map |
| Filter values | filter |
| Side effects without transforming | tap |
| Catch and recover from errors | catchError |
| Retry on error | retry({ count: 3, delay: 1000 }) |
| Emit only distinct consecutive values | distinctUntilChanged |
| Delay between key presses | debounceTime(300) |
toSignal)import { toSignal } from '@angular/core/rxjs-interop';
readonly products = toSignal(this.productService.getAll(), { initialValue: [] });
// Use in template: products() — no async pipe needed
toObservable)import { toObservable } from '@angular/core/rxjs-interop';
readonly searchTerm = signal('');
readonly results$ = toObservable(this.searchTerm).pipe(
debounceTime(300),
distinctUntilChanged(),
switchMap(term => this.api.search(term)),
);
takeUntilDestroyed — preferred cleanupimport { takeUntilDestroyed } from '@angular/core/rxjs-interop';
private readonly destroyRef = inject(DestroyRef);
ngOnInit() {
this.service.data$
.pipe(takeUntilDestroyed(this.destroyRef))
.subscribe(data => this.data.set(data));
}
readonly searchControl = new FormControl('');
readonly results = toSignal(
this.searchControl.valueChanges.pipe(
debounceTime(300),
distinctUntilChanged(),
switchMap(term => term ? this.api.search(term) : of([])),
catchError(() => of([])),
),
{ initialValue: [] }
);
readonly data = toSignal(
forkJoin({
users: this.userService.getAll(),
roles: this.roleService.getAll(),
}),
);
readonly liveData = toSignal(
interval(5000).pipe(
startWith(0),
switchMap(() => this.api.getStatus()),
takeUntilDestroyed(this.destroyRef),
),
);
readonly vm = toSignal(
this.trigger$.pipe(
switchMap(() =>
this.api.getData().pipe(
map(data => ({ data, loading: false, error: null })),
startWith({ data: null, loading: true, error: null }),
catchError(err => of({ data: null, loading: false, error: err.message })),
),
),
),
);
Always include catchError on HTTP chains — never let unhandled errors terminate streams:
this.service.getItems().pipe(
catchError(err => {
console.error(err);
return of([]); // recover with fallback
}),
)
Use retry for transient errors:
this.api.call().pipe(
retry({ count: 3, delay: 1000 }),
catchError(err => of(null)),
)
toSignal() over async pipe for new code — integrates with signalssubscribe() inside subscribe() — flatten with switchMap/mergeMapsubscribe() in a component must be cleaned up (prefer toSignal or takeUntilDestroyed)BehaviorSubject when a signal() or computed() will doSubject only for event buses or imperative triggersReplaySubject(1) when late subscribers need the last value