Here is an async task queue implementation. The idea is to reuse the Node.js standard library as much as possible.
const { Readable, Writable } = require('stream');
/**
 * Builds a promise that fulfils, with no value, after a timer has fired.
 *
 * @param {number} ms - Timeout handed straight to `setTimeout`.
 * @returns {Promise<void>} Promise fulfilled from the timer callback.
 */
const delay = (ms) => {
  // The promise's own resolver is used directly as the timer callback.
  const armTimer = (done) => {
    setTimeout(done, ms);
  };
  return new Promise(armTimer);
};
/**
 * Demo task: logs that `message` has started, waits 500 ms via `delay`, then
 * fulfils with `message` followed by ' solved'.
 *
 * @param {*} message - Label for the task; it is interpolated into the log
 *   line and concatenated into the result.
 * @returns {Promise<string>} Promise fulfilled with the "solved" text.
 */
const task = (message) => {
  console.log(`${message} started`);
  // Concatenation (not a template literal) keeps the coercion of `message`
  // exactly as it has always been.
  const describeOutcome = () => message + ' solved';
  return delay(500).then(describeOutcome);
};
/**
 * Task queue built on two Node.js object-mode streams: tasks are pushed into
 * a Readable (`queue`) that is piped into a Writable (`processor`), and the
 * Writable is what actually runs them.
 *
 * The Writable only invokes its stream callback once the task(s) it was handed
 * have settled, so it takes no further work until then. When it is handed
 * several buffered tasks at once (`writev`), it starts all of them together
 * and waits for the whole group before continuing. The Writable's
 * `highWaterMark` is `limit + 1`; combined with pipe back-pressure this is
 * what bounds how many tasks get buffered, and therefore how many one
 * `writev` call starts together.
 */
class AsyncTaskQueue {
  /**
   * @param {number} [limit=7] - Used to size the Writable's `highWaterMark`
   *   (`limit + 1`).
   */
  constructor(limit = 7) {
    // Wrappers built by addTask() already route every outcome to the caller's
    // promise and never reject. The catch is purely defensive: it guarantees
    // the stream callback still fires if something else was pushed onto
    // `queue` directly.
    const runToCompletion = (queuedTask) => queuedTask().catch(() => undefined);

    this.queue = new Readable({
      objectMode: true,
      // Data only ever arrives through push() in addTask(); nothing to pull.
      read() {},
    });

    this.processor = new Writable({
      objectMode: true,
      highWaterMark: limit + 1,
      write(queuedTask, encoding, callback) {
        // Always call back without an error so one task cannot break the stream.
        runToCompletion(queuedTask).then(() => callback());
      },
      writev(entries, callback) {
        // Start every buffered task at once, then release the stream once all
        // of them have settled.
        Promise.all(entries.map(({ chunk }) => runToCompletion(chunk)))
          .then(() => callback());
      },
    });

    this.queue.pipe(this.processor);
  }

  /**
   * Schedules `execute` to be run by the queue.
   *
   * @param {() => (Promise<*>|*)} execute - Function to run. It may return a
   *   promise or a plain value, and it may throw synchronously.
   * @returns {Promise<*>} Fulfils with the value `execute` produced, or
   *   rejects with whatever it threw or rejected with.
   */
  addTask(execute) {
    return new Promise((resolve, reject) => {
      // Running `execute` inside an async function turns synchronous throws
      // and non-promise return values into a normal settlement of the caller's
      // promise, instead of an exception escaping from the stream's write path.
      const queuedTask = async () => {
        try {
          resolve(await execute());
        } catch (err) {
          reject(err);
        }
      };
      this.queue.push(queuedTask);
    });
  }
}
const q = new AsyncTaskQueue();
for (let i = 0; i< 100; i++) {
q.addTask(
task.bind(null, i)
).then(console.log)