rxjs-patterns-for-angular
Implement RxJS patterns for reactive programming in Angular. Use this skill when working with Observables, operators, subscriptions, async data flows, and error handling. Covers common patterns like combineLatest, switchMap, debounceTime, catchError, retry logic, and integration with Angular Signals using toSignal() and toObservable(). Ensures proper subscription cleanup with takeUntilDestroyed().
Best use case
rxjs-patterns-for-angular is best used when you need a repeatable AI agent workflow instead of a one-off prompt.
Implement RxJS patterns for reactive programming in Angular. Use this skill when working with Observables, operators, subscriptions, async data flows, and error handling. Covers common patterns like combineLatest, switchMap, debounceTime, catchError, retry logic, and integration with Angular Signals using toSignal() and toObservable(). Ensures proper subscription cleanup with takeUntilDestroyed().
Teams using rxjs-patterns-for-angular should expect a more consistent output, faster repeated execution, less prompt rewriting.
When to use this skill
- You want a reusable workflow that can be run more than once with consistent structure.
When not to use this skill
- You only need a quick one-off answer and do not need a reusable workflow.
- You cannot install or maintain the underlying files, dependencies, or repository context.
Installation
Claude Code / Cursor / Codex
Manual Installation
- Download SKILL.md from GitHub
- Place it in
.claude/skills/rxjs-patterns-for-angular/SKILL.mdinside your project - Restart your AI agent — it will auto-discover the skill
How rxjs-patterns-for-angular Compares
| Feature / Agent | rxjs-patterns-for-angular | Standard Approach |
|---|---|---|
| Platform Support | Not specified | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | Unknown | N/A |
Frequently Asked Questions
What does this skill do?
Implement RxJS patterns for reactive programming in Angular. Use this skill when working with Observables, operators, subscriptions, async data flows, and error handling. Covers common patterns like combineLatest, switchMap, debounceTime, catchError, retry logic, and integration with Angular Signals using toSignal() and toObservable(). Ensures proper subscription cleanup with takeUntilDestroyed().
Where can I find the source code?
You can find the source code on GitHub using the link provided at the top of the page.
SKILL.md Source
# RxJS Patterns for Angular Skill
This skill helps implement reactive patterns using RxJS in Angular applications.
## Core Principles
### Modern Angular + RxJS
- **Signals First**: Use Signals for state, RxJS for async operations
- **Auto Cleanup**: Use `takeUntilDestroyed()` for subscription management
- **Interop**: Use `toSignal()` and `toObservable()` for Signal/Observable conversion
- **AsyncPipe**: Prefer AsyncPipe in templates when not using Signals
### Key Concepts
- Observables for async data streams
- Operators for data transformation
- Subscription management and cleanup
- Error handling and retry logic
## Signal + RxJS Integration
### toSignal() - Observable to Signal
```typescript
import { Component, inject } from '@angular/core';
import { toSignal } from '@angular/core/rxjs-interop';
import { HttpClient } from '@angular/common/http';
@Component({
selector: 'app-task-list',
template: `
@if (tasks(); as taskList) {
@for (task of taskList; track task.id) {
<div>{{ task.title }}</div>
}
}
`
})
export class TaskListComponent {
private http = inject(HttpClient);
// Convert Observable to Signal
tasks = toSignal(
this.http.get<Task[]>('/api/tasks'),
{ initialValue: [] }
);
}
```
### toObservable() - Signal to Observable
```typescript
import { Component, signal } from '@angular/core';
import { toObservable } from '@angular/core/rxjs-interop';
import { switchMap } from 'rxjs/operators';
@Component({
selector: 'app-search',
template: `
<input
nz-input
[ngModel]="searchQuery()"
(ngModelChange)="searchQuery.set($event)"
/>
@if (results(); as resultList) {
@for (result of resultList; track result.id) {
<div>{{ result.name }}</div>
}
}
`
})
export class SearchComponent {
searchQuery = signal('');
// Convert Signal to Observable and transform
private searchQuery$ = toObservable(this.searchQuery);
results = toSignal(
this.searchQuery$.pipe(
debounceTime(300),
distinctUntilChanged(),
switchMap(query => this.searchService.search(query))
),
{ initialValue: [] }
);
}
```
## Subscription Management
### takeUntilDestroyed() - Auto Cleanup
```typescript
import { Component, inject, signal, DestroyRef } from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { interval } from 'rxjs';
@Component({
selector: 'app-timer',
template: `<div>Time: {{ time() }}</div>`
})
export class TimerComponent {
private destroyRef = inject(DestroyRef);
time = signal(0);
constructor() {
// Subscription automatically cleaned up on component destroy
interval(1000)
.pipe(takeUntilDestroyed(this.destroyRef))
.subscribe(value => this.time.set(value));
}
}
```
### Manual Cleanup (Legacy Pattern - Avoid)
```typescript
// ❌ DON'T: Manual subscription management (old pattern)
export class LegacyComponent implements OnDestroy {
private subscription = new Subscription();
ngOnInit() {
this.subscription.add(
this.dataService.getData().subscribe(data => {
// handle data
})
);
}
ngOnDestroy() {
this.subscription.unsubscribe();
}
}
// ✅ DO: Use takeUntilDestroyed()
export class ModernComponent {
private destroyRef = inject(DestroyRef);
data = signal<any>(null);
constructor() {
this.dataService.getData()
.pipe(takeUntilDestroyed(this.destroyRef))
.subscribe(data => this.data.set(data));
}
}
```
## Common Operators
### switchMap - Switch to New Observable
```typescript
// Switch to new search on every query change
searchResults$ = this.searchQuery$.pipe(
debounceTime(300),
switchMap(query => this.http.get(`/api/search?q=${query}`))
);
```
### mergeMap - Merge Multiple Observables
```typescript
// Process all tasks in parallel
processTasks$ = this.tasks$.pipe(
mergeMap(tasks =>
from(tasks).pipe(
mergeMap(task => this.processTask(task))
)
)
);
```
### concatMap - Process Sequentially
```typescript
// Process tasks one by one in order
processTasks$ = this.tasks$.pipe(
concatMap(tasks =>
from(tasks).pipe(
concatMap(task => this.processTask(task))
)
)
);
```
### debounceTime - Debounce Input
```typescript
// Wait 300ms after user stops typing
search$ = this.searchInput$.pipe(
debounceTime(300),
distinctUntilChanged(),
switchMap(query => this.searchService.search(query))
);
```
### distinctUntilChanged - Skip Duplicates
```typescript
// Only emit when value actually changes
status$ = this.statusSubject$.pipe(
distinctUntilChanged()
);
```
### filter - Filter Values
```typescript
// Only emit non-empty strings
nonEmptySearch$ = this.searchQuery$.pipe(
filter(query => query.trim().length > 0),
switchMap(query => this.search(query))
);
```
### map - Transform Values
```typescript
// Transform task to display format
taskDisplay$ = this.task$.pipe(
map(task => ({
title: task.title,
status: task.status.toUpperCase(),
dueDate: formatDate(task.dueDate)
}))
);
```
### tap - Side Effects
```typescript
// Log without transforming
tasks$ = this.http.get<Task[]>('/api/tasks').pipe(
tap(tasks => console.log('Loaded tasks:', tasks.length)),
tap(tasks => this.analyticsService.track('tasks_loaded'))
);
```
## Combining Observables
### combineLatest - Wait for All
```typescript
import { combineLatest } from 'rxjs';
// Combine multiple observables
viewModel$ = combineLatest([
this.tasks$,
this.users$,
this.settings$
]).pipe(
map(([tasks, users, settings]) => ({
tasks,
users,
settings
}))
);
// Convert to Signal
viewModel = toSignal(this.viewModel$);
```
### forkJoin - Wait for All to Complete
```typescript
import { forkJoin } from 'rxjs';
// Load multiple resources in parallel
loadAll$ = forkJoin({
tasks: this.taskService.getTasks(),
users: this.userService.getUsers(),
projects: this.projectService.getProjects()
}).pipe(
map(({ tasks, users, projects }) => ({
tasks,
users,
projects
}))
);
```
### merge - Merge Multiple Streams
```typescript
import { merge } from 'rxjs';
// Combine multiple event streams
allEvents$ = merge(
this.createEvent$,
this.updateEvent$,
this.deleteEvent$
).pipe(
tap(event => this.handleEvent(event))
);
```
### zip - Pair Up Values
```typescript
import { zip } from 'rxjs';
// Pair up matching values from two streams
paired$ = zip(
this.stream1$,
this.stream2$
).pipe(
map(([value1, value2]) => ({ value1, value2 }))
);
```
## Error Handling
### catchError - Handle Errors
```typescript
tasks$ = this.http.get<Task[]>('/api/tasks').pipe(
catchError(error => {
console.error('Failed to load tasks:', error);
this.notificationService.error('Failed to load tasks');
return of([]); // Return empty array as fallback
})
);
```
### retry - Retry on Failure
```typescript
tasks$ = this.http.get<Task[]>('/api/tasks').pipe(
retry(3), // Retry up to 3 times
catchError(error => {
console.error('Failed after 3 retries:', error);
return of([]);
})
);
```
### retryWhen - Conditional Retry with Backoff
```typescript
import { retryWhen, delay, scan, throwError } from 'rxjs';
tasks$ = this.http.get<Task[]>('/api/tasks').pipe(
retryWhen(errors =>
errors.pipe(
scan((retryCount, error) => {
if (retryCount >= 3) {
throw error; // Max retries reached
}
console.log(`Retry ${retryCount + 1}/3`);
return retryCount + 1;
}, 0),
delay(1000) // Wait 1 second between retries
)
),
catchError(error => {
console.error('Failed after retries:', error);
return of([]);
})
);
```
## Real-Time Data
### interval - Periodic Updates
```typescript
import { interval, switchMap } from 'rxjs';
// Poll every 30 seconds
liveData$ = interval(30000).pipe(
startWith(0), // Emit immediately
switchMap(() => this.http.get('/api/live-data')),
takeUntilDestroyed(this.destroyRef)
);
liveData = toSignal(this.liveData$);
```
### WebSocket Pattern
```typescript
import { webSocket } from 'rxjs/webSocket';
export class RealtimeService {
private socket$ = webSocket('wss://api.example.com/ws');
messages$ = this.socket$.pipe(
catchError(error => {
console.error('WebSocket error:', error);
return EMPTY;
}),
retry({ delay: 5000 }) // Reconnect after 5 seconds
);
sendMessage(msg: any): void {
this.socket$.next(msg);
}
}
```
## Loading States
### Share Loading State
```typescript
import { shareReplay } from 'rxjs';
@Injectable({ providedIn: 'root' })
export class TaskService {
private http = inject(HttpClient);
// Cache and share the result
tasks$ = this.http.get<Task[]>('/api/tasks').pipe(
shareReplay({ bufferSize: 1, refCount: true })
);
}
```
### Loading Indicator Pattern
```typescript
@Component({
selector: 'app-task-list',
template: `
@if (loading()) {
<nz-spin />
} @else if (error()) {
<nz-alert nzType="error" [nzMessage]="error()!" />
} @else {
@for (task of tasks(); track task.id) {
<div>{{ task.title }}</div>
}
}
`
})
export class TaskListComponent {
private taskService = inject(TaskService);
private destroyRef = inject(DestroyRef);
loading = signal(false);
error = signal<string | null>(null);
tasks = signal<Task[]>([]);
constructor() {
this.loadTasks();
}
loadTasks(): void {
this.loading.set(true);
this.error.set(null);
this.taskService.tasks$
.pipe(takeUntilDestroyed(this.destroyRef))
.subscribe({
next: (tasks) => {
this.tasks.set(tasks);
this.loading.set(false);
},
error: (err) => {
this.error.set(err.message || 'Failed to load tasks');
this.loading.set(false);
}
});
}
}
```
## Advanced Patterns
### Throttle vs Debounce
```typescript
import { throttleTime, debounceTime } from 'rxjs';
// Throttle: Emit first, then ignore for duration
throttled$ = this.clicks$.pipe(
throttleTime(1000) // Max once per second
);
// Debounce: Wait for quiet period
debounced$ = this.input$.pipe(
debounceTime(300) // Wait 300ms after last input
);
```
### Scan - Accumulate Values
```typescript
// Running total
total$ = this.amounts$.pipe(
scan((acc, value) => acc + value, 0)
);
// History accumulation
history$ = this.events$.pipe(
scan((history, event) => [...history, event], [] as Event[])
);
```
### startWith - Initial Value
```typescript
// Start with loading state
status$ = this.dataLoad$.pipe(
map(() => 'loaded'),
startWith('loading')
);
```
### pairwise - Previous + Current
```typescript
// Compare with previous value
changes$ = this.value$.pipe(
pairwise(),
map(([prev, curr]) => ({
previous: prev,
current: curr,
diff: curr - prev
}))
);
```
## Best Practices
### ✅ DO
```typescript
// Use toSignal() for reactive data in templates
data = toSignal(this.data$, { initialValue: [] });
// Use takeUntilDestroyed() for cleanup
this.data$.pipe(takeUntilDestroyed(this.destroyRef)).subscribe();
// Use switchMap for user-triggered requests
search$ = this.query$.pipe(switchMap(q => this.search(q)));
// Handle errors explicitly
data$ = this.http.get('/api/data').pipe(
catchError(err => of(null))
);
```
### ❌ DON'T
```typescript
// Don't forget to unsubscribe
this.data$.subscribe(); // Memory leak!
// Don't use nested subscribes
this.data$.subscribe(data => {
this.process(data).subscribe(); // Anti-pattern!
});
// Don't use async pipe with signals
@if (data$ | async) { } // Use signals instead
```
## Checklist
When using RxJS:
- [ ] Use toSignal() to convert Observables to Signals
- [ ] Use takeUntilDestroyed() for subscription cleanup
- [ ] Handle errors with catchError()
- [ ] Debounce user input (300ms)
- [ ] Use switchMap for cancellable requests
- [ ] Share expensive Observables with shareReplay()
- [ ] Provide initial values with startWith()
- [ ] Filter out empty/null values
- [ ] Test async operations
- [ ] Document complex operator chains
## References
- [RxJS Documentation](https://rxjs.dev/)
- [Angular Signals + RxJS Interop](https://angular.dev/guide/signals/rxjs-interop)
- [RxJS Operators](https://rxjs.dev/guide/operators)
- [Learn RxJS](https://www.learnrxjs.io/)Related Skills
exa-sdk-patterns
Apply production-ready exa-js SDK patterns with type safety, singletons, and wrappers. Use when implementing Exa integrations, refactoring SDK usage, or establishing team coding standards for Exa. Trigger with phrases like "exa SDK patterns", "exa best practices", "exa code patterns", "idiomatic exa", "exa wrapper".
exa-reliability-patterns
Implement Exa reliability patterns: query fallback chains, circuit breakers, and graceful degradation. Use when building fault-tolerant Exa integrations, implementing fallback strategies, or adding resilience to production search services. Trigger with phrases like "exa reliability", "exa circuit breaker", "exa fallback", "exa resilience", "exa graceful degradation".
evernote-sdk-patterns
Advanced Evernote SDK patterns and best practices. Use when implementing complex note operations, batch processing, search queries, or optimizing SDK usage. Trigger with phrases like "evernote sdk patterns", "evernote best practices", "evernote advanced", "evernote batch operations".
elevenlabs-sdk-patterns
Apply production-ready ElevenLabs SDK patterns for TypeScript and Python. Use when implementing ElevenLabs integrations, refactoring SDK usage, or establishing team coding standards for audio AI applications. Trigger: "elevenlabs SDK patterns", "elevenlabs best practices", "elevenlabs code patterns", "idiomatic elevenlabs", "elevenlabs typescript".
documenso-sdk-patterns
Apply production-ready Documenso SDK patterns for TypeScript and Python. Use when implementing Documenso integrations, refactoring SDK usage, or establishing team coding standards for Documenso. Trigger with phrases like "documenso SDK patterns", "documenso best practices", "documenso code patterns", "idiomatic documenso".
deepgram-sdk-patterns
Apply production-ready Deepgram SDK patterns for TypeScript and Python. Use when implementing Deepgram integrations, refactoring SDK usage, or establishing team coding standards for Deepgram. Trigger: "deepgram SDK patterns", "deepgram best practices", "deepgram code patterns", "idiomatic deepgram", "deepgram typescript".
databricks-sdk-patterns
Apply production-ready Databricks SDK patterns for Python and REST API. Use when implementing Databricks integrations, refactoring SDK usage, or establishing team coding standards for Databricks. Trigger with phrases like "databricks SDK patterns", "databricks best practices", "databricks code patterns", "idiomatic databricks".
customerio-sdk-patterns
Apply production-ready Customer.io SDK patterns. Use when implementing typed clients, retry logic, event batching, or singleton management for customerio-node. Trigger: "customer.io best practices", "customer.io patterns", "production customer.io", "customer.io architecture", "customer.io singleton".
customerio-reliability-patterns
Implement Customer.io reliability and fault-tolerance patterns. Use when building circuit breakers, fallback queues, idempotency, or graceful degradation for Customer.io integrations. Trigger: "customer.io reliability", "customer.io resilience", "customer.io circuit breaker", "customer.io fault tolerance".
coreweave-sdk-patterns
Production-ready patterns for CoreWeave GPU workload management with kubectl and Python. Use when building inference clients, managing GPU deployments programmatically, or creating reusable CoreWeave deployment templates. Trigger with phrases like "coreweave patterns", "coreweave client", "coreweave Python", "coreweave deployment template".
cohere-sdk-patterns
Apply production-ready Cohere SDK patterns for TypeScript and Python. Use when implementing Cohere integrations, refactoring SDK usage, or establishing team coding standards for Cohere API v2. Trigger with phrases like "cohere SDK patterns", "cohere best practices", "cohere code patterns", "idiomatic cohere", "cohere wrapper".
coderabbit-sdk-patterns
Apply production-ready CodeRabbit automation patterns using GitHub API and PR comments. Use when building automation around CodeRabbit reviews, processing review feedback programmatically, or integrating CodeRabbit into custom workflows. Trigger with phrases like "coderabbit automation", "coderabbit API patterns", "automate coderabbit", "coderabbit github api", "process coderabbit reviews".