RxJS Subject Complete Guide: Observable, Observer, and Subject Types
📡 Observable, Observer & Observer Pattern in RxJS
Observable - The Data Producer
An Observable is a data producer that can emit multiple values over time. Think of it as a stream or pipe through which data flows.
import { Observable } from 'rxjs';
// Observable is like a recipe - it defines what will happen
const dataStream$ = new Observable(observer => {
observer.next('First value'); // Emit value
observer.next('Second value'); // Emit another value
observer.complete(); // Signal completion
});
// Nothing happens until someone subscribes!
console.log('Observable created, but no data emitted yet');
Observer - The Data Consumer
An Observer is an object that knows how to handle the values emitted by an Observable. It has three methods:
const observer = {
next: value => console.log('Received:', value), // Handle emitted values
error: error => console.error('Error occurred:', error), // Handle errors
complete: () => console.log('Stream completed') // Handle completion
};
// Now the Observable starts emitting
dataStream$.subscribe(observer);
// Output:
// Received: First value
// Received: Second value
// Stream completed
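To make the three callbacks more concrete, here is a minimal sketch in which the producer emits one value, raises an error, and then tries to emit again. The names `failing$` and `log` are illustrative only. Once `error` (or `complete`) has been delivered, the subscription is closed and later `next` calls are ignored.

```typescript
import { Observable } from 'rxjs';

const log: string[] = [];

// Illustrative producer: one value, then an error, then a value that is never delivered
const failing$ = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.error(new Error('boom'));
  subscriber.next(2); // Ignored: the subscription is already closed
});

failing$.subscribe({
  next: value => log.push(`next ${value}`),
  error: (err: Error) => log.push(`error ${err.message}`),
  complete: () => log.push('complete') // Not called, because the stream errored
});

console.log(log); // [ 'next 1', 'error boom' ]
```

Note that `error` and `complete` are mutually exclusive: a stream ends with at most one of them.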
Observer Pattern Explained
The Observer Pattern is a design pattern where:
- Subject (Observable) maintains a list of observers
- Observers register to receive notifications
- Subject notifies all observers when something happens
// Observable acts as the Subject
// Subscribers act as Observers
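const subject$ = new Observable<number>(observer => {
  let count = 0;
  const interval = setInterval(() => {
    observer.next(count++);
    if (count > 3) {
      observer.complete();
    }
  }, 1000);
  // Teardown: runs on complete, error, or unsubscribe, so the timer never leaks
  return () => clearInterval(interval);
});
// Multiple observers can subscribe. A plain Observable runs this function once per
// subscriber, so each observer gets its own timer and its own 0, 1, 2, 3 sequence.
subject$.subscribe(value => console.log('Observer 1:', value));
subject$.subscribe(value => console.log('Observer 2:', value));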
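The three bullet points of the pattern (keep a list, register, notify) can be sketched without any library. This is a dependency-free illustration of the classic pattern, not how RxJS implements it; `TinySubject`, `notify`, and the other names are made up for the example.

```typescript
// Dependency-free sketch of the classic Observer pattern (not RxJS's implementation)
type Listener<T> = (value: T) => void;

class TinySubject<T> {
  // The subject maintains the list of observers
  private listeners = new Set<Listener<T>>();

  // Register an observer and return a function that removes it again
  subscribe(listener: Listener<T>): () => void {
    this.listeners.add(listener);
    return () => this.listeners.delete(listener);
  }

  // Notify every registered observer
  notify(value: T): void {
    this.listeners.forEach(listener => listener(value));
  }
}

const news = new TinySubject<string>();
const received: string[] = [];

const unsubscribeA = news.subscribe(v => received.push(`A:${v}`));
news.subscribe(v => received.push(`B:${v}`));

news.notify('first');  // Both A and B are notified
unsubscribeA();
news.notify('second'); // Only B is still registered

console.log(received); // [ 'A:first', 'B:first', 'B:second' ]
```

In RxJS, the type that keeps a shared list of observers like this is `Subject`; a plain `Observable` instead starts a fresh producer for each subscriber.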
🆚 Observable vs Promise: Key Differences
1. Single vs Multiple Values
// Promise - Single value
const promise = fetch('/api/user')
.then(response => response.json());
promise.then(user => console.log(user)); // Gets one user object
// Observable - Multiple values over time
const observable$ = new Observable(observer => {
observer.next('Value 1');
observer.next('Value 2');
observer.next('Value 3');
observer.complete();
});
observable$.subscribe(value => console.log(value));
// Output: Value 1, Value 2, Value 3
2. Eager vs Lazy Execution
// Promise - Eager (executes immediately)
console.log('Creating promise...');
const promise = new Promise(resolve => {
console.log('Promise executor running!'); // Runs immediately
resolve('Done');
});
console.log('Promise created');
// Output: Creating promise... → Promise executor running! → Promise created
// Observable - Lazy (executes only when subscribed)
console.log('Creating observable...');
const observable$ = new Observable(observer => {
console.log('Observable executor running!'); // Only runs when subscribed
observer.next('Done');
});
console.log('Observable created');
observable$.subscribe(); // NOW the executor runs
// Output: Creating observable... → Observable created → Observable executor running!
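A consequence of lazy execution is that the producer function runs again for every subscription. The sketch below counts those runs; `executorRuns` and `lazy$` are illustrative names.

```typescript
import { Observable } from 'rxjs';

let executorRuns = 0;

const lazy$ = new Observable<string>(subscriber => {
  executorRuns++; // Counts how often the producer function is invoked
  subscriber.next(`run #${executorRuns}`);
  subscriber.complete();
});

const runsBeforeSubscribe = executorRuns; // Still 0: creating the Observable ran nothing

const values: string[] = [];
lazy$.subscribe(v => values.push(v)); // First run
lazy$.subscribe(v => values.push(v)); // Second, independent run

console.log(runsBeforeSubscribe, executorRuns, values); // 0 2 [ 'run #1', 'run #2' ]
```

A Promise behaves differently here: its executor runs once, and every `then` callback receives the same settled value.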
3. Cancellation Support
// Promise - No built-in cancel method
const promise = fetch('/api/slow-endpoint');
// The promise object itself cannot be cancelled; aborting the fetch needs a separate AbortController
// Observable - Can be cancelled
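const request$ = new Observable(observer => {
  const controller = new AbortController();
  fetch('/api/slow-endpoint', { signal: controller.signal })
    .then(response => response.json()) // Read the body before completing
    .then(data => {
      observer.next(data);
      observer.complete(); // One response, then the stream is done
    })
    .catch(error => observer.error(error));
  // Return cleanup function: runs on unsubscribe and aborts the request
  return () => controller.abort();
});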
const subscription = request$.subscribe();
subscription.unsubscribe(); // Cancels the request!
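The same teardown mechanism can be observed without a network call. In this sketch, a flag records whether the producer's timer is still active; `ticker$` and `timerActive` are names invented for the illustration.

```typescript
import { Observable } from 'rxjs';

let timerActive = false;

const ticker$ = new Observable<number>(subscriber => {
  timerActive = true;
  let n = 0;
  const id = setInterval(() => subscriber.next(n++), 1000);

  // Teardown function: RxJS calls it when the subscriber unsubscribes
  return () => {
    clearInterval(id);
    timerActive = false;
  };
});

const sub = ticker$.subscribe(value => console.log('tick', value));
const activeWhileSubscribed = timerActive; // true: the producer started on subscribe

sub.unsubscribe(); // Runs the teardown before the first tick fires

console.log(activeWhileSubscribed, timerActive, sub.closed); // true false true
```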
Comparison Table
| Feature | Promise | Observable |
|---|---|---|
| Values | Single | Multiple over time |
| Execution | Eager | Lazy |
| Cancellation | ❌ | ✅ |
| Retry | Manual (re-create the promise) | Built-in operators (retry) |
| Operators | then, catch, finally | Rich ecosystem (map, filter, switchMap, ...) |
| Error Recovery | catch | catchError, retry |
| Time-based | ❌ | ✅ (debounceTime, throttleTime) |
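As an illustration of the "Retry" row, here is a small sketch using the `retry` operator. The flaky source is hypothetical: it fails on its first two subscriptions and succeeds on the third, so `retry(2)` (two extra attempts) is just enough.

```typescript
import { Observable } from 'rxjs';
import { retry } from 'rxjs/operators';

let attempts = 0;

// Hypothetical flaky source: fails on the first two subscriptions, succeeds on the third
const flaky$ = new Observable<string>(subscriber => {
  attempts++;
  if (attempts < 3) {
    subscriber.error(new Error(`attempt ${attempts} failed`));
  } else {
    subscriber.next(`succeeded on attempt ${attempts}`);
    subscriber.complete();
  }
});

const outcome: string[] = [];

flaky$.pipe(
  retry(2) // Resubscribe up to 2 times after an error
).subscribe({
  next: value => outcome.push(value),
  error: (err: Error) => outcome.push(`gave up: ${err.message}`)
});

console.log(attempts, outcome); // 3 [ 'succeeded on attempt 3' ]
```

Retrying works by resubscribing, which is only possible because the Observable is lazy and re-runs its producer on each subscription.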
🎯 Subject vs BehaviorSubject: Main Differences
Subject - Basic Multicast Observable
import { Subject } from 'rxjs';
const subject$ = new Subject();
// Subscribe first observer
subject$.subscribe(value => console.log('Observer 1:', value));
subject$.next('Message 1'); // Observer 1: Message 1
// Subscribe second observer (missed Message 1)
subject$.subscribe(value => console.log('Observer 2:', value));
subject$.next('Message 2');
// Observer 1: Message 2
// Observer 2: Message 2
subject$.next('Message 3');
// Observer 1: Message 3
// Observer 2: Message 3
Key Characteristics of Subject:
- ❌ No initial value
- ❌ No current value storage
- ❌ Late subscribers miss previous emissions
- ✅ Multicasts to multiple observers
BehaviorSubject - Subject with Current Value
import { BehaviorSubject } from 'rxjs';
// Requires initial value
const behaviorSubject$ = new BehaviorSubject('Initial Value');
// Subscribe first observer
behaviorSubject$.subscribe(value => console.log('Observer 1:', value));
// Observer 1: Initial Value (immediately receives current value)
behaviorSubject$.next('Message 1');
// Observer 1: Message 1
// Subscribe second observer (gets current value immediately)
behaviorSubject$.subscribe(value => console.log('Observer 2:', value));
// Observer 2: Message 1 (gets the last emitted value)
behaviorSubject$.next('Message 2');
// Observer 1: Message 2
// Observer 2: Message 2
// Access current value directly
console.log('Current value:', behaviorSubject$.getValue()); // Message 2
Key Characteristics of BehaviorSubject:
- ✅ Requires initial value
- ✅ Stores current value
- ✅ Late subscribers get current value immediately
- ✅ Has getValue() method
- ✅ Multicasts to multiple observers
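The difference for late subscribers can be seen side by side. This sketch sends the same two messages through both types and subscribes only after the first one; the variable names are illustrative.

```typescript
import { BehaviorSubject, Subject } from 'rxjs';

const plain$ = new Subject<string>();
const behavior$ = new BehaviorSubject<string>('initial');

// First message is sent before anyone subscribes
plain$.next('A');
behavior$.next('A');

const latePlain: string[] = [];
const lateBehavior: string[] = [];

plain$.subscribe(v => latePlain.push(v));       // Missed 'A'
behavior$.subscribe(v => lateBehavior.push(v)); // Receives the stored 'A' immediately

plain$.next('B');
behavior$.next('B');

console.log(latePlain);            // [ 'B' ]
console.log(lateBehavior);         // [ 'A', 'B' ]
console.log(behavior$.getValue()); // B
```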
🛠️ Real-World Angular Examples
1. Debounced Input Search with Subject
The two examples below run logic only after the user has stopped typing in an input field. The advanced version (a service plus a component) comes first; the basic version follows it.
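// 1. Service Method Returns Observable<User[]>
import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';
@Injectable()
export class UserService {
  constructor(private http: HttpClient) {}
  searchUsers(searchTerm: string): Observable<User[]> {
    // The sample response below wraps the array in a "users" property, so unwrap it here.
    // Passing the term via params lets HttpClient URL-encode it.
    return this.http
      .get<{ users: User[] }>('/api/users/search', { params: { q: searchTerm } })
      .pipe(map(response => response.users));
  }
}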
/*
// API Response (what the server sends)
{
"users": [
{ "id": 1, "name": "John Doe", "email": "john@example.com" },
{ "id": 2, "name": "Jane Smith", "email": "jane@example.com" }
]
}
*/
// 2. Component Receives User[] as 'results'
import { Component, OnDestroy, OnInit } from '@angular/core';
import { Subject, Subscription } from 'rxjs';
import { debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';
@Component({
  selector: 'search-component',
  template: `
    <input
      [ngModel]='inputTyped'
      (ngModelChange)='onInputChanged($event)'
      placeholder="Search users..."
    />
    <div *ngFor="let result of searchResults">{{result.name}}</div>
  `
})
export class SearchComponent implements OnInit, OnDestroy {
  inputTyped = '';
  searchResults: User[] = [];
  // Subject for handling input changes
  private inputChanged = new Subject<string>();
  private subscription?: Subscription;
  constructor(private userService: UserService) {}
  ngOnInit() {
    this.subscription = this.inputChanged.pipe(
      debounceTime(500), // Wait 500ms after user stops typing
      distinctUntilChanged(), // Only search if search term changed
      switchMap(searchTerm =>
        this.userService.searchUsers(searchTerm) // Returns Observable<User[]>
      )
    ).subscribe(results => {
      // 'results' is the User[] array
      this.searchResults = results;
    });
  }
  onInputChanged(text: string) {
    this.inputTyped = text;
    this.inputChanged.next(text); // Emit the search term
  }
  ngOnDestroy() {
    // Clean up: unsubscribing also cancels any search request still in flight
    this.subscription?.unsubscribe();
    this.inputChanged.complete();
  }
}
// Basic version: log the text once the user pauses typing
import { OnInit, Component } from '@angular/core';
import { Subject } from 'rxjs';
import { debounceTime } from 'rxjs/operators';
@Component({
selector: 'my-app',
template: `<input [ngModel]='inputTyped' (ngModelChange)='onInputChanged($event)' />`
})
export class AppComponent implements OnInit {
inputTyped: string;
inputChanged: Subject<string> = new Subject<string>();
constructor() {}
ngOnInit() {
this.inputChanged.pipe(
debounceTime(500))
.subscribe(typedText => {
console.log(typedText);
// Do any logical things
});
}
onInputChanged(text: string) {
this.inputChanged.next(text);
}
}
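The effect of `debounceTime(500)` can be checked outside Angular. The sketch below is an assumption-laden illustration: it swaps real timers for RxJS's `VirtualTimeScheduler` purely so the run is instant and deterministic, and the keystroke times (0, 100, 200, and 1000 ms) are invented.

```typescript
import { Subject, VirtualTimeScheduler } from 'rxjs';
import { debounceTime } from 'rxjs/operators';

// Virtual clock: lets the example run instantly instead of waiting real milliseconds
const scheduler = new VirtualTimeScheduler();

const input$ = new Subject<string>();
const debounced: string[] = [];

input$.pipe(
  debounceTime(500, scheduler) // Same operator as above, driven by the virtual clock
).subscribe(text => debounced.push(text));

// Simulated keystrokes at invented virtual times (in ms)
scheduler.schedule(() => input$.next('j'), 0);
scheduler.schedule(() => input$.next('jo'), 100);
scheduler.schedule(() => input$.next('john'), 200);    // 500ms of silence follows
scheduler.schedule(() => input$.next('john d'), 1000); // Typing resumes later

scheduler.flush(); // Run all scheduled work in virtual-time order

console.log(debounced); // [ 'john', 'john d' ]
```

The intermediate values 'j' and 'jo' are dropped because a newer value arrived before 500 ms of silence had passed.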
🔍 Why switchMap is Used in Search Implementation
The search implementation relies on switchMap for three reasons:
1. Cancellation of Previous Requests
// Without switchMap - PROBLEMATIC
this.inputChanged.pipe(
debounceTime(500),
distinctUntilChanged(),
map(searchTerm => this.userService.searchUsers(searchTerm)) // ❌ Emits an Observable, not the results
).subscribe(observable => {
// You get an Observable, not the actual results!
// The nested subscribe below is never cancelled when a newer search term arrives
observable.subscribe(results => this.searchResults = results);
});
// With switchMap - CORRECT
this.inputChanged.pipe(
debounceTime(500),
distinctUntilChanged(),
switchMap(searchTerm => this.userService.searchUsers(searchTerm)) // ✅ Flattens Observable
).subscribe(results => {
this.searchResults = results; // You get actual results
});
2. Race Condition Prevention
The Problem Without switchMap:
// User types: "j" → "jo" → "john"
// Timeline:
// t0: User types "j" → API call 1 starts (takes 4 seconds)
// t1: User types "jo" → API call 2 starts (takes 2 seconds)
// t2: User types "john" → API call 3 starts (takes 0.5 seconds)
//
// Results arrive:
// t2.5: API call 3 completes → Shows results for "john" ✅
// t3: API call 2 completes → Shows results for "jo" ❌ WRONG!
// t4: API call 1 completes → Shows results for "j" ❌ WRONG!
The Solution With switchMap:
// User types: "j" → "jo" → "john"
// Timeline:
// t0: User types "j" → API call 1 starts
// t1: User types "jo" → API call 1 CANCELLED, API call 2 starts
// t2: User types "john" → API call 2 CANCELLED, API call 3 starts
//
// Results arrive:
// t2.5: API call 3 completes → Shows results for "john" ✅ CORRECT!
// (API calls 1 & 2 were cancelled, so no conflicting results)
3. Visual Timeline Comparison
// Input stream: j----jo----john----|
// Without switchMap:
// API calls: call1-call2-call3
// Results: ----res3--res2-res1 ❌ Wrong order!
// With switchMap:
// API calls: call1 X call2 X call3
// Results: ----------res3------- ✅ Only latest!
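The race condition can be reproduced in a few lines. This is a sketch under stated assumptions: `fakeSearch` stands in for the real HTTP call, its latencies are invented so that responses arrive in reverse order, `mergeMap` plays the role of "without switchMap", and a `VirtualTimeScheduler` replaces real timers so the run is deterministic.

```typescript
import { Subject, VirtualTimeScheduler, timer } from 'rxjs';
import { map, mergeMap, switchMap } from 'rxjs/operators';

const scheduler = new VirtualTimeScheduler();

// Invented latencies (ms): earlier, shorter terms respond more slowly
const latency: Record<string, number> = { j: 4000, jo: 2000, john: 500 };

// Hypothetical stand-in for userService.searchUsers()
const fakeSearch = (term: string) =>
  timer(latency[term], scheduler).pipe(map(() => `results for "${term}"`));

const terms$ = new Subject<string>();
const withMergeMap: string[] = [];
const withSwitchMap: string[] = [];

terms$.pipe(mergeMap(term => fakeSearch(term))).subscribe(r => withMergeMap.push(r));
terms$.pipe(switchMap(term => fakeSearch(term))).subscribe(r => withSwitchMap.push(r));

// The user types one term per virtual second
scheduler.schedule(() => terms$.next('j'), 0);
scheduler.schedule(() => terms$.next('jo'), 1000);
scheduler.schedule(() => terms$.next('john'), 2000);
scheduler.flush();

console.log(withMergeMap);  // [ 'results for "john"', 'results for "jo"', 'results for "j"' ]
console.log(withSwitchMap); // [ 'results for "john"' ]
```

With `mergeMap`, the last value written to the UI would be the stale result for "j". With `switchMap`, the two older inner subscriptions are unsubscribed as soon as a newer term arrives, so only the latest result is delivered.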