Skip to main content

RxJs Subject Complete Guide: Observable, Observer, and Subject Types

📡 Observable, Observer & Observer Pattern in RxJS

Observable - The Data Producer

An Observable is a data producer that can emit multiple values over time. Think of it as a stream or pipe through which data flows.

import { Observable } from 'rxjs';

// Observable is like a recipe - it defines what will happen
const dataStream$ = new Observable(observer => {
observer.next('First value'); // Emit value
observer.next('Second value'); // Emit another value
observer.complete(); // Signal completion
});

// Nothing happens until someone subscribes!
console.log('Observable created, but no data emitted yet');

Observer - The Data Consumer

An Observer is an object that knows how to handle the values emitted by an Observable. It has three methods:

const observer = {
next: value => console.log('Received:', value), // Handle emitted values
error: error => console.error('Error occurred:', error), // Handle errors
complete: () => console.log('Stream completed') // Handle completion
};

// Now the Observable starts emitting
dataStream$.subscribe(observer);
// Output:
// Received: First value
// Received: Second value
// Stream completed

Observer Pattern Explained

The Observer Pattern is a design pattern where:

  • Subject (Observable) maintains a list of observers
  • Observers register to receive notifications
  • Subject notifies all observers when something happens
// Observable acts as the Subject
// Subscribers act as Observers
const subject$ = new Observable(observer => {
let count = 0;
const interval = setInterval(() => {
observer.next(count++);
if (count > 3) {
observer.complete();
clearInterval(interval);
}
}, 1000);
});

// Multiple observers can subscribe
subject$.subscribe(value => console.log('Observer 1:', value));
subject$.subscribe(value => console.log('Observer 2:', value));

🆚 Observable vs Promise: Key Differences

1. Single vs Multiple Values

// Promise - Single value
const promise = fetch('/api/user')
.then(response => response.json());

promise.then(user => console.log(user)); // Gets one user object

// Observable - Multiple values over time
const observable$ = new Observable(observer => {
observer.next('Value 1');
observer.next('Value 2');
observer.next('Value 3');
observer.complete();
});

observable$.subscribe(value => console.log(value));
// Output: Value 1, Value 2, Value 3

2. Eager vs Lazy Execution

// Promise - Eager (executes immediately)
console.log('Creating promise...');
const promise = new Promise(resolve => {
console.log('Promise executor running!'); // Runs immediately
resolve('Done');
});
console.log('Promise created');
// Output: Creating promise... → Promise executor running! → Promise created

// Observable - Lazy (executes only when subscribed)
console.log('Creating observable...');
const observable$ = new Observable(observer => {
console.log('Observable executor running!'); // Only runs when subscribed
observer.next('Done');
});
console.log('Observable created');
observable$.subscribe(); // NOW the executor runs
// Output: Creating observable... → Observable created → Observable executor running!

3. Cancellation Support

// Promise - Cannot be cancelled
const promise = fetch('/api/slow-endpoint');
// No way to cancel this request

// Observable - Can be cancelled
const request$ = new Observable(observer => {
const controller = new AbortController();

fetch('/api/slow-endpoint', { signal: controller.signal })
.then(response => observer.next(response))
.catch(error => observer.error(error));

// Return cleanup function
return () => controller.abort();
});

const subscription = request$.subscribe();
subscription.unsubscribe(); // Cancels the request!

Comparison Table

FeaturePromiseObservable
ValuesSingleMultiple over time
ExecutionEagerLazy
Cancellation
RetryManualBuilt-in operators
OperatorsLimitedRich ecosystem
Error RecoveryBasicAdvanced
Time-based✅ (debounce, throttle)

🎯 Subject vs BehaviorSubject: Main Differences

Subject - Basic Multicast Observable

import { Subject } from 'rxjs';

const subject$ = new Subject();

// Subscribe first observer
subject$.subscribe(value => console.log('Observer 1:', value));

subject$.next('Message 1'); // Observer 1: Message 1

// Subscribe second observer (missed Message 1)
subject$.subscribe(value => console.log('Observer 2:', value));

subject$.next('Message 2');
// Observer 1: Message 2
// Observer 2: Message 2

subject$.next('Message 3');
// Observer 1: Message 3
// Observer 2: Message 3

Key Characteristics of Subject:

  • No initial value
  • No current value storage
  • Late subscribers miss previous emissions
  • Multicasts to multiple observers

BehaviorSubject - Subject with Current Value

import { BehaviorSubject } from 'rxjs';

// Requires initial value
const behaviorSubject$ = new BehaviorSubject('Initial Value');

