From 2ec0489127faea88dca95e6dc169efe6e55d330d Mon Sep 17 00:00:00 2001 From: Sebastian Schmidt Date: Fri, 24 Jan 2020 13:39:32 -0800 Subject: [PATCH] fix: retry streaming methods if initial write errored (#897) According to https://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback, the write() callback may be called with an Error before on('error') is invoked. In this case, we should reject our Stream. In the current implementation, we would accept the Stream as healthy and then forward the error to the user, which prevents retries. I noticed while looking at this code as we are porting it to Python --- dev/src/index.ts | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/dev/src/index.ts b/dev/src/index.ts index b2c052087..8b3749632 100644 --- a/dev/src/index.ts +++ b/dev/src/index.ts @@ -1148,8 +1148,6 @@ export class Firestore { } } - backendStream.on('data', () => streamReady()); - function streamEnded() { logger( 'Firestore._initializeStream', @@ -1161,11 +1159,7 @@ export class Firestore { lifetime.resolve(); } - backendStream.on('end', () => streamEnded()); - backendStream.on('close', () => streamEnded()); - backendStream.on('finish', () => streamEnded()); - - backendStream.on('error', err => { + function streamFailed(err: Error) { if (!streamInitialized) { // If we receive an error before we were able to receive any data, // reject this stream. @@ -1191,7 +1185,13 @@ export class Firestore { resultStream.emit('error', err); }); } - }); + } + + backendStream.on('data', () => streamReady()); + backendStream.on('error', err => streamFailed(err)); + backendStream.on('end', () => streamEnded()); + backendStream.on('close', () => streamEnded()); + backendStream.on('finish', () => streamEnded()); backendStream.pipe(resultStream); @@ -1202,13 +1202,17 @@ export class Firestore { 'Sending request: %j', request ); - backendStream.write(request, 'utf-8', () => { - logger( - 'Firestore._initializeStream', - requestTag, - 'Marking stream as healthy' - ); - streamReady(); + backendStream.write(request, 'utf-8', err => { + if (err) { + streamFailed(err); + } else { + logger( + 'Firestore._initializeStream', + requestTag, + 'Marking stream as healthy' + ); + streamReady(); + } }); } });