Implement RxJS observables, apply operators, fix memory leaks with unsubscribe patterns, handle errors, create subjects, and build reactive data pipelines in Angular applications.
Implements RxJS observables, operators, and subjects in Angular apps. Triggers when you need reactive data streams, handle async operations, fix memory leaks, or build event pipelines.
/plugin marketplace add pluginagentmarketplace/custom-plugin-angular/plugin install angular-development-assistant@pluginagentmarketplace-angularThis skill inherits all available tools. When active, it can use any tool Claude has access to.
assets/README.mdassets/config.yamlassets/operators-config.yamlreferences/GUIDE.mdreferences/OPERATORS.mdreferences/README.mdscripts/README.mdscripts/check-subscriptions.shscripts/helper.pyimport { Observable } from 'rxjs';
// Create observable
const observable = new Observable((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
});
// Subscribe
const subscription = observable.subscribe({
next: (value) => console.log(value),
error: (error) => console.error(error),
complete: () => console.log('Done')
});
// Unsubscribe
subscription.unsubscribe();
import { map, filter, switchMap, takeUntil } from 'rxjs/operators';
// Transformation
data$.pipe(
map(user => user.name),
filter(name => name.length > 0)
).subscribe(name => console.log(name));
// Higher-order
userId$.pipe(
switchMap(id => this.userService.getUser(id))
).subscribe(user => console.log(user));
import { Subject, BehaviorSubject, ReplaySubject } from 'rxjs';
// Subject - No initial value
const subject = new Subject<string>();
subject.next('hello');
// BehaviorSubject - Has initial value
const behavior = new BehaviorSubject<string>('initial');
behavior.next('new value');
// ReplaySubject - Replays N values
const replay = new ReplaySubject<string>(3);
replay.next('one');
replay.next('two');
@Injectable()
export class NotificationService {
private messageSubject = new Subject<string>();
public message$ = this.messageSubject.asObservable();
notify(message: string) {
this.messageSubject.next(message);
}
}
// Usage
constructor(private notification: NotificationService) {
this.notification.message$.subscribe(msg => {
console.log('Notification:', msg);
});
}
// map - Transform values
source$.pipe(
map(user => user.name)
)
// switchMap - Switch to new observable (cancel previous)
userId$.pipe(
switchMap(id => this.userService.getUser(id))
)
// mergeMap - Merge all results
fileIds$.pipe(
mergeMap(id => this.downloadFile(id))
)
// concatMap - Sequential processing
tasks$.pipe(
concatMap(task => this.processTask(task))
)
// exhaustMap - Ignore new while processing
clicks$.pipe(
exhaustMap(() => this.longRequest())
)
// filter - Only pass matching values
data$.pipe(
filter(item => item.active)
)
// first - Take first value
data$.pipe(first())
// take - Take N values
data$.pipe(take(5))
// takeUntil - Take until condition
data$.pipe(
takeUntil(this.destroy$)
)
// distinct - Filter duplicates
data$.pipe(
distinct(),
distinctUntilChanged()
)
// debounceTime - Wait N ms
input$.pipe(
debounceTime(300),
distinctUntilChanged()
)
import { combineLatest, merge, concat, zip } from 'rxjs';
// combineLatest - Latest from all
combineLatest([user$, settings$, theme$]).pipe(
map(([user, settings, theme]) => ({ user, settings, theme }))
)
// merge - Values from any
merge(click$, hover$, input$)
// concat - Sequential
concat(request1$, request2$, request3$)
// zip - Wait for all
zip(form1$, form2$, form3$)
// withLatestFrom - Combine with latest
click$.pipe(
withLatestFrom(user$),
map(([click, user]) => ({ click, user }))
)
// catchError - Handle errors
data$.pipe(
catchError(error => {
console.error('Error:', error);
return of(defaultValue);
})
)
// retry - Retry on error
request$.pipe(
retry(3),
catchError(error => throwError(error))
)
// timeout - Timeout if no value
request$.pipe(
timeout(5000),
catchError(error => of(null))
)
private destroy$ = new Subject<void>();
ngOnInit() {
this.data$.pipe(
takeUntil(this.destroy$)
).subscribe(data => {
this.processData(data);
});
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
// Component
export class UserComponent {
user$ = this.userService.getUser(1);
constructor(private userService: UserService) {}
}
// Template - Async pipe handles unsubscribe
<div>{{ user$ | async as user }}
<p>{{ user.name }}</p>
</div>
// Hot observable - Share single subscription
readonly users$ = this.http.get('/api/users').pipe(
shareReplay(1) // Cache last result
);
// Now multiple subscriptions use same HTTP request
this.users$.subscribe(users => {...});
this.users$.subscribe(users => {...}); // Reuses cached
// Accumulate state
const counter$ = clicks$.pipe(
scan((count) => count + 1, 0)
)
// Complex state
const appState$ = actions$.pipe(
scan((state, action) => {
switch(action.type) {
case 'ADD_USER': return { ...state, users: [...state.users, action.user] };
case 'DELETE_USER': return { ...state, users: state.users.filter(u => u.id !== action.id) };
default: return state;
}
}, initialState)
)
// Parallel requests
forkJoin({
users: this.userService.getUsers(),
settings: this.settingService.getSettings(),
themes: this.themeService.getThemes()
}).subscribe(({ users, settings, themes }) => {
console.log('All loaded:', users, settings, themes);
})
import { marbles } from 'rxjs-marbles';
it('should map values correctly', marbles((m) => {
const source = m.hot('a-b-|', { a: 1, b: 2 });
const expected = m.cold('x-y-|', { x: 2, y: 4 });
const result = source.pipe(
map(x => x * 2)
);
m.expect(result).toBeObservable(expected);
}));
Observable<User> not just Observable// ❌ Wrong - Creates multiple subscriptions
this.data$.subscribe(d => {
this.data$.subscribe(d2 => {
// nested subscriptions!
});
});
// ✅ Correct - Use switchMap
this.data$.pipe(
switchMap(d => this.otherService.fetch(d))
).subscribe(result => {
// handled
});
// ❌ Wrong - Memory leak
ngOnInit() {
this.data$.subscribe(data => this.data = data);
}
// ✅ Correct - Unsubscribe or async
ngOnInit() {
this.data$ = this.service.getData();
}
// In template: {{ data$ | async }}
This skill should be used when the user asks to "create a slash command", "add a command", "write a custom command", "define command arguments", "use command frontmatter", "organize commands", "create command with file references", "interactive command", "use AskUserQuestion in command", or needs guidance on slash command structure, YAML frontmatter fields, dynamic arguments, bash execution in commands, user interaction patterns, or command development best practices for Claude Code.
This skill should be used when the user asks to "create an agent", "add an agent", "write a subagent", "agent frontmatter", "when to use description", "agent examples", "agent tools", "agent colors", "autonomous agent", or needs guidance on agent structure, system prompts, triggering conditions, or agent development best practices for Claude Code plugins.
This skill should be used when the user asks to "create a hook", "add a PreToolUse/PostToolUse/Stop hook", "validate tool use", "implement prompt-based hooks", "use ${CLAUDE_PLUGIN_ROOT}", "set up event-driven automation", "block dangerous commands", or mentions hook events (PreToolUse, PostToolUse, Stop, SubagentStop, SessionStart, SessionEnd, UserPromptSubmit, PreCompact, Notification). Provides comprehensive guidance for creating and implementing Claude Code plugin hooks with focus on advanced prompt-based hooks API.