Mini Observable
Observable-паттерн проверяет понимание lazy execution, push-потоков данных и разницы с Promise. Observable «холодный» — выполнение начинается при subscribe и останавливается при unsubscribe. Интервьюер ждёт: subscribe/unsubscribe, операторы map/filter, понимание разницы с EventEmitter и Promise.
Минимальный Observable с операторами map/filter и фабричными методами:
/** * Минимальный Observable (подмножество TC39 proposal / RxJS). * «Холодный»: subscriber-функция запускается заново для каждого subscribe(). */class Observable { /** * @param {(observer: Observer) => (() => void) | void} subscriberFn * @typedef {{ next(v: any): void; error(e: any): void; complete(): void }} Observer */ constructor(subscriberFn) { this._sub = subscriberFn; }
/** * Подписаться на поток. * @param {Partial<Observer> | ((v: any) => void)} observer * @returns {{ unsubscribe(): void }} */ subscribe(observer) { const obs = typeof observer === 'function' ? { next: observer, error: console.error, complete: () => {} } : { next: () => {}, error: console.error, complete: () => {}, ...observer };
let closed = false;
// Защита от вызовов после unsubscribe const safe = { next: v => { if (!closed) obs.next(v); }, error: e => { if (!closed) { closed = true; obs.error(e); } }, complete: () => { if (!closed) { closed = true; obs.complete(); } }, };
const cleanup = this._sub(safe) ?? (() => {});
return { unsubscribe() { closed = true; cleanup(); }, }; }
/** Оператор map — возвращает новый Observable без подписки. */ map(fn) { return new Observable(observer => { const sub = this.subscribe({ next: v => observer.next(fn(v)), error: e => observer.error(e), complete: () => observer.complete(), }); return () => sub.unsubscribe(); // cleanup пробрасывается вверх }); }
/** Оператор filter. */ filter(pred) { return new Observable(observer => { const sub = this.subscribe({ next: v => pred(v) && observer.next(v), error: e => observer.error(e), complete: () => observer.complete(), }); return () => sub.unsubscribe(); }); }
/** Фабрика: Observable из набора значений. */ static of(...values) { return new Observable(observer => { for (const v of values) observer.next(v); observer.complete(); }); }
/** Фабрика: Observable из DOM-события с автоматическим cleanup. */ static fromEvent(target, eventName) { return new Observable(observer => { const handler = e => observer.next(e); target.addEventListener(eventName, handler); return () => target.removeEventListener(eventName, handler); // ключевой cleanup! }); }}Использование, lazy execution и разница с Promise / EventEmitter:
// ─── Базовое использование ────────────────────────────────────────────────const nums$ = Observable.of(1, 2, 3, 4, 5);const even$ = nums$.filter(x => x % 2 === 0).map(x => x * 10);
const sub = even$.subscribe({ next: v => console.log(v), // 20, 40 complete: () => console.log('done'),});// sub.unsubscribe() если нужно прервать
// ─── «Холодный» vs «горячий» ─────────────────────────────────────────────// Каждый subscribe запускает независимый поток (cold!)const counter$ = new Observable(observer => { let n = 0; const id = setInterval(() => observer.next(n++), 100); return () => clearInterval(id); // без этого — утечка!});
const sub1 = counter$.subscribe(v => console.log('A:', v)); // независимый счётчикconst sub2 = counter$.subscribe(v => console.log('B:', v)); // независимый счётчик
setTimeout(() => { sub1.unsubscribe(); // clearInterval вызывается для sub1 sub2.unsubscribe(); // clearInterval вызывается для sub2}, 350);
// ─── Операторы на практике ────────────────────────────────────────────────const clicks$ = Observable.fromEvent(document, 'click') .filter(e => e.target.matches('.btn')) .map(e => ({ x: e.clientX, y: e.clientY }));
const clickSub = clicks$.subscribe(pos => updateCursor(pos));setTimeout(() => clickSub.unsubscribe(), 5000); // listener удалён автоматически
// ─── Сравнение подходов ───────────────────────────────────────────────────// Promise: одно значение, eager (уже выполняется), нет отмены// EventEmitter: «горячий», пропускает события до подписки, нет cleanup// Observable: lazy (cold), многократные значения, отмена через unsubscribeСложность
- Время: O(1) subscribe; O(n·k) для n значений и k операторов в цепочке
- Память: O(k) стек операторов; значения не буферизуются (streaming)
Итог: Observable: lazy push-поток, subscribe запускает выполнение, unsubscribe вызывает cleanup. map/filter возвращают новый Observable. Холодный — независимый поток для каждого подписчика.