Skip to main content

Essential RxJS Operators: Complete Guide to Creation, Filtering, and Utility Operators

🛠️ Essential RxJS Operators

Beyond the higher-order mapping operators, RxJS provides a rich set of operators for creating, filtering, transforming, and managing Observable streams. This guide covers the most important operators you'll use in real-world applications.


🔄 Creation Operators

of() - Create Observable from Values

import { of } from 'rxjs';

of(1, 2, 3, 4, 5).subscribe(console.log);
// Output: 1, 2, 3, 4, 5

of({ name: 'John' }, { name: 'Jane' }).subscribe(console.log);
// Output: { name: 'John' }, { name: 'Jane' }

from() - Convert Array/Promise to Observable

import { from } from 'rxjs';

// From array
from([1, 2, 3]).subscribe(console.log);
// Output: 1, 2, 3

// From Promise
from(fetch('/api/users')).subscribe(response => console.log(response));

interval() - Emit Numbers at Regular Intervals

import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

interval(1000).pipe(
take(5) // Only take first 5 emissions
).subscribe(n => console.log(`Timer: ${n}`));
// Output: Timer: 0, Timer: 1, Timer: 2, Timer: 3, Timer: 4

timer() - Emit After Delay

import { timer } from 'rxjs';

// Emit after 3 seconds, then every 1 second
timer(3000, 1000).pipe(
take(3)
).subscribe(n => console.log(`Delayed timer: ${n}`));

🎯 Filtering Operators

filter() - Filter Values Based on Condition

import { of } from 'rxjs';
import { filter } from 'rxjs/operators';

of(1, 2, 3, 4, 5, 6).pipe(
filter(n => n % 2 === 0) // Only even numbers
).subscribe(console.log);
// Output: 2, 4, 6

// Real example: Filter valid form inputs
formInput$.pipe(
filter(value => value.length >= 3)
).subscribe(validInput => console.log(validInput));

take() - Take Only First N Values

import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

interval(1000).pipe(
take(3) // Only first 3 values
).subscribe(console.log);
// Output: 0, 1, 2 (then completes)

takeUntil() - Take Until Another Observable Emits

import { interval, fromEvent } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

const stop$ = fromEvent(stopButton, 'click');

interval(1000).pipe(
takeUntil(stop$) // Stop when button is clicked
).subscribe(console.log);

// Common pattern for component cleanup
private destroy$ = new Subject();

ngOnInit() {
interval(1000).pipe(
takeUntil(this.destroy$)
).subscribe(console.log);
}

ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}

distinctUntilChanged() - Skip Duplicate Consecutive Values

import { of } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';

of(1, 1, 2, 2, 2, 3, 3, 1).pipe(
distinctUntilChanged()
).subscribe(console.log);
// Output: 1, 2, 3, 1

// Real example: Search input
searchInput$.pipe(
distinctUntilChanged(), // Don't search for same term
debounceTime(300)
).subscribe(query => search(query));

skip() - Skip First N Values

import { of } from 'rxjs';
import { skip } from 'rxjs/operators';

of(1, 2, 3, 4, 5).pipe(
skip(2) // Skip first 2 values
).subscribe(console.log);
// Output: 3, 4, 5

⏰ Time-Based Operators

debounceTime() - Emit Only After Silence Period

import { fromEvent } from 'rxjs';
import { debounceTime, map } from 'rxjs/operators';

// Search autocomplete
fromEvent(searchInput, 'input').pipe(
map(event => event.target.value),
debounceTime(300) // Wait 300ms after user stops typing
).subscribe(query => performSearch(query));

// Timeline visualization:
// Input: a-ab-abc----abcd----|
// Output: --------abc----abcd-|

throttleTime() - Emit at Most Once Per Time Period

import { fromEvent } from 'rxjs';
import { throttleTime } from 'rxjs/operators';

// Button click protection
fromEvent(button, 'click').pipe(
throttleTime(1000) // At most one click per second
).subscribe(() => handleClick());

