
Error: You cannot pipe after data has been emitted from the response. #887

Closed · joe-spanning opened this issue Apr 30, 2014 · 30 comments

@joe-spanning

I am encountering an issue where I get the "You cannot pipe after data has been emitted from the response." error when trying to pipe to a WriteStream. This only occurs when there has been a delay between when the request() call was made and when the readStream.pipe(writeStream) call is made.

Node version: v0.10.24
Request version: 2.34.0

I created a github repo with the source that reproduces the problem:
https://github.com/joe-spanning/request-error

Code is also below for convenience:

var fs = require('fs'),
  request = require('request');

var readStream = request({
  url: 'https://gist.githubusercontent.com/joe-spanning/6902070/raw/8967b351aec744a2bb51c16cb847c44636cf53d9/pipepromise.js'
});

// wait for 5 seconds, then pipe
setTimeout(function() {

  var writeStream = readStream.pipe(fs.createWriteStream('test.js'));

  writeStream.on('finish', function() {
    console.log('all done!');
  });

  writeStream.on('error', function(err) {
    console.error(err);
  });

}, 5000);

You might need to play with the timeout value to reproduce the error. Longer timeouts increase the probability that the error will occur.

/request-error/node_modules/request/request.js:1299
      throw new Error("You cannot pipe after data has been emitted from the re
            ^
Error: You cannot pipe after data has been emitted from the response.
    at Request.pipe (/request-error/node_modules/request/request.js:1299:13)
    at null._onTimeout (/request-error/pipe-error.js:16:32)
    at Timer.listOnTimeout [as ontimeout] (timers.js:110:15)
@funmachine

Also seeing this under similar circumstances.

@aldeed

aldeed commented Jul 13, 2014

I don't know what other implications it might have, but I am able to solve this issue by altering request.js as follows:

Add:

var superOn = Request.prototype.on;
Request.prototype.on = function (eventName) {
  // a "data" listener is finally being attached, so it is now safe to
  // resume the response stream that the patch below may have paused
  if (eventName === "data") {
    this.resume()
  }
  superOn.apply(this, arguments)
}

And change the dataStream.on("data", ...) call to this:

dataStream.on("data", function (chunk) {
      var emitted = self.emit("data", chunk)
      if (emitted) {
        self._destdata = true
      } else {
        // pause URL stream until we pipe it
        dataStream.pause()
        dataStream.unshift(chunk)
      }
})

The basic issue (I think) is that onResponse is calling resume() and attaching a "data" listener, both of which start the data flowing, but we might not be ready to pipe it yet. So my solution is: if we try to emit data but don't have any listeners yet, we pause the underlying readstream until a data listener is attached.

Would be good if someone else could test this solution and verify that it works.

@joe-spanning
Author

Nah, doesn't work for me. Due to the lack of activity on this issue, I'm just going to stop using request and go back to the Node.js http module.

@aldeed
Copy link

aldeed commented Jul 14, 2014

I should mention that my fix is for request 2.37.0. It would not work with an older release.

joepie91 added a commit to joepie91/request that referenced this issue Jul 29, 2014
@joepie91

@aldeed I have forked the repository and applied your fix: https://github.com/joepie91/request. Depending on the degree to which this (original) repository is maintained in the future, I might merge more fixes into it. I'm not sure yet.

I'd been debugging this issue for a few hours, and was unable to find a solution until I read this thread. Some more technical background information on why this problem is occurring follows. It's a bit simplified, to make it easier to follow along.

request offers a 'streaming API' in the sense that you can .pipe the original request object to a WritableStream. However, since this request object is not the response stream itself, request 'shims' the data event - it creates a listener for the data event on the response stream, and then re-emits this event on itself, thus being able to have the request object act as a stream itself.

There's only one problem: as soon as you attach to the data event of a stream, it starts flowing.

Now this is normally not an issue; Node.js works with 'ticks' - consider them 'cycles' in the interpreter. The simplified explanation is that one block of code will consume a 'tick', and no events will be checked or fired until the next tick. This means that the following hypothetical code will work fine:

stream = getSomeRequestStream();
stream.pipe(target);

Both lines are executed in the same tick, and the first data event can't possibly occur until the next tick - that's how the interpreter works internally. This means that you have a guarantee that the .pipe is processed before the first data event fires, and everything will work as expected.

Now, what @joe-spanning attempted to do was use an asynchronous construct, a timeout in this particular example. The stream initialization and .pipe no longer exist in the same block of code.

Asynchronous constructs follow the same rule as events - they can't possibly execute until the next tick. However, by the time your asynchronous construct executes, the event cycle has finished and the first data event has already fired - your first chunk of data comes through in the gap between the stream initialization and your .pipe. request detects that a data event has already fired, and tells you that you can no longer .pipe the stream since it has already started flowing.

This same problem occurs for anything that happens at least one tick later than the stream initialization.

The solution by @aldeed fixes this by checking if there are any listeners attached to the request object, and if not, putting the chunk of data back in the original stream and pausing that original stream - thus stopping it from flowing. Once a listener is attached, it then resumes the original stream, and this time there will be a data event handler listening.

The way the data event shim works in request is actually a design flaw. It does not take into account the possibility that stream initialization and piping may not occur in the same tick, and that Node.js is an inherently asynchronous platform (thus, these kinds of situations are to be expected). While this fix does work, and should work reliably, a better solution would be to defer the binding to the data event on the original stream until the first listener is attached.
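To make the deferred-binding idea concrete, here is an illustrative sketch (not request's actual code; LazyProxy and everything inside it are made up for the example). The wrapper only hooks the source's data event once a consumer attaches its own data listener, so the source never starts flowing prematurely:

var EventEmitter = require('events').EventEmitter;
var util = require('util');
var fs = require('fs');

function LazyProxy(source) {
  EventEmitter.call(this);
  var self = this;
  var bound = false;
  // 'newListener' fires just before a listener is added, so this is
  // the earliest moment we know someone is ready to consume data
  this.on('newListener', function (eventName) {
    if (eventName === 'data' && !bound) {
      bound = true;
      source.on('data', function (chunk) {
        self.emit('data', chunk);
      });
    }
  });
}
util.inherits(LazyProxy, EventEmitter);

// Attaching a listener several ticks later is now safe: the source
// stream stays paused until this point, so no chunks are lost.
var proxy = new LazyProxy(fs.createReadStream(__filename));
setTimeout(function () {
  proxy.on('data', function (chunk) {
    process.stdout.write(chunk);
  });
}, 1000);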

One last note: if you define a callback in your request initialization, this fix will not work.

Internally, if a callback is supplied, request will attach a data event handler to itself, read out the body, and supply the body to your callback. The way around this is to attach to the response event instead (and not specify a callback). This event will (reliably) fire as soon as a response is received and the request is ready for streaming.
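A minimal sketch of that 'response' event approach (the URL and filename here are placeholders):

var fs = require('fs');
var request = require('request');

// Attach to 'response' instead of passing a callback; per the above,
// the request is ready for streaming once this event fires.
request('https://example.com/file.txt')
  .on('response', function (response) {
    // Piping synchronously inside the handler keeps everything in one tick.
    response.pipe(fs.createWriteStream('file.txt'));
  });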

@aldeed I've been trying to fix this for hours, with all my attempts to actually fix it failing, and I can now finally continue working on my thing. I owe you a beer :)

@mikeal
Member

mikeal commented Jul 29, 2014

A quick note about this issue as it relates to new-style stream and the 3.0 branch.

New-style readable streams open in what could be considered a "paused & buffering" state: they won't let out data until they are read() from.

This means we'll have to ditch most of the nextTick() hacks. I've dealt with this a bit in other libraries and what we'll have to do is wait for the first read() and then trigger all the "lazy" code.

One obvious consequence is that in 3.0, if you pass a callback, it will buffer all the content. Currently, accessing the streaming API disables buffering, but without the nextTick hack we can't infer that.
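For illustration, here is roughly what that "paused & buffering until first read()" behaviour looks like with a plain new-style Readable (a sketch using the modern simplified constructor, not code from the 3.0 branch):

var Readable = require('stream').Readable;

var initialized = false;
var source = new Readable({
  read: function () {
    // the first read() is the trigger for all the deferred ("lazy") setup
    if (!initialized) {
      initialized = true;
      // ... lazy initialization would go here ...
    }
    this.push('hello world\n');
    this.push(null); // end of stream
  }
});

// Because the stream is paused and buffering until read() is called,
// piping any number of ticks later loses no data.
setTimeout(function () {
  source.pipe(process.stdout);
}, 1000);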

@joepie91

@mikeal Would you accept this fix as a pull request for the current master branch?

@mikeal
Member

mikeal commented Jul 29, 2014

I'd need to see it as a PR in order to evaluate it.

@joepie91

Alright. It's probably best to leave the actual pull request creation up to @aldeed, given that he contributed the original fix - having it merged into the main repository should probably be his choice, licensing-wise and all that.

@aldeed

aldeed commented Jul 29, 2014

Thanks, @joepie91. @mikeal, as I recall from reading the code in 2.37.0, it's already using new-style streams, so they are "paused and buffered" initially, as you say. The problem comes when you attach your own "data" listener and call resume, which you do in different places, because these tell the readstream to unpause itself. So, as @joepie91 says, a true solution would require re-engineering the internals such that you don't attach a "data" listener.

From the docs:

In Node v0.10, the Readable class described below was added. For backwards compatibility with older Node programs, Readable streams switch into "flowing mode" when a 'data' event handler is added, or when the pause() or resume() methods are called. The effect is that, even if you are not using the new read() method and 'readable' event, you no longer have to worry about losing 'data' chunks.

@mikeal
Member

mikeal commented Jul 29, 2014

2.37 is definitely not using new-style streams :) You must pipe the request object to a destination before it starts emitting data events. Similarly, you must pipe to a request instance in the same tick that you create it or else similar tricks will fail (although we have some code to try and mitigate this in some cases, but not all).

@aldeed

aldeed commented Jul 29, 2014

OK, I probably misread, or I'm misremembering, or I looked at the 3.0 code too and got confused. :)

@ZJONSSON
Contributor

The quickest fix might be to pipe immediately to a stream.PassThrough() object.
You should be able to pipe from the PassThrough object at any later time without issues.
See: http://nodejs.org/api/stream.html#stream_class_stream_passthrough

@joepie91

@ZJONSSON I did try that at some point, but it didn't work either. As far as I understand, it's prone to the same limitation (no re-emission of data emitted prior to a later stream attachment) as any other kind of stream, so you'd still be stuck if the original attachment by request and your own attachment to the request object don't happen in the same tick.

@ZJONSSON
Contributor

@joepie91 Taking the original example at the top and piping immediately to a PassThrough runs without problems. Maybe I'm misunderstanding the issue?

var fs = require('fs'),
  request = require('request');

var readStream = request({
  url: 'https://gist.githubusercontent.com/joe-spanning/6902070/raw/8967b351aec744a2bb51c16cb847c44636cf53d9/pipepromise.js'
})
.pipe(require('stream').PassThrough());  // this line is the only modification

// wait for 5 seconds, then pipe
setTimeout(function() {

  var writeStream = readStream.pipe(fs.createWriteStream('test.js'));

  writeStream.on('finish', function() {
    console.log('all done!');
  });

  writeStream.on('error', function(err) {
    console.error(err);
  });

}, 5000);

@joepie91

I honestly have no idea. If I recall correctly, that is exactly what I tried, and it didn't help. Has the underlying code in request perhaps changed in the meantime?

I should add that the example you modified is actually @joe-spanning's. I'm not sure whether there's a difference between that and my code that makes it work only in his case.

@aldeed

aldeed commented Sep 28, 2014

Submitted PR #1098

@kapouer

kapouer commented Oct 1, 2014

Internally, if a callback is supplied, request will attach a data event handler to itself, read out the body, and supply the body to your callback. The way around this is to attach to the response event instead (and not specify a callback). This event will (reliably) fire as soon as a response is received and the request is ready for streaming.

I just got bitten by request(url, mycb).pipe(mywstream): sometimes mycb is called, sometimes not.
Maybe request should warn that using pipe() with a callback already set should be avoided?

@dogancelik
Contributor

I have the same problem, except I get this error when I try to .pipe in .on('response'):

req = request(options)
req.on 'response', (response) ->
  # get info from headers here
  stream = createWriteStream file
  req.pipe stream

@joepie91

Update from my side, not sure why I didn't post this before.

Because this issue wasn't resolved in a timely manner, and because I ran across a number of other frustrating bugs in request, I wrote my own HTTP client library a while ago that deals correctly with this use case: bhttp. It also solves the Streams2 support issues.

@alolis

alolis commented Aug 9, 2017

Is there an update on this?

@julien-c

I know this issue is old, but in my case (similar to @dogancelik, piping a response conditionally based on the response headers), I was able to do the following:

const req = request.get(URL);
req.on('response', (response) => {
  response.pause();
  // Do stuff with response.statusCode etc.
  setTimeout(() => {
    response.pipe(process.stdout);
  }, 2000);
});

i.e., pausing the response stream (request apparently sets the stream to flowing mode automatically?) and piping the response object itself (not the req).

I'd welcome feedback from users more knowledgeable than me about this.

@sadikyalcin

Just came across this too... I solved it on my end, as it was a logical error in my case.

I have a global array where I store each request via a unique id. I'm running multiple requests simultaneously, and I have an abort option for the requests. This may give you more info:

var unzipper = function () {
    var self = this;

    /**
     * Properties
     */
    this.req = [];
    this.aborted = [];
    /* */

    /**
     * Methods
     */
    this.download = function(id, onProgress, onSuccess, onError, onAbort){

        var newinstanceAbort = {
          id: id,
          abort: false
        };

        this.aborted.push(newinstanceAbort);

        var url = '...';
        var target = '...';

        var request = require('request');
        var j = request.jar();
        var progress = require('request-progress');
        var cookie = request.cookie(app.session.sessionName + '=' + app.session.sessionId);
        j.setCookie(cookie, url);

        var downloadRequest = {
          id: id,
          request: request({url: url, jar: j})
        };

        this.req.push(downloadRequest);

        var instanceRequest = app.findObject(this.req, id);
        var instanceAbort = app.findObject(this.aborted, id);

        progress(instanceRequest.request, {
            throttle: 10,
        })
        .on('progress', function (state) {
            var progress = Math.round(state.percent * 100);
            onProgress(progress);
        })
        .on('error', function (err) {
            onError(err);
        })
        .on('end', function (e) {
            if(instanceAbort.abort) {
                onAbort();
            } else {
                onSuccess();
            }

            /**
             * Remove the unique request object from the req to complete the request
             *
             * If this is not done, the request would throw "You cannot pipe after data has been emitted from the response" as it is constantly re-emitting the same request
             */
            self.complete(id);

        })
        .pipe(app.fs.createWriteStream(target));
    };



    this.abort = function(id){
        var instanceAbort = app.findObject(this.aborted, id);
        var instanceRequest = app.findObject(this.req, id);

        instanceRequest.request.abort();
        instanceAbort.abort = true;
    };

    this.complete = function(id){
        var index = this.req.map(function(x){ return x.id; }).indexOf(id);

        this.req.splice(index, 1);
    };
    /* */

};

@mikermcneil
Contributor

@julien-c Good call, this indeed seems to be because binding a response listener causes it to switch into "flowing mode" automatically. I can verify that it's happening in both request@2.81.0 and request@2.83.0.

Not sure if this is intended or not (my understanding was that this isn't how it was supposed to work). For now, I'll plan on using the workaround of pausing the stream (I'll try & remember to post back here if I find out anything new along the way).

@ulihecht

I'm randomly experiencing the same as @dogancelik in the comment above: the error occurs when piping inside the response handler. But how can this be possible? If you look at request.js, the "response" event is emitted BEFORE request's own "data" handler (which causes the trouble) is attached at all:

self.emit('response', response)

self.dests.forEach(function (dest) {
  self.pipeDest(dest)
})

responseContent.on('data', function (chunk) {
  if (self.timing && !self.responseStarted) {
    self.responseStartTime = (new Date()).getTime()

    // NOTE: responseStartTime is deprecated in favor of .timings
    response.responseStartTime = self.responseStartTime
  }
  self._destdata = true
  self.emit('data', chunk)
})

As the event handlers are called synchronously I expect the pipe to be attached before data is consumed.

@jiajianrong

I get the same error with the following code:

let rp = require('request-promise');
let options = {uri: 'http://a-url-pointing-online-pdf-link'};

koaRouter.get('/api/url/download', async(ctx, next) => {
        try {
            ctx.set('Content-disposition', 'attachment;filename=a.pdf');
            ctx.body = rp(options);  
        } catch (e) {
            ctx.body = e;
        }
    })   

I worked around it by using a PassThrough, but I don't know why the error occurs, nor why the PassThrough solves it. All my code is supposed to be executed in the same 'tick'.

let rp = require('request-promise');
let stream = require('stream');
let options = {uri: 'http://a-url-pointing-online-pdf-link'};

koaRouter.get('/api/url/download', async(ctx, next) => {
        try {
            ctx.set('Content-disposition', 'attachment;filename=a.pdf');
            let pass = stream.PassThrough();
            rp(options).pipe(pass);
            ctx.body = pass;
        } catch (e) {
            ctx.body = e;
        }
    })   

@Dzenly

Dzenly commented Nov 14, 2018

A similar bug exists for the request object as well: if you have written to it, you cannot pipe to it.

I also switched to the core http/https modules for streams. They are easy and reliable, so request is not needed for streams.
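For reference, the core-module equivalent is only a few lines (a minimal sketch; the URL and filename are placeholders):

var https = require('https');
var fs = require('fs');

// The response here is a plain http.IncomingMessage (a Readable),
// so it can be paused, resumed, or piped whenever you are ready.
https.get('https://example.com/file.txt', function (response) {
  response.pipe(fs.createWriteStream('file.txt'));
});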

@aswanee

aswanee commented Mar 1, 2019

Don't pipe until the response has happened.
This works fine for me:

var request = require('request');
var fs = require('fs');

function downloadFile(file_url, targetPath) {
  // Save variables to track progress
  var received_bytes = 0;
  var total_bytes = 0;

  var req = request({
    method: 'GET',
    uri: file_url
  });

  var out = fs.createWriteStream(targetPath);

  req.on('response', function (data) {
    // Pipe here, inside the 'response' handler, not before
    req.pipe(out);
    // Read the total size so progress can be computed later
    total_bytes = parseInt(data.headers['content-length']);
  });

  req.on('data', function (chunk) {
    // Update the received bytes and report progress
    received_bytes += chunk.length;
    var percentage = (received_bytes * 100) / total_bytes;
    console.log(percentage + "% | " + received_bytes + " bytes out of " + total_bytes + " bytes.");
  });

  req.on('end', function () {
    console.log("File successfully downloaded");
  });
}

@reconbot
Contributor

reconbot commented Apr 1, 2019

Request is now in maintenance mode and won't be merging any new features. Please see #3142 for more information.

@danmana

danmana commented Dec 8, 2021

Can confirm that the solution from julien-c works for request@2.88.2 as well. #887 (comment)

We had a similar setup with a Promise instead of setTimeout, where the .pipe() was called in a later tick, after .on('response').
Without response.pause() we were seeing corrupted files (missing bytes at the beginning).

function getReadStream() {
  return new Promise((resolve, reject) => {
    const req = request.get('...');
    req.on('response', (response) => {
      response.pause(); // -> this solves the issue
      if (response.statusCode === 200) {
        resolve(response);
      } else {
        reject(new Error('Error ' + response.statusCode));
      }
    });
    req.on('error', reject);
  });
}

// somewhere else
const readStream = await getReadStream();
readStream.pipe(writeStream);
