Skip to main content

RxJS Guide: Observables, Subjects, and Higher-Order Operators Explained

πŸ” What is RxJS?​

RxJS (Reactive Extensions for JavaScript) is a library for reactive programming using Observables, making it easier to compose asynchronous or callback-based code.

// Think of it as handling streams of data over time
mouse clicks β†’ [click, click, click, ...]
API responses β†’ [data1, data2, data3, ...]
user input β†’ [a, ab, abc, abcd, ...]

πŸ“‘ Observable Basics​

What is an Observable?​

An Observable is a stream of data that can emit multiple values over time.

import { Observable } from 'rxjs';

// Creating an Observable
const dataStream$ = new Observable(observer => {
observer.next('Hello'); // Emit first value
observer.next('World'); // Emit second value
observer.complete(); // Complete the stream
});

// Subscribing to receive data
dataStream$.subscribe({
next: value => console.log(value), // 'Hello', then 'World'
complete: () => console.log('Done!') // 'Done!'
});

Subscription Explained​

A subscription is like signing up for a newsletter - you'll receive updates until you unsubscribe.

const subscription = dataStream$.subscribe(data => {
console.log('Received:', data);
});

// Later... stop receiving updates
subscription.unsubscribe(); // Important to prevent memory leaks!

🎯 Subject vs BehaviorSubject vs Other Subjects​

Subject - Basic Event Emitter​

A Subject is like a radio station - it broadcasts to all listeners, but if you tune in late, you miss earlier broadcasts.

import { Subject } from 'rxjs';

const subject$ = new Subject();

// Subscribe first listener
subject$.subscribe(data => console.log('Listener 1:', data));

subject$.next('Message 1'); // Listener 1: Message 1

// Subscribe second listener (misses Message 1)
subject$.subscribe(data => console.log('Listener 2:', data));

subject$.next('Message 2');
// Listener 1: Message 2
// Listener 2: Message 2

BehaviorSubject - Remembers Last Value​

BehaviorSubject is like a TV channel with replay - new viewers see the last broadcast immediately.

import { BehaviorSubject } from 'rxjs';

const behaviorSubject$ = new BehaviorSubject('Initial Value');

behaviorSubject$.subscribe(data => console.log('Listener 1:', data));
// Listener 1: Initial Value (immediately gets current value)

behaviorSubject$.next('Message 1');
// Listener 1: Message 1

// New subscriber gets last value immediately
behaviorSubject$.subscribe(data => console.log('Listener 2:', data));
// Listener 2: Message 1 (gets the last emitted value)

behaviorSubject$.next('Message 2');
// Listener 1: Message 2
// Listener 2: Message 2

ReplaySubject - Remembers Multiple Values​

ReplaySubject is like DVR - it records the last N broadcasts for new viewers.

import { ReplaySubject } from 'rxjs';

const replaySubject$ = new ReplaySubject(2); // Remember last 2 values

replaySubject$.next('Message 1');
replaySubject$.next('Message 2');
replaySubject$.next('Message 3');

// New subscriber gets last 2 values
replaySubject$.subscribe(data => console.log('Listener:', data));
// Listener: Message 2
// Listener: Message 3

AsyncSubject - Only Last Value When Complete​

AsyncSubject is like final exam results - you only get the score after the exam is finished.

import { AsyncSubject } from 'rxjs';

const asyncSubject$ = new AsyncSubject();

asyncSubject$.subscribe(data => console.log('Result:', data));

asyncSubject$.next('Draft 1');
asyncSubject$.next('Draft 2');
asyncSubject$.next('Final Result'); // This will be delivered
asyncSubject$.complete(); // Only now subscriber gets 'Final Result'

πŸ—ΊοΈ RxJS Map Operator - How It Works​

Basic map() Operator​

The map() operator transforms each emitted value using a provided function.

import { of } from 'rxjs';
import { map } from 'rxjs/operators';

// Simple transformation
of(1, 2, 3).pipe(
map(x => x * 2) // Transform each value
).subscribe(console.log);
// Output: 2, 4, 6

// Object transformation
of({ name: 'John' }, { name: 'Jane' }).pipe(
map(user => user.name.toUpperCase())
).subscribe(console.log);
// Output: 'JOHN', 'JANE'

Visual Representation​

Source:    1----2----3----|
map(x => x * 2)
Result: 2----4----6----|

πŸ”„ Higher-Order Observables & Observable Switching​

What are Higher-Order Observables?​

A Higher-Order Observable is an Observable that emits other Observables - like boxes containing more boxes.

import { fromEvent, interval } from 'rxjs';
import { map } from 'rxjs/operators';

// Higher-order Observable example
const clicks$ = fromEvent(button, 'click');
const higherOrder$ = clicks$.pipe(
map(click => interval(1000)) // Each click creates a new interval Observable
);