// Timeline visualization:
// Input: a-b-c-d-e-f-g-h----|
// Output: a-------e----------|

delay() - Delay Emissions

import { of } from 'rxjs';
import { delay } from 'rxjs/operators';

of('Hello', 'World').pipe(
delay(2000) // Delay all emissions by 2 seconds
).subscribe(console.log);

timeout() - Error if No Emission Within Time

import { timer } from 'rxjs';
import { timeout, catchError } from 'rxjs/operators';
import { of } from 'rxjs';

timer(5000).pipe(
timeout(3000), // Timeout after 3 seconds
catchError(error => of('Request timed out'))
).subscribe(console.log);
// Output: 'Request timed out'

🔄 Transformation Operators

scan() - Accumulate Values Over Time

import { of } from 'rxjs';
import { scan } from 'rxjs/operators';

// Running total
of(1, 2, 3, 4, 5).pipe(
scan((acc, value) => acc + value, 0)
).subscribe(console.log);
// Output: 1, 3, 6, 10, 15

// Real example: Shopping cart total
cartItems$.pipe(
scan((total, item) => total + item.price, 0)
).subscribe(total => updateCartTotal(total));

reduce() - Accumulate and Emit Final Result

import { of } from 'rxjs';
import { reduce } from 'rxjs/operators';

of(1, 2, 3, 4, 5).pipe(
reduce((acc, value) => acc + value, 0)
).subscribe(console.log);
// Output: 15 (only final sum)

pluck() - Extract Property from Objects

import { of } from 'rxjs';
import { pluck } from 'rxjs/operators';

of(
{ name: 'John', age: 30 },
{ name: 'Jane', age: 25 }
).pipe(
pluck('name')
).subscribe(console.log);
// Output: 'John', 'Jane'

🔀 Combination Operators

startWith() - Start with Initial Value

import { of } from 'rxjs';
import { startWith } from 'rxjs/operators';

of(2, 3, 4).pipe(
startWith(1)
).subscribe(console.log);
// Output: 1, 2, 3, 4

// Real example: Loading state
dataStream$.pipe(
startWith({ loading: true })
).subscribe(state => updateUI(state));

zip() - Combine Observables by Index

import { of, zip } from 'rxjs';

const first$ = of(1, 2, 3);
const second$ = of('a', 'b', 'c');

zip(first$, second$).subscribe(console.log);
// Output: [1, 'a'], [2, 'b'], [3, 'c']

// Real example: Combine user data
zip(
http.get('/api/user'),
http.get('/api/user/preferences')
).subscribe(([user, preferences]) => {
console.log({ user, preferences });
});

withLatestFrom() - Combine with Latest from Another Stream

import { fromEvent, interval } from 'rxjs';
import { withLatestFrom } from 'rxjs/operators';

const clicks$ = fromEvent(button, 'click');
const timer$ = interval(1000);

clicks$.pipe(
withLatestFrom(timer$)
).subscribe(([click, timerValue]) => {
console.log(`Clicked at timer: ${timerValue}`);
});

🎛️ Conditional Operators

takeWhile() - Take While Condition is True

import { interval } from 'rxjs';
import { takeWhile } from 'rxjs/operators';

interval(1000).pipe(
takeWhile(n => n < 5)
).subscribe(console.log);
// Output: 0, 1, 2, 3, 4 (stops when condition becomes false)

skipWhile() - Skip While Condition is True

import { of } from 'rxjs';
import { skipWhile } from 'rxjs/operators';

of(1, 2, 3, 4, 5, 2, 1).pipe(
skipWhile(n => n < 4)
).subscribe(console.log);
// Output: 4, 5, 2, 1 (starts emitting after condition becomes false)

isEmpty() - Check if Observable is Empty

import { of, EMPTY } from 'rxjs';
import { isEmpty } from 'rxjs/operators';

