Last transform of pipeline hangs #11

Open
br4in1 opened this issue Jul 27, 2021 · 0 comments
br4in1 commented Jul 27, 2021

Hello there,

I'm working on a project that uses a pipeline (one readable stream as the source, followed by multiple Transforms).
Under normal circumstances everything works fine. Things get strange when a large amount of data enters the pipeline in rapid succession (the problem appears with at least 4000 chunks, though I suspect the exact number is irrelevant):
the pipeline hangs after processing a small portion of the data and never resumes.

By adding logging and investigating, I concluded that it is always the last Transform that hangs. To be sure, I removed the last Transform (so that Transform n-1 became the last one in the pipeline), and then it was the new last Transform that hung.

I thought it was a memory problem, so I tried some workarounds:

  • I increased the highWaterMark threshold. This delayed the issue but did not solve it (the pipeline processes more data but still hangs at some point).
  • I increased and decreased the parallelism option for each Transform. This didn't solve it either.
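The highWaterMark observation is at least consistent with how plain Node streams behave: a Transform whose readable side nobody reads stops invoking its transform function once its readable buffer reaches highWaterMark. The sketch below uses only Node's core `stream.Transform`, not this library, and `countProcessedWithoutConsumer` is a hypothetical helper made up for illustration. It does not explain why the ordered option would matter.

```typescript
import { Transform, TransformCallback } from "stream";

// Hypothetical helper (not part of the library discussed in this issue).
// Writes `total` objects into a pass-through Transform that has NO consumer
// and returns how many times the transform function was actually invoked.
function countProcessedWithoutConsumer(highWaterMark: number, total: number): number {
  let processed = 0;
  const passThrough = new Transform({
    objectMode: true,
    highWaterMark,
    transform(chunk: unknown, _encoding: BufferEncoding, callback: TransformCallback): void {
      processed += 1;
      // Synchronous on purpose, so the count is settled once write() returns.
      callback(null, chunk);
    },
  });
  for (let i = 0; i < total; i += 1) {
    // The return value (the backpressure signal) is deliberately ignored;
    // unprocessed chunks simply queue up on the writable side.
    passThrough.write({ i });
  }
  return processed;
}

const small = countProcessedWithoutConsumer(4, 1000);
const large = countProcessedWithoutConsumer(64, 1000);
console.log({ small, large });
```

In this sketch a larger highWaterMark lets more chunks through before the stream stalls, but neither run processes all 1000 chunks, which matches the "delayed but not solved" behavior described above.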

At that point, I decided to strip my pipeline down to a skeleton. It turned out that the ordered option causes the problem when it is set to false.

Valid workaround

I suspected that one of the Transforms was being corked at some point, so I added a data event listener to each one in order to uncork it manually if that was the case. It worked. Then I tried, for no particular reason, leaving the listener's callback empty (no uncork call), and it still worked.
So, long story short: adding an empty data listener on the Transform fixes the hang (code below).

return transform(myParallelism, { ordered: false }, async (data: Input, callback: TransformCallback) => {
    try {
        const result = await someAsyncOperation(data)
        callback(null, result)
    } catch (error) {
        callback(error)
    }
}).on('data', () => {
    // Intentionally empty: registering the listener is enough to avoid the hang.
})
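A possible reason the empty listener helps, offered as a guess rather than a diagnosis: in Node core streams, attaching a data listener switches a readable stream into flowing mode, so its buffer is drained even if the handler does nothing. The sketch below shows that effect with a plain `stream.Transform` only. `runWithEmptyDataListener` is a hypothetical name, the `setImmediate` call merely stands in for an async operation, and none of this exercises the library or its ordered option.

```typescript
import { Transform, TransformCallback } from "stream";

// Hypothetical helper: pushes `total` objects through a Transform that has
// only an empty 'data' listener and resolves with the number of chunks
// the transform function processed.
function runWithEmptyDataListener(total: number): Promise<number> {
  return new Promise<number>((resolve, reject) => {
    let processed = 0;
    const last = new Transform({
      objectMode: true,
      highWaterMark: 4,
      transform(chunk: unknown, _encoding: BufferEncoding, callback: TransformCallback): void {
        processed += 1;
        // Stand-in for an asynchronous operation.
        setImmediate(() => callback(null, chunk));
      },
    });

    // The empty listener puts the readable side into flowing mode.
    last.on("data", () => {});
    last.on("end", () => resolve(processed));
    last.on("error", reject);

    for (let i = 0; i < total; i += 1) {
      last.write({ i }); // backpressure signal ignored for brevity
    }
    last.end();
  });
}

runWithEmptyDataListener(1000).then((processed) => {
  console.log(`processed ${processed} chunks`);
});
```

If the output of the last stage is not needed, calling `resume()` on it is the documented Node way to drain a stream without handling its data, and should have the same effect as the empty listener in this sketch.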

I'm not really sure, though, whether the real problem comes from this library or from the implementation of Node streams.
I also don't have a minimal reproduction yet, but I can work on one if needed.

Thank you so much 😁
