Перейти к содержимому

Mini Observable

Observable-паттерн проверяет понимание lazy execution, push-потоков данных и разницы с Promise. Observable «холодный» — выполнение начинается при subscribe и останавливается при unsubscribe. Интервьюер ждёт: subscribe/unsubscribe, операторы map/filter, понимание разницы с EventEmitter и Promise.

Минимальный Observable с операторами map/filter и фабричными методами:

/**
* Минимальный Observable (подмножество TC39 proposal / RxJS).
* «Холодный»: subscriber-функция запускается заново для каждого subscribe().
*/
class Observable {
/**
* @param {(observer: Observer) => (() => void) | void} subscriberFn
* @typedef {{ next(v: any): void; error(e: any): void; complete(): void }} Observer
*/
constructor(subscriberFn) {
this._sub = subscriberFn;
}
/**
* Подписаться на поток.
* @param {Partial<Observer> | ((v: any) => void)} observer
* @returns {{ unsubscribe(): void }}
*/
subscribe(observer) {
const obs = typeof observer === 'function'
? { next: observer, error: console.error, complete: () => {} }
: { next: () => {}, error: console.error, complete: () => {}, ...observer };
let closed = false;
// Защита от вызовов после unsubscribe
const safe = {
next: v => { if (!closed) obs.next(v); },
error: e => { if (!closed) { closed = true; obs.error(e); } },
complete: () => { if (!closed) { closed = true; obs.complete(); } },
};
const cleanup = this._sub(safe) ?? (() => {});
return {
unsubscribe() { closed = true; cleanup(); },
};
}
/** Оператор map — возвращает новый Observable без подписки. */
map(fn) {
return new Observable(observer => {
const sub = this.subscribe({
next: v => observer.next(fn(v)),
error: e => observer.error(e),
complete: () => observer.complete(),
});
return () => sub.unsubscribe(); // cleanup пробрасывается вверх
});
}
/** Оператор filter. */
filter(pred) {
return new Observable(observer => {
const sub = this.subscribe({
next: v => pred(v) && observer.next(v),
error: e => observer.error(e),
complete: () => observer.complete(),
});
return () => sub.unsubscribe();
});
}
/** Фабрика: Observable из набора значений. */
static of(...values) {
return new Observable(observer => {
for (const v of values) observer.next(v);
observer.complete();
});
}
/** Фабрика: Observable из DOM-события с автоматическим cleanup. */
static fromEvent(target, eventName) {
return new Observable(observer => {
const handler = e => observer.next(e);
target.addEventListener(eventName, handler);
return () => target.removeEventListener(eventName, handler); // ключевой cleanup!
});
}
}

Использование, lazy execution и разница с Promise / EventEmitter:

// ─── Базовое использование ────────────────────────────────────────────────
const nums$ = Observable.of(1, 2, 3, 4, 5);
const even$ = nums$.filter(x => x % 2 === 0).map(x => x * 10);
const sub = even$.subscribe({
next: v => console.log(v), // 20, 40
complete: () => console.log('done'),
});
// sub.unsubscribe() если нужно прервать
// ─── «Холодный» vs «горячий» ─────────────────────────────────────────────
// Каждый subscribe запускает независимый поток (cold!)
const counter$ = new Observable(observer => {
let n = 0;
const id = setInterval(() => observer.next(n++), 100);
return () => clearInterval(id); // без этого — утечка!
});
const sub1 = counter$.subscribe(v => console.log('A:', v)); // независимый счётчик
const sub2 = counter$.subscribe(v => console.log('B:', v)); // независимый счётчик
setTimeout(() => {
sub1.unsubscribe(); // clearInterval вызывается для sub1
sub2.unsubscribe(); // clearInterval вызывается для sub2
}, 350);
// ─── Операторы на практике ────────────────────────────────────────────────
const clicks$ = Observable.fromEvent(document, 'click')
.filter(e => e.target.matches('.btn'))
.map(e => ({ x: e.clientX, y: e.clientY }));
const clickSub = clicks$.subscribe(pos => updateCursor(pos));
setTimeout(() => clickSub.unsubscribe(), 5000); // listener удалён автоматически
// ─── Сравнение подходов ───────────────────────────────────────────────────
// Promise: одно значение, eager (уже выполняется), нет отмены
// EventEmitter: «горячий», пропускает события до подписки, нет cleanup
// Observable: lazy (cold), многократные значения, отмена через unsubscribe

Сложность

  • Время: O(1) subscribe; O(n·k) для n значений и k операторов в цепочке
  • Память: O(k) стек операторов; значения не буферизуются (streaming)

Итог: Observable: lazy push-поток, subscribe запускает выполнение, unsubscribe вызывает cleanup. map/filter возвращают новый Observable. Холодный — независимый поток для каждого подписчика.