// higherOrder$ emits Observables, not numbers!
higherOrder$.subscribe(innerObservable$ => {
console.log('Got a new Observable:', innerObservable$);
// You'd need to subscribe to innerObservable$ to get actual values
});

The Problem: Observable Switching​

When you have a higher-order Observable, you need to flatten it to get the actual values.

// Without flattening - you get Observables
const searchInput$ = fromEvent(searchBox, 'input');
const requests$ = searchInput$.pipe(
map(event => http.get(`/api/search?q=${event.target.value}`))
);
// requests$ emits HTTP Observables, not data!

// With flattening - you get actual data
const results$ = searchInput$.pipe(
switchMap(event => http.get(`/api/search?q=${event.target.value}`))
);
// results$ emits actual search results!

πŸ—ΊοΈ Higher-Order Mapping Operators Explained​

switchMap - Switch to Latest (Cancellation Strategy)​

Strategy: Cancel previous inner Observable when new one arrives.

import { switchMap } from 'rxjs/operators';

// Search autocomplete example
searchInput$.pipe(
switchMap(query => http.get(`/api/search?q=${query}`))
).subscribe(results => console.log(results));

// Timeline visualization:
// Input: a----ab----abc----|
// Requests: -req1-req2-req3---|
// Results: -----X----res3----| (req1 & req2 cancelled)

When to use:

  • Search autocomplete (only latest search matters)
  • Navigation (cancel previous route requests)
  • Real-time data updates
// Real example: Auto-save document
documentChanges$.pipe(
debounceTime(2000),
switchMap(doc => saveDocument(doc)) // Cancel previous save if user keeps typing
).subscribe(result => console.log('Saved:', result));

mergeMap - Handle All Concurrently (Merge Strategy)​

Strategy: Keep all inner Observables active, merge their emissions.

import { mergeMap } from 'rxjs/operators';

// Multiple file uploads
fileQueue$.pipe(
mergeMap(file => uploadFile(file)) // All uploads happen simultaneously
).subscribe(result => console.log('Upload complete:', result));

// Timeline visualization:
// Input: a--b--c----|
// Requests: -req1-req2-req3
// Results: --res1-res2-res3 (all complete independently)

When to use:

  • Independent parallel operations
  • File uploads
  • Analytics tracking
  • Multiple API calls that don't interfere
// Real example: Process user actions
userActions$.pipe(
mergeMap(action => {
switch(action.type) {
case 'LIKE': return likePost(action.postId);
case 'SHARE': return sharePost(action.postId);
case 'COMMENT': return addComment(action.postId, action.comment);
}
})
).subscribe(result => updateUI(result));

concatMap - Queue One by One (Sequential Strategy)​

Strategy: Wait for previous inner Observable to complete before starting next.

import { concatMap } from 'rxjs/operators';

// Sequential API calls
queue$.pipe(
concatMap(task => processTask(task)) // Wait for each task to finish
).subscribe(result => console.log('Task complete:', result));

// Timeline visualization:
// Input: a--b--c----|
// Requests: -req1---req2---req3
// Results: ----res1---res2---res3 (sequential order maintained)

When to use:

  • Order matters
  • Sequential operations
  • Rate-limited APIs
  • Database transactions
// Real example: Sequential file processing
fileList$.pipe(
concatMap(file =>
processFile(file).pipe(
tap(result => updateProgress(result))
)
)
).subscribe(result => console.log('All files processed'));

exhaustMap - Ignore While Busy (Ignore Strategy)​

Strategy: Ignore new emissions while inner Observable is active.

import { exhaustMap } from 'rxjs/operators';

// Prevent double-clicks
loginButton$.pipe(
exhaustMap(() => loginUser()) // Ignore clicks while login is in progress
).subscribe(user => console.log('Logged in:', user));

// Timeline visualization:
// Input: a--b--c----|
// Requests: -req1---X--X (b & c ignored while req1 is active)
// Results: ----res1-------

When to use:

  • Prevent duplicate operations
  • Rate limiting
  • Form submissions
  • Resource-intensive operations
// Real example: Refresh data with cooldown
refreshButton$.pipe(
exhaustMap(() =>
loadData().pipe(
finalize(() => console.log('Refresh cooldown active'))
)
)
).subscribe(data => updateView(data));

🎯 How to Choose the Right Mapping Operator​

Decision Tree​

Do you need inner Observables to complete in order?
β”œβ”€ YES β†’ concatMap (sequential)
└─ NO
β”œβ”€ Do you only care about the latest result?
β”‚ β”œβ”€ YES β†’ switchMap (cancel previous)
β”‚ └─ NO
β”‚ β”œβ”€ Do you want to ignore new requests while one is active?
β”‚ β”‚ β”œβ”€ YES β†’ exhaustMap (ignore while busy)
β”‚ β”‚ └─ NO β†’ mergeMap (handle all concurrently)

Comparison Table​

