Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(error): Fix error passthrough in queued tasks #145

Merged
merged 3 commits into from
Dec 9, 2023

Conversation

az0uz
Copy link
Contributor

@az0uz az0uz commented Jul 19, 2023

fixes #140

Errors being thrown by chunkGetItem (and underlying storage requests) are uncaught.

The use of the PQueue class will both emit an error event and throw if awaiting the Promise returned by the queue.add method.

To prevent this issue, I created an array of queue.add promises, and await for all those promises instead of using queue.onIdle().

Using the error event will still be considered as uncaught errors since nothing is awaiting the queue.add promises.

@az0uz
Copy link
Contributor Author

az0uz commented Oct 22, 2023

Right now, when there is a network issue in a chunk request, the get will still complete and return data, with that errored chunk data containing zeros, this change will fix that, and correctly throw on that method if any of the chunk requests errored. This will also throw the underlying AbortError when passing an AbortSignal to the Store through StoreOptions.

@toloudis
Copy link
Contributor

I can confirm this is fixing a bug we have in our attempt to write a proxy Store to cache chunk requests.

Comment on lines 351 to 356
allTasks.push((async () => {
await queue.add(() => this.chunkGetItem(proj.chunkCoords, proj.chunkSelection, out, proj.outSelection, indexer.dropAxes, storeOptions));
progress += 1;
progressCallback({ progress: progress, queueSize: queueSize });
})();
})());
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, could we just use the queue but remove the async IIFE?

queue.add(async () => {
  await this.chunkGetItem(proj.chunkCoords, proj.chunkSelection, out, proj.outSelection, indexer.dropAxes, storeOptions);
  progress += 1;
  progressCallback({ progress: progress, queueSize: queueSize });
});

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And then keep queue.onIdle?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to do this and follow the pattern in the rest of the changes, but it brought back the same problem in my use case (trying to cancel chunk requests). So maybe it's something about how PQueue.onIdle works...?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if the last part of this discussion sindresorhus/p-queue#26 can inform the use of onIdle here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've looked into the implementation of p-queue while implementing this PR and it seems like the only way to correctly throw on the task error is to await the return of the queue.add method, see here
awaiting the queue.onIdle won't throw on error. So it seemed to me you have to create an Array of queue.add return promises and use await Promise.all on this Array.

Copy link
Collaborator

@manzt manzt Nov 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok! Thank you for looking into it and for your patience in my response. Would you mind still wrapping the full task in the queue.add and add and "await" to the progressCallback?

  const task = queue.add(async () => {
    await this.chunkGetItem(proj.chunkCoords, proj.chunkSelection, out, proj.outSelection, indexer.dropAxes, storeOptions);
    progress += 1;
    await progressCallback({ progress: progress, queueSize: queueSize });
  });

  allTasks.push(task);


// ... 

await promise.all(allTasks);

This way if progressCallback throws we will also catch the error

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion, I've update the code to remove the iife and move the progress callback inside the queued task.

I didn't await for the progressCallback though since it is typed as returning void, I suggest this is done in a different PR?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great. Thank you!

Copy link
Collaborator

@manzt manzt left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@manzt manzt merged commit 217e178 into gzuidhof:master Dec 9, 2023
3 checks passed
@az0uz az0uz deleted the az0uz/fix_async_error_passthrough branch December 30, 2023 13:31
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Errors from chunkGetItem are not handled
3 participants