RxJS Guide: Observables, Subjects, and Higher-Order Operators Explained
π What is RxJS?β
RxJS (Reactive Extensions for JavaScript) is a library for reactive programming using Observables, making it easier to compose asynchronous or callback-based code.
// Think of it as handling streams of data over time
mouse clicks β [click, click, click, ...]
API responses β [data1, data2, data3, ...]
user input β [a, ab, abc, abcd, ...]
π‘ Observable Basicsβ
What is an Observable?β
An Observable is a stream of data that can emit multiple values over time.
import { Observable } from 'rxjs';
// Creating an Observable
const dataStream$ = new Observable(observer => {
observer.next('Hello'); // Emit first value
observer.next('World'); // Emit second value
observer.complete(); // Complete the stream
});
// Subscribing to receive data
dataStream$.subscribe({
next: value => console.log(value), // 'Hello', then 'World'
complete: () => console.log('Done!') // 'Done!'
});
Subscription Explainedβ
A subscription is like signing up for a newsletter - you'll receive updates until you unsubscribe.
const subscription = dataStream$.subscribe(data => {
console.log('Received:', data);
});
// Later... stop receiving updates
subscription.unsubscribe(); // Important to prevent memory leaks!
π― Subject vs BehaviorSubject vs Other Subjectsβ
Subject - Basic Event Emitterβ
A Subject is like a radio station - it broadcasts to all listeners, but if you tune in late, you miss earlier broadcasts.
import { Subject } from 'rxjs';
const subject$ = new Subject();
// Subscribe first listener
subject$.subscribe(data => console.log('Listener 1:', data));
subject$.next('Message 1'); // Listener 1: Message 1
// Subscribe second listener (misses Message 1)
subject$.subscribe(data => console.log('Listener 2:', data));
subject$.next('Message 2');
// Listener 1: Message 2
// Listener 2: Message 2
BehaviorSubject - Remembers Last Valueβ
BehaviorSubject is like a TV channel with replay - new viewers see the last broadcast immediately.
import { BehaviorSubject } from 'rxjs';
const behaviorSubject$ = new BehaviorSubject('Initial Value');
behaviorSubject$.subscribe(data => console.log('Listener 1:', data));
// Listener 1: Initial Value (immediately gets current value)
behaviorSubject$.next('Message 1');
// Listener 1: Message 1
// New subscriber gets last value immediately
behaviorSubject$.subscribe(data => console.log('Listener 2:', data));
// Listener 2: Message 1 (gets the last emitted value)
behaviorSubject$.next('Message 2');
// Listener 1: Message 2
// Listener 2: Message 2
ReplaySubject - Remembers Multiple Valuesβ
ReplaySubject is like DVR - it records the last N broadcasts for new viewers.
import { ReplaySubject } from 'rxjs';
const replaySubject$ = new ReplaySubject(2); // Remember last 2 values
replaySubject$.next('Message 1');
replaySubject$.next('Message 2');
replaySubject$.next('Message 3');
// New subscriber gets last 2 values
replaySubject$.subscribe(data => console.log('Listener:', data));
// Listener: Message 2
// Listener: Message 3
AsyncSubject - Only Last Value When Completeβ
AsyncSubject is like final exam results - you only get the score after the exam is finished.
import { AsyncSubject } from 'rxjs';
const asyncSubject$ = new AsyncSubject();
asyncSubject$.subscribe(data => console.log('Result:', data));
asyncSubject$.next('Draft 1');
asyncSubject$.next('Draft 2');
asyncSubject$.next('Final Result'); // This will be delivered
asyncSubject$.complete(); // Only now subscriber gets 'Final Result'
πΊοΈ RxJS Map Operator - How It Worksβ
Basic map() Operatorβ
The map() operator transforms each emitted value using a provided function.
import { of } from 'rxjs';
import { map } from 'rxjs/operators';
// Simple transformation
of(1, 2, 3).pipe(
map(x => x * 2) // Transform each value
).subscribe(console.log);
// Output: 2, 4, 6
// Object transformation
of({ name: 'John' }, { name: 'Jane' }).pipe(
map(user => user.name.toUpperCase())
).subscribe(console.log);
// Output: 'JOHN', 'JANE'
Visual Representationβ
Source: 1----2----3----|
map(x => x * 2)
Result: 2----4----6----|
π Higher-Order Observables & Observable Switchingβ
What are Higher-Order Observables?β
A Higher-Order Observable is an Observable that emits other Observables - like boxes containing more boxes.
import { fromEvent, interval } from 'rxjs';
import { map } from 'rxjs/operators';
// Higher-order Observable example
const clicks$ = fromEvent(button, 'click');
const higherOrder$ = clicks$.pipe(
map(click => interval(1000)) // Each click creates a new interval Observable
);
// higherOrder$ emits Observables, not numbers!
higherOrder$.subscribe(innerObservable$ => {
console.log('Got a new Observable:', innerObservable$);
// You'd need to subscribe to innerObservable$ to get actual values
});
The Problem: Observable Switchingβ
When you have a higher-order Observable, you need to flatten it to get the actual values.
// Without flattening - you get Observables
const searchInput$ = fromEvent(searchBox, 'input');
const requests$ = searchInput$.pipe(
map(event => http.get(`/api/search?q=${event.target.value}`))
);
// requests$ emits HTTP Observables, not data!
// With flattening - you get actual data
const results$ = searchInput$.pipe(
switchMap(event => http.get(`/api/search?q=${event.target.value}`))
);
// results$ emits actual search results!
πΊοΈ Higher-Order Mapping Operators Explainedβ
switchMap - Switch to Latest (Cancellation Strategy)β
Strategy: Cancel previous inner Observable when new one arrives.
import { switchMap } from 'rxjs/operators';
// Search autocomplete example
searchInput$.pipe(
switchMap(query => http.get(`/api/search?q=${query}`))
).subscribe(results => console.log(results));
// Timeline visualization:
// Input: a----ab----abc----|
// Requests: -req1-req2-req3---|
// Results: -----X----res3----| (req1 & req2 cancelled)
When to use:
- Search autocomplete (only latest search matters)
- Navigation (cancel previous route requests)
- Real-time data updates
// Real example: Auto-save document
documentChanges$.pipe(
debounceTime(2000),
switchMap(doc => saveDocument(doc)) // Cancel previous save if user keeps typing
).subscribe(result => console.log('Saved:', result));
mergeMap - Handle All Concurrently (Merge Strategy)β
Strategy: Keep all inner Observables active, merge their emissions.
import { mergeMap } from 'rxjs/operators';
// Multiple file uploads
fileQueue$.pipe(
mergeMap(file => uploadFile(file)) // All uploads happen simultaneously
).subscribe(result => console.log('Upload complete:', result));
// Timeline visualization:
// Input: a--b--c----|
// Requests: -req1-req2-req3
// Results: --res1-res2-res3 (all complete independently)
When to use:
- Independent parallel operations
- File uploads
- Analytics tracking
- Multiple API calls that don't interfere
// Real example: Process user actions
userActions$.pipe(
mergeMap(action => {
switch(action.type) {
case 'LIKE': return likePost(action.postId);
case 'SHARE': return sharePost(action.postId);
case 'COMMENT': return addComment(action.postId, action.comment);
}
})
).subscribe(result => updateUI(result));
concatMap - Queue One by One (Sequential Strategy)β
Strategy: Wait for previous inner Observable to complete before starting next.
import { concatMap } from 'rxjs/operators';
// Sequential API calls
queue$.pipe(
concatMap(task => processTask(task)) // Wait for each task to finish
).subscribe(result => console.log('Task complete:', result));
// Timeline visualization:
// Input: a--b--c----|
// Requests: -req1---req2---req3
// Results: ----res1---res2---res3 (sequential order maintained)
When to use:
- Order matters
- Sequential operations
- Rate-limited APIs
- Database transactions
// Real example: Sequential file processing
fileList$.pipe(
concatMap(file =>
processFile(file).pipe(
tap(result => updateProgress(result))
)
)
).subscribe(result => console.log('All files processed'));
exhaustMap - Ignore While Busy (Ignore Strategy)β
Strategy: Ignore new emissions while inner Observable is active.
import { exhaustMap } from 'rxjs/operators';
// Prevent double-clicks
loginButton$.pipe(
exhaustMap(() => loginUser()) // Ignore clicks while login is in progress
).subscribe(user => console.log('Logged in:', user));
// Timeline visualization:
// Input: a--b--c----|
// Requests: -req1---X--X (b & c ignored while req1 is active)
// Results: ----res1-------
When to use:
- Prevent duplicate operations
- Rate limiting
- Form submissions
- Resource-intensive operations
// Real example: Refresh data with cooldown
refreshButton$.pipe(
exhaustMap(() =>
loadData().pipe(
finalize(() => console.log('Refresh cooldown active'))
)
)
).subscribe(data => updateView(data));
π― How to Choose the Right Mapping Operatorβ
Decision Treeβ
Do you need inner Observables to complete in order?
ββ YES β concatMap (sequential)
ββ NO
ββ Do you only care about the latest result?
β ββ YES β switchMap (cancel previous)
β ββ NO
β ββ Do you want to ignore new requests while one is active?
β β ββ YES β exhaustMap (ignore while busy)
β β ββ NO β mergeMap (handle all concurrently)
Comparison Tableβ
| Operator | Strategy | Use Case | Example |
|---|---|---|---|
| switchMap | Cancel previous | Latest matters | Search, Navigation |
| mergeMap | Keep all active | Independent operations | File uploads, Analytics |
| concatMap | One at a time | Order matters | Sequential processing |
| exhaustMap | Ignore while busy | Prevent duplicates | Button clicks, Refresh |
Performance Considerationsβ
// Memory efficient - cancels previous requests
searchInput$.pipe(
switchMap(query => searchAPI(query)) // β
Good for search
);
// Can cause memory issues with many concurrent requests
clicks$.pipe(
mergeMap(() => heavyOperation()) // β οΈ Be careful with resource-intensive ops
);
// Slower but maintains order and prevents overload
taskQueue$.pipe(
concatMap(task => processTask(task)) // β
Good for sequential work
);
// Prevents spam but might miss legitimate requests
submitButton$.pipe(
exhaustMap(() => submitForm()) // β
Good for preventing double-submission
);
π Utility Operatorsβ
forkJoin - Wait for Allβ
Like waiting for all friends before starting a group activity.
import { forkJoin } from 'rxjs';
forkJoin({
user: http.get('/api/user'),
posts: http.get('/api/posts'),
comments: http.get('/api/comments')
}).subscribe(({ user, posts, comments }) => {
console.log('All data loaded:', { user, posts, comments });
});
// Waits for ALL requests to complete
// Fails if ANY request fails
Use Case: Loading multiple independent resources
combineLatest - Combine Latest Valuesβ
Like a scoreboard - shows latest score from each team.
import { combineLatest } from 'rxjs';
combineLatest([
temperature$,
humidity$,
pressure$
]).subscribe(([temp, humidity, pressure]) => {
console.log(`Weather: ${temp}Β°C, ${humidity}%, ${pressure}hPa`);
});
// Emits whenever ANY source emits
// Always uses the latest value from each source
π― Real-World Examplesβ
Search with Debounce (switchMap)β
import { debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';
searchInput$.pipe(
debounceTime(300), // Wait 300ms after user stops typing
distinctUntilChanged(), // Only if search term changed
switchMap(query =>
query ? http.get(`/api/search?q=${query}`) : of([])
)
).subscribe(results => displayResults(results));
Auto-save with Error Handling (switchMap)β
import { debounceTime, switchMap, catchError, retry } from 'rxjs/operators';
formChanges$.pipe(
debounceTime(2000), // Auto-save 2 seconds after changes
switchMap(formData =>
http.put('/api/save', formData).pipe(
retry(3), // Retry failed requests 3 times
catchError(error => {
console.error('Save failed:', error);
return of(null); // Return null on error
})
)
)
).subscribe(result => {
if (result) console.log('Saved successfully');
});
Loading Multiple Resources (forkJoin)β
import { forkJoin, map } from 'rxjs';
// Load user profile page data
forkJoin({
profile: http.get('/api/user/profile'),
posts: http.get('/api/user/posts'),
followers: http.get('/api/user/followers')
}).pipe(
map(({ profile, posts, followers }) => ({
...profile,
postsCount: posts.length,
followersCount: followers.length
}))
).subscribe(pageData => renderProfilePage(pageData));
File Upload Queue (concatMap)β
import { concatMap, tap } from 'rxjs/operators';
fileQueue$.pipe(
concatMap((file, index) =>
uploadFile(file).pipe(
tap(progress => updateProgressBar(index, progress))
)
)
).subscribe(
result => console.log('File uploaded:', result),
error => console.error('Upload failed:', error),
() => console.log('All files uploaded!')
);
π Key Takeawaysβ
When to Use Each Subject:β
- Subject: Basic event emitter
- BehaviorSubject: Need current state (user login status, shopping cart)
- ReplaySubject: Need history (chat messages, notifications)
- AsyncSubject: Only care about final result
When to Use Each Mapping Operator:β
- map: Simple value transformation
- switchMap: Latest value matters (search, navigation)
- mergeMap: All requests matter (analytics, logging, parallel operations)
- concatMap: Order matters (sequential uploads, queue processing)
- exhaustMap: Prevent duplicates (button clicks, form submissions)
Higher-Order Observable Strategy:β
- Identify if your Observable emits other Observables
- Choose the right flattening strategy based on your needs
- Consider performance and memory implications
- Test with multiple rapid emissions to verify behavior
Memory Management:β
// Always unsubscribe to prevent memory leaks
const subscription = observable$.subscribe(data => console.log(data));
// In component cleanup
ngOnDestroy() {
subscription.unsubscribe();
}
// Or use takeUntil pattern
private destroy$ = new Subject();
ngOnInit() {
observable$.pipe(
takeUntil(this.destroy$)
).subscribe(data => console.log(data));
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
RxJS makes handling asynchronous data streams elegant and powerful! π