EMPTY.pipe(
isEmpty()
).subscribe(console.log);
// Output: true

of(1, 2, 3).pipe(
isEmpty()
).subscribe(console.log);
// Output: false

🚨 Error Handling Operators

catchError() - Handle Errors Gracefully

import { throwError, of } from 'rxjs';
import { catchError } from 'rxjs/operators';

throwError('Something went wrong!').pipe(
catchError(error => {
console.error('Error caught:', error);
return of('Default value'); // Return fallback
})
).subscribe(console.log);
// Output: 'Default value'

// Real example: API error handling
http.get('/api/users').pipe(
catchError(error => {
console.error('API Error:', error);
return of([]); // Return empty array on error
})
).subscribe(users => displayUsers(users));

retry() - Retry on Error

import { throwError } from 'rxjs';
import { retry, catchError } from 'rxjs/operators';

let attempts = 0;
const failingRequest$ = new Observable(observer => {
attempts++;
if (attempts < 3) {
observer.error(`Attempt ${attempts} failed`);
} else {
observer.next('Success!');
observer.complete();
}
});

failingRequest$.pipe(
retry(2), // Retry up to 2 times
catchError(error => of('All retries failed'))
).subscribe(console.log);
// Output: 'Success!' (after 2 retries)

retryWhen() - Retry with Custom Logic

import { throwError, timer } from 'rxjs';
import { retryWhen, delayWhen, take } from 'rxjs/operators';

apiCall$.pipe(
retryWhen(errors =>
errors.pipe(
delayWhen(() => timer(2000)), // Delay 2 seconds between retries
take(3) // Maximum 3 retries
)
)
).subscribe(
result => console.log('Success:', result),
error => console.log('Failed after retries:', error)
);

🔧 Utility Operators

tap() - Perform Side Effects

import { of } from 'rxjs';
import { tap, map } from 'rxjs/operators';

of(1, 2, 3).pipe(
tap(value => console.log('Before map:', value)), // Side effect
map(x => x * 2),
tap(value => console.log('After map:', value)) // Side effect
).subscribe(final => console.log('Final:', final));

// Real example: Logging and debugging
userActions$.pipe(
tap(action => console.log('Action dispatched:', action)),
// ... other operators
tap(result => analytics.track('action_completed', result))
).subscribe(handleResult);

finalize() - Execute Code When Observable Completes or Errors

import { of } from 'rxjs';
import { finalize, delay } from 'rxjs/operators';

of(1, 2, 3).pipe(
delay(1000),
finalize(() => console.log('Cleanup code executed'))
).subscribe(
value => console.log('Value:', value),
error => console.log('Error:', error),
() => console.log('Completed')
);
// Output: Value: 1, Value: 2, Value: 3, Completed, Cleanup code executed

// Real example: Hide loading spinner
loadData$.pipe(
finalize(() => hideLoadingSpinner())
).subscribe(data => displayData(data));

share() - Share Subscription Among Multiple Subscribers

import { interval } from 'rxjs';
import { share, take } from 'rxjs/operators';

const shared$ = interval(1000).pipe(
take(3),
share() // Share the same subscription
);

// Both subscribers receive the same values
shared$.subscribe(val => console.log('Subscriber 1:', val));
shared$.subscribe(val => console.log('Subscriber 2:', val));

// Without share(), each subscriber would get its own interval

shareReplay() - Share and Replay Last N Values

import { interval } from 'rxjs';
import { shareReplay, take } from 'rxjs/operators';

const sharedWithReplay$ = interval(1000).pipe(
take(3),
shareReplay(1) // Share and replay last 1 value
);

sharedWithReplay$.subscribe(val => console.log('First:', val));

// Late subscriber gets the last emitted value immediately
setTimeout(() => {
sharedWithReplay$.subscribe(val => console.log('Late subscriber:', val));
}, 2500);

🎯 Operator Combination Patterns

Polling with Error Handling

