Essential RxJS Operators: Complete Guide to Creation, Filtering, and Utility Operators
🛠️ Essential RxJS Operators
Beyond the higher-order mapping operators, RxJS provides a rich set of operators for creating, filtering, transforming, and managing Observable streams. This guide covers the most important operators you'll use in real-world applications.
🔄 Creation Operators
of() - Create Observable from Values
import { of } from 'rxjs';
of(1, 2, 3, 4, 5).subscribe(console.log);
// Output: 1, 2, 3, 4, 5
of({ name: 'John' }, { name: 'Jane' }).subscribe(console.log);
// Output: { name: 'John' }, { name: 'Jane' }
from() - Convert Array/Promise to Observable
import { from } from 'rxjs';
// From array
from([1, 2, 3]).subscribe(console.log);
// Output: 1, 2, 3
// From Promise
from(fetch('/api/users')).subscribe(response => console.log(response));
interval() - Emit Numbers at Regular Intervals
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';
interval(1000).pipe(
take(5) // Only take first 5 emissions
).subscribe(n => console.log(`Timer: ${n}`));
// Output: Timer: 0, Timer: 1, Timer: 2, Timer: 3, Timer: 4
timer() - Emit After Delay
import { timer } from 'rxjs';
// Emit after 3 seconds, then every 1 second
timer(3000, 1000).pipe(
take(3)
).subscribe(n => console.log(`Delayed timer: ${n}`));
🎯 Filtering Operators
filter() - Filter Values Based on Condition
import { of } from 'rxjs';
import { filter } from 'rxjs/operators';
of(1, 2, 3, 4, 5, 6).pipe(
filter(n => n % 2 === 0) // Only even numbers
).subscribe(console.log);
// Output: 2, 4, 6
// Real example: Filter valid form inputs
formInput$.pipe(
filter(value => value.length >= 3)
).subscribe(validInput => console.log(validInput));
take() - Take Only First N Values
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';
interval(1000).pipe(
take(3) // Only first 3 values
).subscribe(console.log);
// Output: 0, 1, 2 (then completes)
takeUntil() - Take Until Another Observable Emits
import { interval, fromEvent } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
const stop$ = fromEvent(stopButton, 'click');
interval(1000).pipe(
takeUntil(stop$) // Stop when button is clicked
).subscribe(console.log);
// Common pattern for component cleanup
private destroy$ = new Subject();
ngOnInit() {
interval(1000).pipe(
takeUntil(this.destroy$)
).subscribe(console.log);
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
distinctUntilChanged() - Skip Duplicate Consecutive Values
import { of } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';
of(1, 1, 2, 2, 2, 3, 3, 1).pipe(
distinctUntilChanged()
).subscribe(console.log);
// Output: 1, 2, 3, 1
// Real example: Search input
searchInput$.pipe(
distinctUntilChanged(), // Don't search for same term
debounceTime(300)
).subscribe(query => search(query));
skip() - Skip First N Values
import { of } from 'rxjs';
import { skip } from 'rxjs/operators';
of(1, 2, 3, 4, 5).pipe(
skip(2) // Skip first 2 values
).subscribe(console.log);
// Output: 3, 4, 5
⏰ Time-Based Operators
debounceTime() - Emit Only After Silence Period
import { fromEvent } from 'rxjs';
import { debounceTime, map } from 'rxjs/operators';
// Search autocomplete
fromEvent(searchInput, 'input').pipe(
map(event => event.target.value),
debounceTime(300) // Wait 300ms after user stops typing
).subscribe(query => performSearch(query));
// Timeline visualization:
// Input: a-ab-abc----abcd----|
// Output: --------abc----abcd-|
throttleTime() - Emit at Most Once Per Time Period
import { fromEvent } from 'rxjs';
import { throttleTime } from 'rxjs/operators';
// Button click protection
fromEvent(button, 'click').pipe(
throttleTime(1000) // At most one click per second
).subscribe(() => handleClick());
// Timeline visualization:
// Input: a-b-c-d-e-f-g-h----|
// Output: a-------e----------|
delay() - Delay Emissions
import { of } from 'rxjs';
import { delay } from 'rxjs/operators';
of('Hello', 'World').pipe(
delay(2000) // Delay all emissions by 2 seconds
).subscribe(console.log);
timeout() - Error if No Emission Within Time
import { timer } from 'rxjs';
import { timeout, catchError } from 'rxjs/operators';
import { of } from 'rxjs';
timer(5000).pipe(
timeout(3000), // Timeout after 3 seconds
catchError(error => of('Request timed out'))
).subscribe(console.log);
// Output: 'Request timed out'
🔄 Transformation Operators
scan() - Accumulate Values Over Time
import { of } from 'rxjs';
import { scan } from 'rxjs/operators';
// Running total
of(1, 2, 3, 4, 5).pipe(
scan((acc, value) => acc + value, 0)
).subscribe(console.log);
// Output: 1, 3, 6, 10, 15
// Real example: Shopping cart total
cartItems$.pipe(
scan((total, item) => total + item.price, 0)
).subscribe(total => updateCartTotal(total));
reduce() - Accumulate and Emit Final Result
import { of } from 'rxjs';
import { reduce } from 'rxjs/operators';
of(1, 2, 3, 4, 5).pipe(
reduce((acc, value) => acc + value, 0)
).subscribe(console.log);
// Output: 15 (only final sum)
pluck() - Extract Property from Objects
import { of } from 'rxjs';
import { pluck } from 'rxjs/operators';
of(
{ name: 'John', age: 30 },
{ name: 'Jane', age: 25 }
).pipe(
pluck('name')
).subscribe(console.log);
// Output: 'John', 'Jane'
🔀 Combination Operators
startWith() - Start with Initial Value
import { of } from 'rxjs';
import { startWith } from 'rxjs/operators';
of(2, 3, 4).pipe(
startWith(1)
).subscribe(console.log);
// Output: 1, 2, 3, 4
// Real example: Loading state
dataStream$.pipe(
startWith({ loading: true })
).subscribe(state => updateUI(state));
zip() - Combine Observables by Index
import { of, zip } from 'rxjs';
const first$ = of(1, 2, 3);
const second$ = of('a', 'b', 'c');
zip(first$, second$).subscribe(console.log);
// Output: [1, 'a'], [2, 'b'], [3, 'c']
// Real example: Combine user data
zip(
http.get('/api/user'),
http.get('/api/user/preferences')
).subscribe(([user, preferences]) => {
console.log({ user, preferences });
});
withLatestFrom() - Combine with Latest from Another Stream
import { fromEvent, interval } from 'rxjs';
import { withLatestFrom } from 'rxjs/operators';
const clicks$ = fromEvent(button, 'click');
const timer$ = interval(1000);
clicks$.pipe(
withLatestFrom(timer$)
).subscribe(([click, timerValue]) => {
console.log(`Clicked at timer: ${timerValue}`);
});
🎛️ Conditional Operators
takeWhile() - Take While Condition is True
import { interval } from 'rxjs';
import { takeWhile } from 'rxjs/operators';
interval(1000).pipe(
takeWhile(n => n < 5)
).subscribe(console.log);
// Output: 0, 1, 2, 3, 4 (stops when condition becomes false)
skipWhile() - Skip While Condition is True
import { of } from 'rxjs';
import { skipWhile } from 'rxjs/operators';
of(1, 2, 3, 4, 5, 2, 1).pipe(
skipWhile(n => n < 4)
).subscribe(console.log);
// Output: 4, 5, 2, 1 (starts emitting after condition becomes false)
isEmpty() - Check if Observable is Empty
import { of, EMPTY } from 'rxjs';
import { isEmpty } from 'rxjs/operators';
EMPTY.pipe(
isEmpty()
).subscribe(console.log);
// Output: true
of(1, 2, 3).pipe(
isEmpty()
).subscribe(console.log);
// Output: false
🚨 Error Handling Operators
catchError() - Handle Errors Gracefully
import { throwError, of } from 'rxjs';
import { catchError } from 'rxjs/operators';
throwError('Something went wrong!').pipe(
catchError(error => {
console.error('Error caught:', error);
return of('Default value'); // Return fallback
})
).subscribe(console.log);
// Output: 'Default value'
// Real example: API error handling
http.get('/api/users').pipe(
catchError(error => {
console.error('API Error:', error);
return of([]); // Return empty array on error
})
).subscribe(users => displayUsers(users));
retry() - Retry on Error
import { throwError } from 'rxjs';
import { retry, catchError } from 'rxjs/operators';
let attempts = 0;
const failingRequest$ = new Observable(observer => {
attempts++;
if (attempts < 3) {
observer.error(`Attempt ${attempts} failed`);
} else {
observer.next('Success!');
observer.complete();
}
});
failingRequest$.pipe(
retry(2), // Retry up to 2 times
catchError(error => of('All retries failed'))
).subscribe(console.log);
// Output: 'Success!' (after 2 retries)
retryWhen() - Retry with Custom Logic
import { throwError, timer } from 'rxjs';
import { retryWhen, delayWhen, take } from 'rxjs/operators';
apiCall$.pipe(
retryWhen(errors =>
errors.pipe(
delayWhen(() => timer(2000)), // Delay 2 seconds between retries
take(3) // Maximum 3 retries
)
)
).subscribe(
result => console.log('Success:', result),
error => console.log('Failed after retries:', error)
);
🔧 Utility Operators
tap() - Perform Side Effects
import { of } from 'rxjs';
import { tap, map } from 'rxjs/operators';
of(1, 2, 3).pipe(
tap(value => console.log('Before map:', value)), // Side effect
map(x => x * 2),
tap(value => console.log('After map:', value)) // Side effect
).subscribe(final => console.log('Final:', final));
// Real example: Logging and debugging
userActions$.pipe(
tap(action => console.log('Action dispatched:', action)),
// ... other operators
tap(result => analytics.track('action_completed', result))
).subscribe(handleResult);
finalize() - Execute Code When Observable Completes or Errors
import { of } from 'rxjs';
import { finalize, delay } from 'rxjs/operators';
of(1, 2, 3).pipe(
delay(1000),
finalize(() => console.log('Cleanup code executed'))
).subscribe(
value => console.log('Value:', value),
error => console.log('Error:', error),
() => console.log('Completed')
);
// Output: Value: 1, Value: 2, Value: 3, Completed, Cleanup code executed
// Real example: Hide loading spinner
loadData$.pipe(
finalize(() => hideLoadingSpinner())
).subscribe(data => displayData(data));
share() - Share Subscription Among Multiple Subscribers
import { interval } from 'rxjs';
import { share, take } from 'rxjs/operators';
const shared$ = interval(1000).pipe(
take(3),
share() // Share the same subscription
);
// Both subscribers receive the same values
shared$.subscribe(val => console.log('Subscriber 1:', val));
shared$.subscribe(val => console.log('Subscriber 2:', val));
// Without share(), each subscriber would get its own interval
shareReplay() - Share and Replay Last N Values
import { interval } from 'rxjs';
import { shareReplay, take } from 'rxjs/operators';
const sharedWithReplay$ = interval(1000).pipe(
take(3),
shareReplay(1) // Share and replay last 1 value
);
sharedWithReplay$.subscribe(val => console.log('First:', val));
// Late subscriber gets the last emitted value immediately
setTimeout(() => {
sharedWithReplay$.subscribe(val => console.log('Late subscriber:', val));
}, 2500);
🎯 Operator Combination Patterns
Polling with Error Handling
import { timer, EMPTY } from 'rxjs';
import { switchMap, retry, catchError } from 'rxjs/operators';
timer(0, 5000).pipe( // Poll every 5 seconds
switchMap(() =>
http.get('/api/status').pipe(
retry(3),
catchError(error => {
console.error('Polling failed:', error);
return EMPTY; // Skip this emission on error
})
)
)
).subscribe(status => updateStatus(status));
Form Validation with Debounce
import { combineLatest } from 'rxjs';
import { debounceTime, distinctUntilChanged, map, startWith } from 'rxjs/operators';
const email$ = emailInput$.pipe(
debounceTime(300),
distinctUntilChanged(),
map(email => ({ email, valid: isValidEmail(email) })),
startWith({ email: '', valid: false })
);
const password$ = passwordInput$.pipe(
debounceTime(300),
distinctUntilChanged(),
map(password => ({ password, valid: password.length >= 8 })),
startWith({ password: '', valid: false })
);
combineLatest([email$, password$]).pipe(
map(([emailState, passwordState]) => ({
...emailState,
...passwordState,
formValid: emailState.valid && passwordState.valid
}))
).subscribe(formState => updateFormUI(formState));
Progressive Data Loading
import { forkJoin, of } from 'rxjs';
import { switchMap, startWith, catchError } from 'rxjs/operators';
userProfile$.pipe(
startWith({ loading: true }),
switchMap(profile =>
forkJoin({
profile: of(profile),
posts: http.get(`/api/users/${profile.id}/posts`).pipe(
catchError(() => of([]))
),
followers: http.get(`/api/users/${profile.id}/followers`).pipe(
catchError(() => of([]))
)
})
)
).subscribe(data => renderProfilePage(data));
Real-time Data Sync with Conflict Resolution
import { merge, combineLatest } from 'rxjs';
import { debounceTime, distinctUntilChanged, switchMap, scan } from 'rxjs/operators';
// Combine local changes and server updates
const localChanges$ = formChanges$.pipe(
debounceTime(500),
distinctUntilChanged()
);
const serverUpdates$ = websocket$.pipe(
filter(message => message.type === 'data_update')
);
merge(localChanges$, serverUpdates$).pipe(
scan((state, change) => mergeChanges(state, change), initialState),
distinctUntilChanged(),
switchMap(state => saveToServer(state))
).subscribe(savedState => updateUI(savedState));
🔑 Essential Operator Categories Summary
Essential Operator Categories:
- Creation:
of,from,interval,timer - Filtering:
filter,take,takeUntil,distinctUntilChanged - Transformation:
map,scan,reduce,pluck - Time-based:
debounceTime,throttleTime,delay - Combination:
startWith,zip,withLatestFrom - Error handling:
catchError,retry,retryWhen - Utility:
tap,finalize,share,shareReplay
Performance Tips:
// Use shareReplay for expensive operations
const expensiveData$ = http.get('/api/expensive').pipe(
shareReplay(1)
);
// Use debounceTime for user input
searchInput$.pipe(
debounceTime(300),
distinctUntilChanged()
);
// Use takeUntil for cleanup
observable$.pipe(
takeUntil(this.destroy$)
);
// Use finalize for cleanup actions
request$.pipe(
finalize(() => hideSpinner())
);
Common Patterns:
// Search pattern
searchInput$.pipe(
debounceTime(300),
distinctUntilChanged(),
filter(query => query.length >= 2),
switchMap(query => searchAPI(query)),
catchError(() => of([]))
);
// Auto-save pattern
formData$.pipe(
debounceTime(2000),
distinctUntilChanged(),
switchMap(data => saveData(data)),
retry(3),
catchError(error => of({ error: true }))
);
// Cleanup pattern
dataStream$.pipe(
takeUntil(this.destroy$),
finalize(() => console.log('Stream cleanup complete'))
);
RxJS operators provide powerful tools for handling complex asynchronous scenarios elegantly! 🚀