2
\$\begingroup\$

Here is an async task queue implementation. The idea is to reuse the Node.js standard library as much as possible.

const { Readable, Writable } = require('stream');

// Returns a promise that is fulfilled (with no value) once `ms` milliseconds
// have elapsed.
const delay = (ms) =>
  new Promise((done) => {
    setTimeout(() => done(), ms);
  });

// Sample job: logs that `message` has started, waits 500 ms via `delay`,
// then resolves with `message` followed by ' solved'.
const task = (message) => {
  console.log(`${message} started`);

  const describeResult = () => message + ' solved';
  const waiting = delay(500);
  return waiting.then(describeResult);
};

/**
 * Queue of asynchronous tasks built on Node.js streams.
 *
 * Tasks are pushed into an object-mode Readable that is piped into an
 * object-mode Writable. The Writable starts each task it receives and only
 * signals completion to the stream once the task (or batch of tasks) has
 * settled, so stream back-pressure throttles how many tasks are started.
 */
class AsyncTaskQueue {
  /**
   * @param {number} [limit=7] Used to size the Writable's buffer
   *   (`highWaterMark` is set to `limit + 1`).
   */
  constructor(limit = 7) {
    // Source side: tasks are supplied via push() in addTask(), so read() has
    // nothing to do.
    this.queue = new Readable({
      objectMode: true,
      read(size) {},
    });

    this.processor = new Writable({
      objectMode: true,
      highWaterMark: limit + 1,
      // Single task: start it and release the stream once it has settled.
      write(task, encoding, callback) {
        task().finally(() => callback());
      },
      // Buffered tasks: start them all together and release the stream only
      // after every one of them has settled. A rejection is converted to a
      // value so that one failing task cannot short-circuit Promise.all.
      writev(tasks, callback) {
        Promise.all(tasks.map(({ chunk }) => chunk().catch((err) => err)))
          .finally(() => callback());
      },
    });

    this.queue.pipe(this.processor);
  }

  /**
   * Enqueues a task.
   *
   * @param {Function} execute Called with no arguments when the task is
   *   started; may return a promise or a plain value.
   * @returns {Promise<*>} Fulfilled with the task's result, or rejected with
   *   whatever `execute` throws or rejects with (including the TypeError
   *   raised when `execute` is not callable).
   */
  addTask(execute) {
    return new Promise((resolve, reject) => {
      // The wrapper never throws and never rejects: synchronous throws,
      // rejections and non-promise return values are all routed to the
      // caller's promise, so the stream's write()/writev() always get a
      // promise that settles and the queue keeps draining.
      const task = async () => {
        try {
          resolve(await execute());
        } catch (err) {
          reject(err);
        }
      };
      this.queue.push(task);
    });
  }
}

// Demo: enqueue 100 sample tasks and print each result as it resolves.
const q = new AsyncTaskQueue();

for (let i = 0; i < 100; i++) {
  q.addTask(
    task.bind(null, i)
  ).then(console.log);
}
\$\endgroup\$

0

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.