OperatorStrategyUse CaseExample
switchMapCancel previousLatest mattersSearch, Navigation
mergeMapKeep all activeIndependent operationsFile uploads, Analytics
concatMapOne at a timeOrder mattersSequential processing
exhaustMapIgnore while busyPrevent duplicatesButton clicks, Refresh

Performance Considerations​

// Memory efficient - cancels previous requests
searchInput$.pipe(
switchMap(query => searchAPI(query)) // βœ… Good for search
);

// Can cause memory issues with many concurrent requests
clicks$.pipe(
mergeMap(() => heavyOperation()) // ⚠️ Be careful with resource-intensive ops
);

// Slower but maintains order and prevents overload
taskQueue$.pipe(
concatMap(task => processTask(task)) // βœ… Good for sequential work
);

// Prevents spam but might miss legitimate requests
submitButton$.pipe(
exhaustMap(() => submitForm()) // βœ… Good for preventing double-submission
);

πŸ”€ Utility Operators​

forkJoin - Wait for All​

Like waiting for all friends before starting a group activity.

import { forkJoin } from 'rxjs';

forkJoin({
user: http.get('/api/user'),
posts: http.get('/api/posts'),
comments: http.get('/api/comments')
}).subscribe(({ user, posts, comments }) => {
console.log('All data loaded:', { user, posts, comments });
});

// Waits for ALL requests to complete
// Fails if ANY request fails

Use Case: Loading multiple independent resources

combineLatest - Combine Latest Values​

Like a scoreboard - shows latest score from each team.

import { combineLatest } from 'rxjs';

combineLatest([
temperature$,
humidity$,
pressure$
]).subscribe(([temp, humidity, pressure]) => {
console.log(`Weather: ${temp}Β°C, ${humidity}%, ${pressure}hPa`);
});

// Emits whenever ANY source emits
// Always uses the latest value from each source

🎯 Real-World Examples​

Search with Debounce (switchMap)​

import { debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';

searchInput$.pipe(
debounceTime(300), // Wait 300ms after user stops typing
distinctUntilChanged(), // Only if search term changed
switchMap(query =>
query ? http.get(`/api/search?q=${query}`) : of([])
)
).subscribe(results => displayResults(results));

Auto-save with Error Handling (switchMap)​

import { debounceTime, switchMap, catchError, retry } from 'rxjs/operators';

formChanges$.pipe(
debounceTime(2000), // Auto-save 2 seconds after changes
switchMap(formData =>
http.put('/api/save', formData).pipe(
retry(3), // Retry failed requests 3 times
catchError(error => {
console.error('Save failed:', error);
return of(null); // Return null on error
})
)
)
).subscribe(result => {
if (result) console.log('Saved successfully');
});

Loading Multiple Resources (forkJoin)​

import { forkJoin, map } from 'rxjs';

// Load user profile page data
forkJoin({
profile: http.get('/api/user/profile'),
posts: http.get('/api/user/posts'),
followers: http.get('/api/user/followers')
}).pipe(
map(({ profile, posts, followers }) => ({
...profile,
postsCount: posts.length,
followersCount: followers.length
}))
).subscribe(pageData => renderProfilePage(pageData));

File Upload Queue (concatMap)​

import { concatMap, tap } from 'rxjs/operators';

fileQueue$.pipe(
concatMap((file, index) =>
uploadFile(file).pipe(
tap(progress => updateProgressBar(index, progress))
)
)
).subscribe(
result => console.log('File uploaded:', result),
error => console.error('Upload failed:', error),
() => console.log('All files uploaded!')
);

πŸ”‘ Key Takeaways​

When to Use Each Subject:​

  • Subject: Basic event emitter
  • BehaviorSubject: Need current state (user login status, shopping cart)
  • ReplaySubject: Need history (chat messages, notifications)
  • AsyncSubject: Only care about final result

When to Use Each Mapping Operator:​

  • map: Simple value transformation
  • switchMap: Latest value matters (search, navigation)
  • mergeMap: All requests matter (analytics, logging, parallel operations)
  • concatMap: Order matters (sequential uploads, queue processing)
  • exhaustMap: Prevent duplicates (button clicks, form submissions)

Higher-Order Observable Strategy:​

  1. Identify if your Observable emits other Observables
  2. Choose the right flattening strategy based on your needs
  3. Consider performance and memory implications
  4. Test with multiple rapid emissions to verify behavior

Memory Management:​

// Always unsubscribe to prevent memory leaks
const subscription = observable$.subscribe(data => console.log(data));

// In component cleanup
ngOnDestroy() {
subscription.unsubscribe();
}

// Or use takeUntil pattern
private destroy$ = new Subject();

ngOnInit() {
observable$.pipe(
takeUntil(this.destroy$)
).subscribe(data => console.log(data));
}

ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}

RxJS makes handling asynchronous data streams elegant and powerful! πŸš€