Перейти к содержимому

Promise pool (concurrency limit)

Promise pool — ограничение числа одновременных async-задач: batch API-запросы, обработка файлов, скрейпинг. Интервьюер проверяет: не создавать все промисы сразу (промисы eager!), запускать следующий как только завершается предыдущий, сохранять порядок результатов.

Worker-корутины: каждый воркер тянет следующую задачу из общей очереди через closure:

/**
* Выполняет mapper для каждого элемента inputs с лимитом одновременных задач.
* Аналог p-map / Promise.map(arr, fn, { concurrency }).
*
* @template T, R
* @param {T[]} inputs — входные данные
* @param {(item: T) => Promise<R>} mapper — async-функция
* @param {number} concurrency — макс. одновременных задач
* @returns {Promise<R[]>} результаты в порядке inputs
*/
async function promisePool(inputs, mapper, concurrency) {
const results = new Array(inputs.length);
let nextIndex = 0; // shared index — следующий необработанный элемент
async function worker() {
while (true) {
const i = nextIndex++; // захватываем индекс (JS однопоточен — безопасно)
if (i >= inputs.length) break; // очередь пуста
results[i] = await mapper(inputs[i]); // ждём и записываем по индексу
}
}
// Запускаем min(concurrency, n) воркеров одновременно
await Promise.all(
Array.from({ length: Math.min(concurrency, inputs.length) }, worker)
);
return results;
}
// usage: не более 3 параллельных запросов
const urls = ['/api/1', '/api/2', '/api/3', '/api/4', '/api/5'];
const pages = await promisePool(
urls,
url => fetch(url).then(r => r.json()),
3,
);
// pages[i] соответствует urls[i] ✓
// В любой момент выполняется <= 3 запроса

Версия p-limit: функция-декоратор для динамического добавления задач:

/**
* Создаёт функцию-ограничитель с лимитом concurrency.
* Аналог npm-пакета p-limit.
*
* @param {number} concurrency
* @returns {<T>(fn: () => Promise<T>) => Promise<T>}
*/
function pLimit(concurrency) {
let active = 0;
const queue = []; // { fn, resolve, reject }[]
function next() {
if (active >= concurrency || queue.length === 0) return;
active++;
const { fn, resolve, reject } = queue.shift();
fn()
.then(resolve, reject)
.finally(() => { active--; next(); }); // запустить следующую из очереди
}
return function limit(fn) {
return new Promise((resolve, reject) => {
queue.push({ fn, resolve, reject });
next();
});
};
}
// usage
const limit = pLimit(2); // максимум 2 одновременно
const tasks = [1, 2, 3, 4, 5].map(i =>
limit(() => new Promise(res => setTimeout(() => res(i * 10), 100)))
);
const results = await Promise.all(tasks);
console.log(results); // [10, 20, 30, 40, 50]
// Задачи 1 и 2 стартуют сразу; 3 — когда завершится 1; 4 — когда 2; и т.д.
// Разница promisePool vs pLimit:
// promisePool → обрабатывает готовый массив, простой API
// pLimit → декоратор, задачи добавляются динамически, более гибкий

Сложность

  • Время: O(n/c · t) где n = задачи, c = concurrency, t = среднее время задачи
  • Память: O(n) для results + O(c) активных задач + O(n) очередь

Итог: promisePool: c воркеров тянут задачи через nextIndex++. pLimit: очередь с active-счётчиком и автозапуском следующей задачи через finally.