
[category- help] .on('end', ...) not waiting until processing inside .on('data', ...) is completed #206

Open
The-CodeNinja opened this issue Nov 5, 2021 · 4 comments

Comments

@The-CodeNinja

Here is the flow of the code I am running:

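const fs = require('fs');
const csv = require('csv-parser');

const results = [];
fs.createReadStream('MainVerbs.csv')
    .pipe(csv())
    .on('data', async function (row) {
        // promiseReturningFunction is my own slow async helper (not shown)
        await promiseReturningFunction(row['v1']);
    })
    .on('end', () => {
        console.log('done');
    });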

The problem is that the 'end' event fires before the time-consuming processing of all rows has finished.
I tried using async/await in the .on('data', ...) handler, but it made no difference.

How do I handle this?
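A minimal sketch of why this happens, using only Node's built-in `stream` module. Node's EventEmitter calls listeners synchronously and ignores any promise that an `async` listener returns, so the stream keeps emitting rows and fires 'end' once the last row has been emitted, not once it has been processed. Here `Readable.from` with three hard-coded rows is an assumed stand-in for `fs.createReadStream(...).pipe(csv())`, and `slowTask` is a hypothetical stand-in for `promiseReturningFunction`.

```javascript
const { Readable } = require('stream');

// Hypothetical stand-in for promiseReturningFunction: resolves after 10 ms.
function slowTask(value) {
  return new Promise(resolve => setTimeout(() => resolve(value), 10));
}

// Records the order of 'end' and per-row completion for three fake rows.
function reproduce() {
  return new Promise(resolve => {
    const events = [];
    let pending = 3;
    Readable.from([{ v1: 'a' }, { v1: 'b' }, { v1: 'c' }])
      .on('data', async row => {
        // The emitter discards the promise this async listener returns,
        // so the stream does not wait for this await before moving on.
        await slowTask(row.v1);
        events.push(`processed ${row.v1}`);
        if (--pending === 0) resolve(events);
      })
      .on('end', () => events.push('end'));
  });
}

reproduce().then(events => console.log(events));
```

In the logged array, 'end' comes first, before any row has finished processing.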

@jonah97

jonah97 commented Nov 10, 2021

I have a similar problem:

const results = []

fs.createReadStream('data.csv', 'binary')
    .pipe(csv({ separator: ';' }))
    .on('data', data => results.push(data))
    .on('end', () => {
        console.log(results.length)
    })

It reads the first 26442 rows and then emits the 'end' event, even though my file has 36000 rows. I don't know how to fix that.

@sumanth-mandalapu

sumanth-mandalapu commented Nov 28, 2021

Is the result returned from the promise function used inside the on('data') function?

@m4rkus

m4rkus commented Dec 26, 2021

I have the same problem as the OP: it seems that .on('end') is called before .on('data') has finished executing.

@gdethier

gdethier commented Apr 5, 2022

Using the 'close' event instead of 'end' solved the problem for me. Note that when using an async function for data processing, the stream has to be explicitly paused and resumed so that rows are processed one at a time. Here is an example (in TypeScript, which is close to JS anyway):

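import * as fs from "fs";
import csv from "csv-parser"; // default import assumes esModuleInterop

// processData is your own async per-row handler.
function processCsv(path: string, processData: (row: any) => Promise<void>): Promise<void> {
    return new Promise<void>((resolve, reject) => {
        const stream = fs.createReadStream(path)
            .pipe(csv())
            .on("data", async data => {
                // Stop further "data" events while this row is processed.
                stream.pause();
                try {
                    await processData(data);
                } catch (error) {
                    // Route processing failures to the "error" listener below.
                    stream.destroy(error as Error);
                } finally {
                    stream.resume();
                }
            })
            .on("error", error => reject(error))
            .on("close", () => resolve());
    });
}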

NB: More advanced error handling is probably needed around processData.
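An alternative sketch that avoids manual pause()/resume(): in Node.js versions where readable streams are async iterable, a `for await...of` loop runs its body for one row at a time and takes the next row only after the body (including its `await`) has finished. The function's promise therefore settles only after the last row is processed, and an error from the task or from the iterated stream rejects it. `Readable.from` and the hypothetical `slowTask` are stand-ins; in real code the source would be `fs.createReadStream(path).pipe(csv())`.

```javascript
const { Readable } = require('stream');

// Hypothetical stand-in for the real per-row work (e.g. processData).
function slowTask(value) {
  return new Promise(resolve => setTimeout(() => resolve(value), 10));
}

// Processes every row of a readable stream sequentially.
// Resolves with the per-row results after the last row is done;
// rejects if the task throws or the iterated stream emits 'error'.
async function processRows(rows, task) {
  const results = [];
  for await (const row of rows) {
    // The loop does not advance to the next row until this await completes.
    results.push(await task(row));
  }
  return results;
}

const source = Readable.from([{ v1: 'a' }, { v1: 'b' }, { v1: 'c' }]);
processRows(source, row => slowTask(row.v1))
  .then(results => console.log('done', results));
```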


5 participants