import { timer, EMPTY } from 'rxjs';
import { switchMap, retry, catchError } from 'rxjs/operators';

timer(0, 5000).pipe( // Poll every 5 seconds
switchMap(() =>
http.get('/api/status').pipe(
retry(3),
catchError(error => {
console.error('Polling failed:', error);
return EMPTY; // Skip this emission on error
})
)
)
).subscribe(status => updateStatus(status));

Form Validation with Debounce

import { combineLatest } from 'rxjs';
import { debounceTime, distinctUntilChanged, map, startWith } from 'rxjs/operators';

const email$ = emailInput$.pipe(
debounceTime(300),
distinctUntilChanged(),
map(email => ({ email, valid: isValidEmail(email) })),
startWith({ email: '', valid: false })
);

const password$ = passwordInput$.pipe(
debounceTime(300),
distinctUntilChanged(),
map(password => ({ password, valid: password.length >= 8 })),
startWith({ password: '', valid: false })
);

combineLatest([email$, password$]).pipe(
map(([emailState, passwordState]) => ({
...emailState,
...passwordState,
formValid: emailState.valid && passwordState.valid
}))
).subscribe(formState => updateFormUI(formState));

Progressive Data Loading

import { forkJoin, of } from 'rxjs';
import { switchMap, startWith, catchError } from 'rxjs/operators';

userProfile$.pipe(
startWith({ loading: true }),
switchMap(profile =>
forkJoin({
profile: of(profile),
posts: http.get(`/api/users/${profile.id}/posts`).pipe(
catchError(() => of([]))
),
followers: http.get(`/api/users/${profile.id}/followers`).pipe(
catchError(() => of([]))
)
})
)
).subscribe(data => renderProfilePage(data));

Real-time Data Sync with Conflict Resolution

import { merge, combineLatest } from 'rxjs';
import { debounceTime, distinctUntilChanged, switchMap, scan } from 'rxjs/operators';

// Combine local changes and server updates
const localChanges$ = formChanges$.pipe(
debounceTime(500),
distinctUntilChanged()
);

const serverUpdates$ = websocket$.pipe(
filter(message => message.type === 'data_update')
);

merge(localChanges$, serverUpdates$).pipe(
scan((state, change) => mergeChanges(state, change), initialState),
distinctUntilChanged(),
switchMap(state => saveToServer(state))
).subscribe(savedState => updateUI(savedState));

🔑 Essential Operator Categories Summary

Essential Operator Categories:

  • Creation: of, from, interval, timer
  • Filtering: filter, take, takeUntil, distinctUntilChanged
  • Transformation: map, scan, reduce, pluck
  • Time-based: debounceTime, throttleTime, delay
  • Combination: startWith, zip, withLatestFrom
  • Error handling: catchError, retry, retryWhen
  • Utility: tap, finalize, share, shareReplay

Performance Tips:

// Use shareReplay for expensive operations
const expensiveData$ = http.get('/api/expensive').pipe(
shareReplay(1)
);

// Use debounceTime for user input
searchInput$.pipe(
debounceTime(300),
distinctUntilChanged()
);

// Use takeUntil for cleanup
observable$.pipe(
takeUntil(this.destroy$)
);

// Use finalize for cleanup actions
request$.pipe(
finalize(() => hideSpinner())
);

Common Patterns:

// Search pattern
searchInput$.pipe(
debounceTime(300),
distinctUntilChanged(),
filter(query => query.length >= 2),
switchMap(query => searchAPI(query)),
catchError(() => of([]))
);

// Auto-save pattern
formData$.pipe(
debounceTime(2000),
distinctUntilChanged(),
switchMap(data => saveData(data)),
retry(3),
catchError(error => of({ error: true }))
);

// Cleanup pattern
dataStream$.pipe(
takeUntil(this.destroy$),
finalize(() => console.log('Stream cleanup complete'))
);

RxJS operators provide powerful tools for handling complex asynchronous scenarios elegantly! 🚀