RxJs Subject Complete Guide: Observable, Observer, and Subject Types
📡 Observable, Observer & Observer Pattern in RxJS
Observable - The Data Producer
An Observable is a data producer that can emit multiple values over time. Think of it as a stream or pipe through which data flows.
import { Observable } from 'rxjs';
// Observable is like a recipe - it defines what will happen
const dataStream$ = new Observable(observer => {
observer.next('First value'); // Emit value
observer.next('Second value'); // Emit another value
observer.complete(); // Signal completion
});
// Nothing happens until someone subscribes!
console.log('Observable created, but no data emitted yet');
Observer - The Data Consumer
An Observer is an object that knows how to handle the values emitted by an Observable. It has three methods:
const observer = {
next: value => console.log('Received:', value), // Handle emitted values
error: error => console.error('Error occurred:', error), // Handle errors
complete: () => console.log('Stream completed') // Handle completion
};
// Now the Observable starts emitting
dataStream$.subscribe(observer);
// Output:
// Received: First value
// Received: Second value
// Stream completed
Observer Pattern Explained
The Observer Pattern is a design pattern where:
- Subject (Observable) maintains a list of observers
- Observers register to receive notifications
- Subject notifies all observers when something happens
// Observable acts as the Subject
// Subscribers act as Observers
const subject$ = new Observable(observer => {
let count = 0;
const interval = setInterval(() => {
observer.next(count++);
if (count > 3) {
observer.complete();
clearInterval(interval);
}
}, 1000);
});
// Multiple observers can subscribe
subject$.subscribe(value => console.log('Observer 1:', value));
subject$.subscribe(value => console.log('Observer 2:', value));
🆚 Observable vs Promise: Key Differences
1. Single vs Multiple Values
// Promise - Single value
const promise = fetch('/api/user')
.then(response => response.json());
promise.then(user => console.log(user)); // Gets one user object
// Observable - Multiple values over time
const observable$ = new Observable(observer => {
observer.next('Value 1');
observer.next('Value 2');
observer.next('Value 3');
observer.complete();
});
observable$.subscribe(value => console.log(value));
// Output: Value 1, Value 2, Value 3
2. Eager vs Lazy Execution
// Promise - Eager (executes immediately)
console.log('Creating promise...');
const promise = new Promise(resolve => {
console.log('Promise executor running!'); // Runs immediately
resolve('Done');
});
console.log('Promise created');
// Output: Creating promise... → Promise executor running! → Promise created
// Observable - Lazy (executes only when subscribed)
console.log('Creating observable...');
const observable$ = new Observable(observer => {
console.log('Observable executor running!'); // Only runs when subscribed
observer.next('Done');
});
console.log('Observable created');
observable$.subscribe(); // NOW the executor runs
// Output: Creating observable... → Observable created → Observable executor running!
3. Cancellation Support
// Promise - Cannot be cancelled
const promise = fetch('/api/slow-endpoint');
// No way to cancel this request
// Observable - Can be cancelled
const request$ = new Observable(observer => {
const controller = new AbortController();
fetch('/api/slow-endpoint', { signal: controller.signal })
.then(response => observer.next(response))
.catch(error => observer.error(error));
// Return cleanup function
return () => controller.abort();
});
const subscription = request$.subscribe();
subscription.unsubscribe(); // Cancels the request!
Comparison Table
| Feature | Promise | Observable |
|---|---|---|
| Values | Single | Multiple over time |
| Execution | Eager | Lazy |
| Cancellation | ❌ | ✅ |
| Retry | Manual | Built-in operators |
| Operators | Limited | Rich ecosystem |
| Error Recovery | Basic | Advanced |
| Time-based | ❌ | ✅ (debounce, throttle) |
🎯 Subject vs BehaviorSubject: Main Differences
Subject - Basic Multicast Observable
import { Subject } from 'rxjs';
const subject$ = new Subject();
// Subscribe first observer
subject$.subscribe(value => console.log('Observer 1:', value));
subject$.next('Message 1'); // Observer 1: Message 1
// Subscribe second observer (missed Message 1)
subject$.subscribe(value => console.log('Observer 2:', value));
subject$.next('Message 2');
// Observer 1: Message 2
// Observer 2: Message 2
subject$.next('Message 3');
// Observer 1: Message 3
// Observer 2: Message 3
Key Characteristics of Subject:
- ❌ No initial value
- ❌ No current value storage
- ❌ Late subscribers miss previous emissions
- ✅ Multicasts to multiple observers
BehaviorSubject - Subject with Current Value
import { BehaviorSubject } from 'rxjs';
// Requires initial value
const behaviorSubject$ = new BehaviorSubject('Initial Value');
// Subscribe first observer
behaviorSubject$.subscribe(value => console.log('Observer 1:', value));
// Observer 1: Initial Value (immediately receives current value)
behaviorSubject$.next('Message 1');
// Observer 1: Message 1
// Subscribe second observer (gets current value immediately)
behaviorSubject$.subscribe(value => console.log('Observer 2:', value));
// Observer 2: Message 1 (gets the last emitted value)
behaviorSubject$.next('Message 2');
// Observer 1: Message 2
// Observer 2: Message 2
// Access current value directly
console.log('Current value:', behaviorSubject$.getValue()); // Message 2
Key Characteristics of BehaviorSubject:
- ✅ Requires initial value
- ✅ Stores current value
- ✅ Late subscribers get current value immediately
- ✅ Has
getValue()method - ✅ Multicasts to multiple observers
🛠️ Real-World Angular Examples
1. Debounced Input Search with Subject
How to do logical operations after user stops typing in input field:
- Advanced
- Basic
// 1. Service Method Returns Observable<User[]>
@Injectable()
export class UserService {
searchUsers(searchTerm: string): Observable<User[]> {
// ^^^^^^^^^ This is an array of User objects
return this.http.get<User[]>(`/api/users/search?q=${searchTerm}`);
}
}
/*
// API Response (what the server sends)
{
"users": [
{ "id": 1, "name": "John Doe", "email": "john@example.com" },
{ "id": 2, "name": "Jane Smith", "email": "jane@example.com" }
]
}
*/
// 2. Component Receives User[] as 'results'
import { OnInit, Component } from '@angular/core';
import { Subject } from 'rxjs';
import { debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';
@Component({
selector: 'search-component',
template: `
<input
[ngModel]='inputTyped'
(ngModelChange)='onInputChanged($event)'
placeholder="Search users..."
/>
<div *ngFor="let result of searchResults">{{result.name}}</div>
`
})
export class SearchComponent implements OnInit {
inputTyped: string;
searchResults: any[] = [];
// Subject for handling input changes
private inputChanged: Subject<string> = new Subject<string>();
constructor(private userService: UserService) {}
ngOnInit() {
this.inputChanged.pipe(
debounceTime(500), // Wait 500ms after user stops typing
distinctUntilChanged(), // Only search if search term changed
switchMap(searchTerm =>
this.userService.searchUsers(searchTerm) // Returns Observable<User[]>
)
).subscribe(results => {
// ^^^^^^^ This 'results' is the User[] array
this.searchResults = results; // Assigning User[] to component property
});
}
onInputChanged(text: string) {
this.inputTyped = text;
this.inputChanged.next(text); // Emit the search term
}
ngOnDestroy() {
this.inputChanged.complete(); // Clean up
}
}
import { OnInit, Component } from '@angular/core';
import { Subject } from 'rxjs';
import { debounceTime } from 'rxjs/operators';
@Component({
selector: 'my-app',
template: `<input [ngModel]='inputTyped' (ngModelChange)='onInputChanged($event)' />`
})
export class AppComponent implements OnInit {
inputTyped: string;
inputChanged: Subject<string> = new Subject<string>();
constructor() {}
ngOnInit() {
this.inputChanged.pipe(
debounceTime(500))
.subscribe(typedText => {
console.log(typedText);
// Do any logical things
});
}
onInputChanged(text: string) {
this.inputChanged.next(text);
}
}
🔍 Why switchMap is Used in Search Implementation
The switchMap operator is crucial in the search implementation for several important reasons:
1. Cancellation of Previous Requests
// Without switchMap - PROBLEMATIC
this.inputChanged.pipe(
debounceTime(500),
distinctUntilChanged(),
map(searchTerm => this.userService.searchUsers(searchTerm)) // ❌ Returns Observable
).subscribe(observable => {
// You get an Observable, not the actual results!
observable.subscribe(results => this.searchResults = results);
});
// With switchMap - CORRECT
this.inputChanged.pipe(
debounceTime(500),
distinctUntilChanged(),
switchMap(searchTerm => this.userService.searchUsers(searchTerm)) // ✅ Flattens Observable
).subscribe(results => {
this.searchResults = results; // You get actual results
});
2. Race Condition Prevention
The Problem Without switchMap:
// User types: "j" → "jo" → "john"
// Timeline:
// t0: User types "j" → API call 1 starts (takes 2 seconds)
// t1: User types "jo" → API call 2 starts (takes 1 second)
// t2: User types "john" → API call 3 starts (takes 0.5 seconds)
//
// Results arrive:
// t2.5: API call 3 completes → Shows results for "john" ✅
// t3: API call 2 completes → Shows results for "jo" ❌ WRONG!
// t4: API call 1 completes → Shows results for "j" ❌ WRONG!
The Solution With switchMap:
// User types: "j" → "jo" → "john"
// Timeline:
// t0: User types "j" → API call 1 starts
// t1: User types "jo" → API call 1 CANCELLED, API call 2 starts
// t2: User types "john" → API call 2 CANCELLED, API call 3 starts
//
// Results arrive:
// t2.5: API call 3 completes → Shows results for "john" ✅ CORRECT!
// (API calls 1 & 2 were cancelled, so no conflicting results)
3. Visual Timeline Comparison
// Input stream: j----jo----john----|
// Without switchMap:
// API calls: call1-call2-call3
// Results: ----res3--res2-res1 ❌ Wrong order!
// With switchMap:
// API calls: call1 X call2 X call3
// Results: ----------res3------- ✅ Only latest!
4. Alternative Operators Comparison
// mergeMap - Keeps all requests (NOT good for search)
this.searchTerms$.pipe(
mergeMap(term => this.userService.searchUsers(term))
); // All requests complete, results arrive in random order ❌
// concatMap - Waits for each request (TOO SLOW for search)
this.searchTerms$.pipe(
concatMap(term => this.userService.searchUsers(term))
); // Queues requests, very slow user experience ❌
// exhaustMap - Ignores new requests while one is active (NOT good for search)
this.searchTerms$.pipe(
exhaustMap(term => this.userService.searchUsers(term))
); // Ignores fast typing, misses search terms ❌
// switchMap - Perfect for search! ✅
this.searchTerms$.pipe(
switchMap(term => this.userService.searchUsers(term))
); // Cancels previous, only shows latest results ✅
For search functionality, switchMap is the perfect choice because users only care about results for their current search term, not previous ones! 🚀
2. BehaviorSubject for State Management
// subject.service.ts
import { Injectable } from '@angular/core';
import { BehaviorSubject, Observable } from 'rxjs';
@Injectable({
providedIn: 'root'
})
export class StateService {
// Private BehaviorSubject
private toggle = new BehaviorSubject<boolean>(false);
private currentUser = new BehaviorSubject<User | null>(null);
private theme = new BehaviorSubject<string>('light');
// Public Observable (read-only)
toggle$ = this.toggle.asObservable();
currentUser$ = this.currentUser.asObservable();
theme$ = this.theme.asObservable();
// Methods to update state
onChangeToggle(value: boolean) {
this.toggle.next(value);
}
setCurrentUser(user: User | null) {
this.currentUser.next(user);
}
setTheme(theme: string) {
this.theme.next(theme);
}
// Synchronous access to current values
getCurrentToggleValue(): boolean {
return this.toggle.getValue();
}
getCurrentUser(): User | null {
return this.currentUser.getValue();
}
isUserLoggedIn(): boolean {
return this.currentUser.getValue() !== null;
}
}
3. Component Using BehaviorSubject State
// app.component.ts
import { Component, OnInit, OnDestroy } from '@angular/core';
import { StateService } from './state.service';
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
@Component({
selector: 'app-root',
template: `
<div [class]="theme">
<button (click)="toggleSidebar()">
{{ (stateService.toggle$ | async) ? 'Hide' : 'Show' }} Sidebar
</button>
<div *ngIf="currentUser$ | async as user; else loginTemplate">
Welcome, {{ user.name }}!
<button (click)="logout()">Logout</button>
</div>
<ng-template #loginTemplate>
<button (click)="login()">Login</button>
</ng-template>
<button (click)="switchTheme()">Switch Theme</button>
</div>
`
})
export class AppComponent implements OnInit, OnDestroy {
currentUser$ = this.stateService.currentUser$;
theme = 'light';
private destroy$ = new Subject<void>();
constructor(public stateService: StateService) {}
ngOnInit() {
// Subscribe to theme changes
this.stateService.theme$.pipe(
takeUntil(this.destroy$)
).subscribe(theme => {
this.theme = theme;
});
}
toggleSidebar() {
const currentValue = this.stateService.getCurrentToggleValue();
this.stateService.onChangeToggle(!currentValue);
}
login() {
const user = { id: 1, name: 'John Doe', email: 'john@example.com' };
this.stateService.setCurrentUser(user);
}
logout() {
this.stateService.setCurrentUser(null);
}
switchTheme() {
const currentTheme = this.theme;
const newTheme = currentTheme === 'light' ? 'dark' : 'light';
this.stateService.setTheme(newTheme);
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}
🔄 Advanced Subject Types
ReplaySubject - Remembers Multiple Values
import { ReplaySubject } from 'rxjs';
// Remember last 3 values for new subscribers
const replaySubject$ = new ReplaySubject<string>(3);
replaySubject$.next('Message 1');
replaySubject$.next('Message 2');
replaySubject$.next('Message 3');
replaySubject$.next('Message 4');
// New subscriber gets last 3 values
replaySubject$.subscribe(value => console.log('New subscriber:', value));
// Output:
// New subscriber: Message 2
// New subscriber: Message 3
// New subscriber: Message 4
AsyncSubject - Only Last Value When Complete
import { AsyncSubject } from 'rxjs';
const asyncSubject$ = new AsyncSubject<string>();
asyncSubject$.subscribe(value => console.log('Received:', value));
asyncSubject$.next('Value 1');
asyncSubject$.next('Value 2');
asyncSubject$.next('Final Value');
asyncSubject$.complete(); // Only now subscriber gets 'Final Value'
🎯 When to Use Which Subject
Use Subject when:
// ✅ Event notifications (clicks, form submissions)
const buttonClicks$ = new Subject<MouseEvent>();
// ✅ One-time events
const apiCalls$ = new Subject<ApiCall>();
// ✅ Commands/actions
const userActions$ = new Subject<UserAction>();
// Example: Event bus
@Injectable()
export class EventBusService {
private events$ = new Subject<AppEvent>();
emit(event: AppEvent) {
this.events$.next(event);
}
on(eventType: string) {
return this.events$.pipe(
filter(event => event.type === eventType)
);
}
}
Use BehaviorSubject when:
// ✅ Application state
const appState$ = new BehaviorSubject(initialState);
// ✅ User preferences
const theme$ = new BehaviorSubject('light');
// ✅ Current user
const currentUser$ = new BehaviorSubject<User | null>(null);
// ✅ Shopping cart
const cart$ = new BehaviorSubject<CartItem[]>([]);
// ✅ Form state
const formState$ = new BehaviorSubject({ valid: false, values: {} });
Use ReplaySubject when:
// ✅ Chat messages (show last N messages to new users)
const chatMessages$ = new ReplaySubject<Message>(50);
// ✅ Notification history
const notifications$ = new ReplaySubject<Notification>(10);
// ✅ Activity logs
const activityLog$ = new ReplaySubject<Activity>(100);
🔑 Best Practices & Patterns
1. Service Pattern with BehaviorSubject
@Injectable()
export class UserService {
private currentUser$ = new BehaviorSubject<User | null>(null);
private loading$ = new BehaviorSubject<boolean>(false);
// Expose as read-only Observables
user$ = this.currentUser$.asObservable();
loading$ = this.loading$.asObservable();
async login(credentials: LoginCredentials) {
this.loading$.next(true);
try {
const user = await this.authApi.login(credentials);
this.currentUser$.next(user);
} catch (error) {
this.currentUser$.next(null);
throw error;
} finally {
this.loading$.next(false);
}
}
logout() {
this.currentUser$.next(null);
}
// Synchronous access when needed
getCurrentUser(): User | null {
return this.currentUser$.getValue();
}
}
2. Form State Management
@Component({})
export class FormComponent {
private formState$ = new BehaviorSubject({
values: {},
errors: {},
valid: false,
dirty: false
});
state$ = this.formState$.asObservable();
updateField(field: string, value: any) {
const currentState = this.formState$.getValue();
const newState = {
...currentState,
values: { ...currentState.values, [field]: value },
dirty: true
};
// Validate
newState.errors = this.validateForm(newState.values);
newState.valid = Object.keys(newState.errors).length === 0;
this.formState$.next(newState);
}
}
3. Memory Management
@Component({})
export class MyComponent implements OnInit, OnDestroy {
private destroy$ = new Subject<void>();
ngOnInit() {
// Use takeUntil for automatic unsubscription
this.dataService.data$.pipe(
takeUntil(this.destroy$)
).subscribe(data => {
// Handle data
});
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}
🔑 Key Takeaways
Observable Pattern:
- Observable: Data producer (like a TV station)
- Observer: Data consumer (like a TV viewer)
- Subscription: Connection between them (like tuning in)
Observable vs Promise:
- Observable: Multiple values, lazy, cancellable, rich operators
- Promise: Single value, eager, not cancellable, limited operators
Subject Types:
- Subject: Basic multicast, no initial value, late subscribers miss data
- BehaviorSubject: Has current value, late subscribers get current value
- ReplaySubject: Remembers N previous values for new subscribers
- AsyncSubject: Only emits last value when completed
Best Practices:
// ✅ Use $ suffix for Observables
const data$ = new Observable();
const state$ = new BehaviorSubject(initialState);
// ✅ Expose BehaviorSubjects as read-only Observables
private _state$ = new BehaviorSubject(initial);
state$ = this._state$.asObservable();
// ✅ Always clean up subscriptions
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
// ✅ Use operators for transformation
data$.pipe(
map(transform),
filter(condition),
catchError(handleError)
);
RxJS Subjects provide powerful tools for reactive state management and event handling in Angular applications! 🚀