Await all work currently in the queue #65

Open
ottokruse opened this issue Mar 15, 2023 · 5 comments

@ottokruse

ottokruse commented Mar 15, 2023

I'd like to be able to await all work in the queue. (I use fastq to limit how much work runs in parallel, and I create a new ephemeral queue every time I need to do this work.)

Currently I'm doing this, which I don't like:

const worker = (obj) => new Promise(resolve => setTimeout(resolve, 2000)).then(() => console.log(obj))
const q = fastq.promise(worker, 10);
const promises = []
for await (const obj of asyncIteratorThatYieldsALOT) {
    promises.push(q.push(obj));
}
await Promise.all(promises);
console.log("Nr of obj processed:", promises.length)

Is there a better way?

If not, I'd be open to contributing this feature.

@mcollina
Owner

This would be a good feature to have. Adding it to the promise implementation would be nice, i.e. q.pushAll(asyncIterator).

Note that your implementation accumulates a lot of data in the queue, while I would do something like:

const noop = () => {} // swallow rejections so they don't go unhandled
let p
for await (const obj of asyncIteratorThatYieldsALOT) {
  p = q.push(obj)
  p.catch(noop)
  if (q.length() > 0) {
    await p // tasks are waiting: implement backpressure
  }
}
await p // wait for the last task to finish
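
The `q.pushAll(asyncIterator)` idea could wrap that loop. A minimal sketch, assuming a free-standing `pushAll` helper (hypothetical, not part of fastq) and a queue object that exposes `push(task)` returning a promise, `length()` returning the number of waiting tasks, and `drained()`, as `fastq.promise` queues do:

```javascript
// Hypothetical helper, not part of fastq.
// Assumes `q` behaves like a fastq.promise queue:
//   q.push(task) -> Promise
//   q.length()   -> number of tasks waiting to be processed
//   q.drained()  -> Promise that resolves once all work is done
const noop = () => {}

async function pushAll (q, iterable) {
  let count = 0
  for await (const item of iterable) {
    // Task errors are ignored in this sketch; handle them as needed.
    const settled = q.push(item).catch(noop)
    count++
    if (q.length() > 0) {
      // Tasks are waiting: stop reading the source until this one settles.
      await settled
    }
  }
  await q.drained() // wait for whatever is still running
  return count
}
```

With a real queue this might be called as `const count = await pushAll(q, asyncIteratorThatYieldsALOT)`. Ending on `q.drained()` rather than on the last pushed promise is a choice made for this sketch: with a concurrency above 1, earlier tasks can still be running when the last one finishes.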

Would you like to send a PR?

@ottokruse
Author

I found out about q.drained() which does what I want because I can await it to understand when all work is done.

That changes my example to:

const worker = (obj) => new Promise(resolve => setTimeout(resolve, 2000)).then(() => console.log(obj))
const q = fastq.promise(worker, 10);
let count = 0;
for await (const obj of asyncIteratorThatYieldsALOT) {
    q.push(obj).catch(e => { throw e }); // note I want the error to bubble up
    count++;
}
await q.drained();
console.log("Nr of obj processed:", count)

Still thinking about your back pressure comment.

@ottokruse
Author

I experimented some more. A couple of observations:

  • Awaiting q.drained() works, but this will never throw (which is by design).
  • If a task throws, the exception is swallowed unless you do as I do above, q.push(obj).catch(e => { throw e }). In that case, though, the error just surfaces as an unhandled rejection and the Node.js process exits. I think I'd like those errors to bubble up to the await q.drained() call (similar to how the first rejected promise's error bubbles up to an await Promise.all([...])). But for my use case at hand, having Node.js exit is actually fine.
  • The backpressure code example you gave above works for me (thanks), but I think we can improve it by awaiting not the last task we pushed but the first running task to complete, which frees up room to run a new task.
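
The second observation can be approximated in user code. A minimal sketch, assuming a hypothetical `trackErrors` wrapper (not part of fastq) around a queue that exposes `push(task)` and `drained()` the way `fastq.promise` queues do; it remembers the first rejection and rethrows it once the queue has drained:

```javascript
// Hypothetical wrapper, not part of fastq.
// Assumes `q.push(task)` returns a Promise and `q.drained()` returns a
// Promise that resolves when all queued work has been processed.
function trackErrors (q) {
  let failed = false
  let firstError = null
  return {
    push (task) {
      // Record the first rejection instead of leaving it unhandled.
      q.push(task).catch((err) => {
        if (!failed) {
          failed = true
          firstError = err
        }
      })
    },
    async drained () {
      await q.drained()
      if (failed) throw firstError
    }
  }
}
```

Unlike `Promise.all`, this sketch does not reject as soon as the first task fails; the error only surfaces after the queue has drained.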

For that last point, would it be nice to have a function like this?

await q.unsaturated();

The returned promise would resolve:

  • immediately if there's capacity to run the task immediately (i.e. nr of running tasks < concurrency)
  • otherwise as soon as capacity to run the task becomes available

Perhaps it should take an argument whereby you can indicate the amount of items on the queue you're willing to accept (i.e. like a highWaterMark):

await q.unsaturated(25);

Or perhaps such a highWaterMark could be a constructor argument to the promise based queue.
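
To make the proposal concrete, here is a minimal sketch of those semantics on a stand-in queue. `TinyQueue`, its fields, and `produce` are hypothetical names for illustration only; this is not fastq's implementation, and `unsaturated()` is the method proposed above, not an existing API. The assumption is that the promise resolves when a worker slot is free or fewer than `highWaterMark` tasks are waiting:

```javascript
// Stand-in queue for illustration only; NOT fastq.
class TinyQueue {
  constructor (worker, concurrency) {
    this.worker = worker
    this.concurrency = concurrency
    this.running = 0
    this.waiting = []  // tasks pushed but not yet started
    this.watchers = [] // pending unsaturated() calls
  }

  push (task) {
    return new Promise((resolve, reject) => {
      this.waiting.push({ task, resolve, reject })
      this._next()
    })
  }

  // Proposed method: resolves once a worker slot is free, or once fewer
  // than `highWaterMark` tasks are waiting.
  unsaturated (highWaterMark = 0) {
    return new Promise((resolve) => {
      this.watchers.push({ highWaterMark, resolve })
      this._notify()
    })
  }

  _hasRoom (highWaterMark) {
    return this.running < this.concurrency ||
      this.waiting.length < highWaterMark
  }

  _notify () {
    // Resolve every watcher whose condition now holds; keep the rest.
    this.watchers = this.watchers.filter((w) => {
      if (!this._hasRoom(w.highWaterMark)) return true
      w.resolve()
      return false
    })
  }

  _next () {
    while (this.running < this.concurrency && this.waiting.length > 0) {
      const { task, resolve, reject } = this.waiting.shift()
      this.running++
      Promise.resolve()
        .then(() => this.worker(task))
        .then(resolve, reject)
        .finally(() => {
          this.running--
          this._next()
          this._notify()
        })
    }
  }
}

// Producer loop: wait for room before pushing each item.
async function produce (q, iterable, highWaterMark = 0) {
  for await (const item of iterable) {
    await q.unsaturated(highWaterMark)
    q.push(item).catch(() => {}) // error handling omitted in this sketch
  }
}
```

With `highWaterMark` left at 0, the producer reads a new item only when a worker slot is free, so nothing accumulates in the queue. A call such as `produce(q, source, 25)` would let up to 25 tasks wait. Note that `produce` returns once the last item is pushed, not once it has been processed.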

--

But long story short: my issue is resolved, I can await all work in the queue. We could close this issue.
Thank you for this library.

@mcollina
Owner

I think a PR to implement some "back pressure streaming" support for the queue would be amazing.

@mcollina mcollina reopened this Mar 17, 2023
@ottokruse
Author

I'll have a stab at that!