// Subscribe first observer
behaviorSubject$.subscribe(value => console.log('Observer 1:', value));
// Observer 1: Initial Value (immediately receives current value)

behaviorSubject$.next('Message 1');
// Observer 1: Message 1

// Subscribe second observer (gets current value immediately)
behaviorSubject$.subscribe(value => console.log('Observer 2:', value));
// Observer 2: Message 1 (gets the last emitted value)

behaviorSubject$.next('Message 2');
// Observer 1: Message 2
// Observer 2: Message 2

// Access current value directly
console.log('Current value:', behaviorSubject$.getValue()); // Message 2

Key Characteristics of BehaviorSubject:

  • Requires initial value
  • Stores current value
  • Late subscribers get current value immediately
  • Has getValue() method
  • Multicasts to multiple observers

🛠️ Real-World Angular Examples

1. Debounced Input Search with Subject

How to do logical operations after user stops typing in input field:

// 1. Service Method Returns Observable<User[]>
@Injectable()
export class UserService {
searchUsers(searchTerm: string): Observable<User[]> {
// ^^^^^^^^^ This is an array of User objects
return this.http.get<User[]>(`/api/users/search?q=${searchTerm}`);
}
}

/*
// API Response (what the server sends)
{
"users": [
{ "id": 1, "name": "John Doe", "email": "john@example.com" },
{ "id": 2, "name": "Jane Smith", "email": "jane@example.com" }
]
}
*/
// 2. Component Receives User[] as 'results'
import { OnInit, Component } from '@angular/core';
import { Subject } from 'rxjs';
import { debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';

@Component({
selector: 'search-component',
template: `
<input
[ngModel]='inputTyped'
(ngModelChange)='onInputChanged($event)'
placeholder="Search users..."
/>
<div *ngFor="let result of searchResults">{{result.name}}</div>
`
})
export class SearchComponent implements OnInit {
inputTyped: string;
searchResults: any[] = [];

// Subject for handling input changes
private inputChanged: Subject<string> = new Subject<string>();

constructor(private userService: UserService) {}

ngOnInit() {
this.inputChanged.pipe(
debounceTime(500), // Wait 500ms after user stops typing
distinctUntilChanged(), // Only search if search term changed
switchMap(searchTerm =>
this.userService.searchUsers(searchTerm) // Returns Observable<User[]>
)
).subscribe(results => {
// ^^^^^^^ This 'results' is the User[] array
this.searchResults = results; // Assigning User[] to component property
});
}

onInputChanged(text: string) {
this.inputTyped = text;
this.inputChanged.next(text); // Emit the search term
}

ngOnDestroy() {
this.inputChanged.complete(); // Clean up
}
}

🔍 Why switchMap is Used in Search Implementation

The switchMap operator is crucial in the search implementation for several important reasons:

1. Cancellation of Previous Requests

// Without switchMap - PROBLEMATIC
this.inputChanged.pipe(
debounceTime(500),
distinctUntilChanged(),
map(searchTerm => this.userService.searchUsers(searchTerm)) // ❌ Returns Observable
).subscribe(observable => {
// You get an Observable, not the actual results!
observable.subscribe(results => this.searchResults = results);
});
// With switchMap - CORRECT
this.inputChanged.pipe(
debounceTime(500),
distinctUntilChanged(),
switchMap(searchTerm => this.userService.searchUsers(searchTerm)) // ✅ Flattens Observable
).subscribe(results => {
this.searchResults = results; // You get actual results
});

2. Race Condition Prevention

The Problem Without switchMap:

// User types: "j" → "jo" → "john"
// Timeline:
// t0: User types "j" → API call 1 starts (takes 2 seconds)
// t1: User types "jo" → API call 2 starts (takes 1 second)
// t2: User types "john" → API call 3 starts (takes 0.5 seconds)
//
// Results arrive:
// t2.5: API call 3 completes → Shows results for "john" ✅
// t3: API call 2 completes → Shows results for "jo" ❌ WRONG!
// t4: API call 1 completes → Shows results for "j" ❌ WRONG!

The Solution With switchMap:

// User types: "j" → "jo" → "john" 
// Timeline:
// t0: User types "j" → API call 1 starts
// t1: User types "jo" → API call 1 CANCELLED, API call 2 starts
// t2: User types "john" → API call 2 CANCELLED, API call 3 starts
//
// Results arrive:
// t2.5: API call 3 completes → Shows results for "john" ✅ CORRECT!
// (API calls 1 & 2 were cancelled, so no conflicting results)

3. Visual Timeline Comparison

// Input stream:     j----jo----john----|
// Without switchMap:
// API calls: call1-call2-call3
// Results: ----res3--res2-res1 ❌ Wrong order!

// With switchMap:
// API calls: call1 X call2 X call3
// Results: ----------res3------- ✅ Only latest!

4. Alternative Operators Comparison

// mergeMap - Keeps all requests (NOT good for search)
this.searchTerms$.pipe(
mergeMap(term => this.userService.searchUsers(term))
); // All requests complete, results arrive in random order ❌

// concatMap - Waits for each request (TOO SLOW for search)
this.searchTerms$.pipe(
concatMap(term => this.userService.searchUsers(term))
); // Queues requests, very slow user experience ❌

// exhaustMap - Ignores new requests while one is active (NOT good for search)
this.searchTerms$.pipe(
exhaustMap(term => this.userService.searchUsers(term))
); // Ignores fast typing, misses search terms ❌

// switchMap - Perfect for search! ✅
this.searchTerms$.pipe(
switchMap(term => this.userService.searchUsers(term))
); // Cancels previous, only shows latest results ✅

For search functionality, switchMap is the perfect choice because users only care about results for their current search term, not previous ones! 🚀


2. BehaviorSubject for State Management

// subject.service.ts
import { Injectable } from '@angular/core';
import { BehaviorSubject, Observable } from 'rxjs';

@Injectable({
providedIn: 'root'
})
export class StateService {
// Private BehaviorSubject
private toggle = new BehaviorSubject<boolean>(false);
private currentUser = new BehaviorSubject<User | null>(null);
private theme = new BehaviorSubject<string>('light');

// Public Observable (read-only)
toggle$ = this.toggle.asObservable();
currentUser$ = this.currentUser.asObservable();
theme$ = this.theme.asObservable();

// Methods to update state
onChangeToggle(value: boolean) {
this.toggle.next(value);
}

setCurrentUser(user: User | null) {
this.currentUser.next(user);
}

setTheme(theme: string) {
this.theme.next(theme);
}

// Synchronous access to current values
getCurrentToggleValue(): boolean {
return this.toggle.getValue();
}

getCurrentUser(): User | null {
return this.currentUser.getValue();
}

isUserLoggedIn(): boolean {
return this.currentUser.getValue() !== null;
}
}

3. Component Using BehaviorSubject State

// app.component.ts
import { Component, OnInit, OnDestroy } from '@angular/core';
import { StateService } from './state.service';
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

@Component({
selector: 'app-root',
template: `
<div [class]="theme">
<button (click)="toggleSidebar()">
{{ (stateService.toggle$ | async) ? 'Hide' : 'Show' }} Sidebar
</button>

<div *ngIf="currentUser$ | async as user; else loginTemplate">
Welcome, {{ user.name }}!
<button (click)="logout()">Logout</button>
</div>

<ng-template #loginTemplate>
<button (click)="login()">Login</button>
</ng-template>

<button (click)="switchTheme()">Switch Theme</button>
</div>
`
})
export class AppComponent implements OnInit, OnDestroy {
currentUser$ = this.stateService.currentUser$;
theme = 'light';

private destroy$ = new Subject<void>();

constructor(public stateService: StateService) {}

ngOnInit() {
// Subscribe to theme changes
this.stateService.theme$.pipe(
takeUntil(this.destroy$)
).subscribe(theme => {
this.theme = theme;
});
}

toggleSidebar() {
const currentValue = this.stateService.getCurrentToggleValue();
this.stateService.onChangeToggle(!currentValue);
}

login() {
const user = { id: 1, name: 'John Doe', email: 'john@example.com' };
this.stateService.setCurrentUser(user);
}

logout() {
this.stateService.setCurrentUser(null);
}

switchTheme() {
const currentTheme = this.theme;
const newTheme = currentTheme === 'light' ? 'dark' : 'light';
this.stateService.setTheme(newTheme);
}

ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}

🔄 Advanced Subject Types

ReplaySubject - Remembers Multiple Values

import { ReplaySubject } from 'rxjs';

// Remember last 3 values for new subscribers
const replaySubject$ = new ReplaySubject<string>(3);

replaySubject$.next('Message 1');
replaySubject$.next('Message 2');
replaySubject$.next('Message 3');
replaySubject$.next('Message 4');

// New subscriber gets last 3 values
replaySubject$.subscribe(value => console.log('New subscriber:', value));
// Output:
// New subscriber: Message 2
// New subscriber: Message 3
// New subscriber: Message 4

AsyncSubject - Only Last Value When Complete

import { AsyncSubject } from 'rxjs';

const asyncSubject$ = new AsyncSubject<string>();

asyncSubject$.subscribe(value => console.log('Received:', value));

asyncSubject$.next('Value 1');
asyncSubject$.next('Value 2');
asyncSubject$.next('Final Value');
asyncSubject$.complete(); // Only now subscriber gets 'Final Value'

🎯 When to Use Which Subject

Use Subject when:

// ✅ Event notifications (clicks, form submissions)
const buttonClicks$ = new Subject<MouseEvent>();

// ✅ One-time events
const apiCalls$ = new Subject<ApiCall>();

// ✅ Commands/actions
const userActions$ = new Subject<UserAction>();

// Example: Event bus
@Injectable()
export class EventBusService {
private events$ = new Subject<AppEvent>();

emit(event: AppEvent) {
this.events$.next(event);
}

on(eventType: string) {
return this.events$.pipe(
filter(event => event.type === eventType)
);
}
}

Use BehaviorSubject when:

// ✅ Application state
const appState$ = new BehaviorSubject(initialState);

// ✅ User preferences
const theme$ = new BehaviorSubject('light');

// ✅ Current user
const currentUser$ = new BehaviorSubject<User | null>(null);

// ✅ Shopping cart
const cart$ = new BehaviorSubject<CartItem[]>([]);

// ✅ Form state
const formState$ = new BehaviorSubject({ valid: false, values: {} });

Use ReplaySubject when:

// ✅ Chat messages (show last N messages to new users)
const chatMessages$ = new ReplaySubject<Message>(50);

// ✅ Notification history
const notifications$ = new ReplaySubject<Notification>(10);

// ✅ Activity logs
const activityLog$ = new ReplaySubject<Activity>(100);

🔑 Best Practices & Patterns

1. Service Pattern with BehaviorSubject

@Injectable()
export class UserService {
private currentUser$ = new BehaviorSubject<User | null>(null);
private loading$ = new BehaviorSubject<boolean>(false);

// Expose as read-only Observables
user$ = this.currentUser$.asObservable();
loading$ = this.loading$.asObservable();

async login(credentials: LoginCredentials) {
this.loading$.next(true);
try {
const user = await this.authApi.login(credentials);
this.currentUser$.next(user);
} catch (error) {
this.currentUser$.next(null);
throw error;
} finally {
this.loading$.next(false);
}
}

logout() {
this.currentUser$.next(null);
}

// Synchronous access when needed
getCurrentUser(): User | null {
return this.currentUser$.getValue();
}
}

2. Form State Management

@Component({})
export class FormComponent {
private formState$ = new BehaviorSubject({
values: {},
errors: {},
valid: false,
dirty: false
});

state$ = this.formState$.asObservable();

updateField(field: string, value: any) {
const currentState = this.formState$.getValue();
const newState = {
...currentState,
values: { ...currentState.values, [field]: value },
dirty: true
};

// Validate
newState.errors = this.validateForm(newState.values);
newState.valid = Object.keys(newState.errors).length === 0;

this.formState$.next(newState);
}
}

3. Memory Management

@Component({})
export class MyComponent implements OnInit, OnDestroy {
private destroy$ = new Subject<void>();

ngOnInit() {
// Use takeUntil for automatic unsubscription
this.dataService.data$.pipe(
takeUntil(this.destroy$)
).subscribe(data => {
// Handle data
});
}

ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}

🔑 Key Takeaways

Observable Pattern:

  • Observable: Data producer (like a TV station)
  • Observer: Data consumer (like a TV viewer)
  • Subscription: Connection between them (like tuning in)

Observable vs Promise:

  • Observable: Multiple values, lazy, cancellable, rich operators
  • Promise: Single value, eager, not cancellable, limited operators

Subject Types:

  • Subject: Basic multicast, no initial value, late subscribers miss data
  • BehaviorSubject: Has current value, late subscribers get current value
  • ReplaySubject: Remembers N previous values for new subscribers
  • AsyncSubject: Only emits last value when completed

Best Practices:

// ✅ Use $ suffix for Observables
const data$ = new Observable();
const state$ = new BehaviorSubject(initialState);

// ✅ Expose BehaviorSubjects as read-only Observables
private _state$ = new BehaviorSubject(initial);
state$ = this._state$.asObservable();

// ✅ Always clean up subscriptions
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}

// ✅ Use operators for transformation
data$.pipe(
map(transform),
filter(condition),
catchError(handleError)
);

RxJS Subjects provide powerful tools for reactive state management and event handling in Angular applications! 🚀