p-queue: Rejecting the whole queue when a task rejects

I’m trying p-queue after using promise-queue for some time. Given this background, I was expecting to see the whole queue failing if any of its tasks rejects. However, this does not seem to happen.

Here’s an example:

// testPQueue.js

import * as PQueue from "p-queue";

async function processQueue() {
  const queue = new PQueue();
  queue.add(
    () =>
      new Promise((resolve, reject) =>
        setTimeout(() => {
          reject();
        }, 500),
      ),
  );

  await queue.onIdle();
  console.log("should not be here");
  return 42;
}

(async () => {
  try {
    const result = await processQueue();
    console.log(result);
  } catch (e) {
    console.log("could not process queue");
  }
})();

node ./testPQueue.js

Instead of seeing “could not process queue” in my stdout, I get this:

should not be here
42
(node:13679) UnhandledPromiseRejectionWarning: undefined
(node:13679) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 1)
(node:13679) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.

How can I stop and reject the whole queue once any of its tasks has failed?

About this issue

  • State: open
  • Created 6 years ago
  • Reactions: 20
  • Comments: 25 (3 by maintainers)

Most upvoted comments

To me, it definitely makes sense that queue.add can reject. It’s important for the caller to know which specific call to add has failed. This is especially true when adding abstractions on top of queue.add. In my usage I need to serialize writes over a custom native (Node.js Addon) channel. To simplify the native side, I decided to do all serialization in JavaScript and am attempting to use p-queue with a concurrency of 1 for this.

This way my users can safely call my write function (which calls queue.add) from different parts of their app and know which writes failed.

I am also looking for behaviour similar to the OP’s and was surprised that I could not get it with the following:

async function write(data) {
  try {
    // `await` is required so that a rejection from queue.add is caught here
    return await queue.add(() => nativeWrite(data));
  } catch (e) {
    // Stop further processing
    queue.clear();
    // Re-throw
    throw e;
  }
}

With the above, if we queue a few items, then after the first write throws, one further write is still attempted even though we clear the queue. Basically, by the time the promise from add has been resolved or rejected, the next item has already been picked up by the queue and there is no way to clear it.

pauseOnError makes sense to me. The other thing I can think of is to let p-queue (via config?) return full control to the caller before queuing the next item.

Does it make sense that queue.add() can reject? Or should we leave that up to a queue method like queue#onIdle() or queue#onError()? I don’t think we should have to handle the same error in multiple places, so I would personally prefer to have one single place to handle failure. Where should it be?

I’ll make a PR for that. New option: pauseOnError

One thing I was hoping to see was for onIdle to reject if there was an error. That way I can queue up all my tasks and, once they’re all done, know whether any of them had an error. Now I have to do something like this:

let error;
queue.addAll(mytasks).catch(err => error = err);
queue.onIdle().then(() => {
  if (error) throw error;
});
// or with async await
async function test(){
  try {
    await queue.addAll(mytasks);
  } catch (e) {
    await queue.onIdle();
    // here I know the queue is really done I can throw the error
    throw e;
  }
}
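The pattern above can be wrapped in a small helper. This is only a minimal sketch and not part of p-queue: drainAndReport is a hypothetical name, and it assumes nothing beyond queue.add() returning a promise and queue.onIdle() existing, both of which are used earlier in this thread. It attaches a catch to every task so that nothing goes unhandled, waits for the queue to go idle, and then throws one AggregateError if anything failed.

```javascript
// Hypothetical helper (not a p-queue API): run every task, wait for the
// queue to drain, then surface all failures at once.
async function drainAndReport(queue, tasks) {
  const errors = [];

  // Attach a catch to each add() so no rejection is left unhandled.
  const pending = tasks.map((task) =>
    queue.add(task).catch((err) => {
      errors.push(err); // remember the failure, keep the queue running
    })
  );

  // Failed tasks contribute `undefined` to the results array.
  const results = await Promise.all(pending);
  await queue.onIdle();

  if (errors.length > 0) {
    // AggregateError is a built-in in modern Node.js and browsers.
    throw new AggregateError(errors, `${errors.length} task(s) failed`);
  }
  return results;
}
```

A caller would use `await drainAndReport(queue, mytasks)` inside a try/catch and inspect `e.errors` to see every failure rather than only the first one.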

Running this script demonstrates how the queue keeps running other tasks once one has failed, and also demonstrates some aspect of Promise.all() error handling that I don’t understand – where did the bar error go?

Promise.all rejects as soon as the first promise rejects; you cannot get the other errors from it.
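To make this concrete, here is a small sketch that uses only built-in promises (no p-queue involved). It shows that Promise.all surfaces just the first rejection, while Promise.allSettled reports every outcome. The helper failAfter and the messages “foo” and “bar” are purely illustrative.

```javascript
// Illustrative helper: a promise that rejects with `message` after `ms`.
const failAfter = (ms, message) =>
  new Promise((_, reject) => setTimeout(() => reject(new Error(message)), ms));

async function compare() {
  // Two tasks that both fail; "foo" rejects first.
  const make = () => [failAfter(10, "foo"), failAfter(20, "bar")];

  // Promise.all rejects as soon as the first promise rejects.
  // The later "bar" rejection is swallowed and never reported.
  const firstOnly = await Promise.all(make()).catch((err) => err.message);

  // Promise.allSettled waits for everything and keeps every reason.
  const settled = await Promise.allSettled(make());
  const all = settled
    .filter((r) => r.status === "rejected")
    .map((r) => r.reason.message);

  return { firstOnly, all };
}
```

Here `compare()` resolves with `firstOnly` set to "foo" and `all` set to ["foo", "bar"], which is where the “bar” error went in the Promise.all case.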

In general, I like the idea of not stopping on the first error. When making a lot of independent requests, you want to know whether there was an error, but you don’t want the promise to settle until all the requests have completed.

On the other hand, an onFail() method that resolves (or rejects) on the first error seen in the queue, like Promise.all, would be nice.
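One way such an onFail() could be approximated in user code is sketched below. This is an illustration under stated assumptions, not an existing p-queue feature: withOnFail is a hypothetical wrapper, and it relies only on queue.add() returning a promise that rejects when the task rejects, as described earlier in this thread.

```javascript
// Hypothetical wrapper (not part of p-queue): exposes a promise that
// rejects with the first task error, similar to how Promise.all fails fast.
function withOnFail(queue) {
  let rejectFirst;
  const failed = new Promise((_, reject) => {
    rejectFirst = reject;
  });
  // Avoid an unhandled-rejection warning if nobody subscribes to onFail().
  failed.catch(() => {});

  return {
    add(task, options) {
      const result = queue.add(task, options);
      // Observe the failure without consuming it: the caller of add()
      // still receives the original rejection through `result`.
      // Only the first call to rejectFirst has any effect.
      result.catch(rejectFirst);
      return result;
    },
    onFail: () => failed,
  };
}
```

A caller would add tasks through the wrapper and subscribe with `tracked.onFail().catch(handler)`. Whether the handler also clears or pauses the underlying queue is a separate decision that this sketch leaves to the caller.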

Overall, I think the module’s documentation could use some work on error handling. I had to find this issue to get even minimal information about it.

If anybody’s looking for a workaround here’s what I came up with:

(Obviously no ts, it’s too clunky to include here. You can figure out type annotations by yourself, or I don’t know, ping me and I’ll try to help)

class AutoPausingQueue extends PQueue {
  add(fn, options) {
    const fnPausing = () => {
      try {
        const fnResult = fn();
        return !(fnResult instanceof Promise)
          ? fnResult
          : fnResult.catch(err => {
            this.pause();
            throw err;
          });
      } catch(e) {
        this.pause();
        throw e;
      }
    };
    return super.add(fnPausing, options);
  }
}

I bumped into this issue as well, where I’d like the whole queue to fail when there is an error. My use case:

  • iterating through millions of records
  • doing the same processing for each record
  • any exception is bad news, and should stop the whole queue

I really like the p-queue API, up until I realized I’d have to resort to odd workarounds to get this to work.

I understand there are other use cases where we’d like to ignore or handle individual errors, but that seems compatible with a pauseOnError option, right? That is, adding pauseOnError would not break existing implementations. It would still support handling the errors individually, but it would also make life easier in cases where an error should ‘break’ the queue.

I’d expect something like this to work:

const delay = require('delay');
const {default: PQueue} = require('p-queue');


const queue = new PQueue({concurrency: 2, pauseOnError: true});

//Adding catch clause to the `add` function to avoid unhandled rejections
//As I've turned `pauseOnError` on 
queue.add(() => Promise.resolve()).catch(() => {});
queue.add(() => delay(2000)).catch(() => {});
queue.add(() => Promise.reject()).catch(() => {});
queue.add(() => Promise.resolve()).catch(() => {});

queue.onIdle()
  .then(() => {
    //We throw an error in the 3rd promise. We should not end up here
  })
  .catch((e) => {
    //I'd expect the first rejected promise to end up in this catch clause
  });

Given this background, I was expecting to see the whole queue failing if any of its tasks rejects. However, this does not seem to happen.

promise-queue also rejects in the queue.add().

How would you imagine the whole queue should fail? I don’t think the PQueue class should throw. We could maybe add an onFail method you could subscribe to. Happy to consider ideas.