diff --git a/src/bucket.ts b/src/bucket.ts index a7c9eec38..7e0ec08f0 100644 --- a/src/bucket.ts +++ b/src/bucket.ts @@ -347,6 +347,8 @@ export interface UploadOptions encryptionKey?: string | Buffer; kmsKeyName?: string; resumable?: boolean; + // tslint:disable-next-line:no-any + onUploadProgress?: (progressEvent: any) => void; } export interface MakeAllFilesPublicPrivateOptions { @@ -3464,9 +3466,13 @@ class Bucket extends ServiceObject { } function upload() { + const writable = newFile.createWriteStream(options); + if (options.onUploadProgress) { + writable.on('progress', options.onUploadProgress); + } fs.createReadStream(pathString) .on('error', callback!) - .pipe(newFile.createWriteStream(options)) + .pipe(writable) .on('error', callback!) .on('finish', () => { callback!(null, newFile, newFile.metadata); diff --git a/src/file.ts b/src/file.ts index 0a70292de..c89b5d532 100644 --- a/src/file.ts +++ b/src/file.ts @@ -37,7 +37,7 @@ import * as os from 'os'; // eslint-disable-next-line @typescript-eslint/no-var-requires const pumpify = require('pumpify'); import * as resumableUpload from 'gcs-resumable-upload'; -import {Duplex, Writable, Readable} from 'stream'; +import {Duplex, Writable, Readable, Transform} from 'stream'; import * as streamEvents from 'stream-events'; import * as through from 'through2'; import * as xdgBasedir from 'xdg-basedir'; @@ -349,7 +349,10 @@ export interface CreateReadStreamOptions { decompress?: boolean; } -export type SaveOptions = CreateWriteStreamOptions; +export interface SaveOptions extends CreateWriteStreamOptions { + // tslint:disable-next-line:no-any + onUploadProgress?: (progressEvent: any) => void; +} export interface SaveCallback { (err?: Error | null): void; @@ -1736,6 +1739,10 @@ class File extends ServiceObject { const fileWriteStream = duplexify(); + fileWriteStream.on('progress', evt => { + stream.emit('progress', evt); + }); + const stream = streamEvents( pumpify([ gzip ? zlib.createGzip() : through(), @@ -3383,10 +3390,14 @@ class File extends ServiceObject { const options = typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; - this.createWriteStream(options) + const writable = this.createWriteStream(options) .on('error', callback!) - .on('finish', callback!) - .end(data); + .on('finish', callback!); + if (options.onUploadProgress) { + writable.on('progress', options.onUploadProgress); + } + + writable.end(data); } setStorageClass( storageClass: string, @@ -3545,7 +3556,8 @@ class File extends ServiceObject { }) .on('finish', () => { dup.emit('complete'); - }); + }) + .on('progress', evt => dup.emit('progress', evt)); dup.setWritable(uploadStream); } diff --git a/system-test/storage.ts b/system-test/storage.ts index b24cfe463..87c64e9ce 100644 --- a/system-test/storage.ts +++ b/system-test/storage.ts @@ -86,6 +86,7 @@ import { DeleteNotificationCallback, } from '../src'; import * as nock from 'nock'; +import * as readline from 'readline'; interface ErrorCallbackFunction { (err: Error | null): void; @@ -2811,6 +2812,41 @@ describe('storage', () => { }); }); + describe('bucket upload with progress', () => { + it('show bytes sent with resumable upload', async () => { + const fileSize = fs.statSync(FILES.big.path).size; + let called = false; + function onUploadProgress(evt: {bytesWritten: number}) { + called = true; + assert.strictEqual(typeof evt.bytesWritten, 'number'); + assert.ok(evt.bytesWritten >= 0 && evt.bytesWritten <= fileSize); + } + + await bucket.upload(FILES.big.path, { + resumable: true, + onUploadProgress, + }); + + assert.strictEqual(called, true); + }); + + it('show bytes sent with simple upload', async () => { + const fileSize = fs.statSync(FILES.big.path).size; + let called = false; + function onUploadProgress(evt: {bytesWritten: number}) { + called = true; + assert.strictEqual(typeof evt.bytesWritten, 'number'); + assert.ok(evt.bytesWritten >= 0 && evt.bytesWritten <= fileSize); + } + await bucket.upload(FILES.big.path, { + resumable: false, + onUploadProgress, + }); + + assert.strictEqual(called, true); + }); + }); + describe('channels', () => { it('should create a channel', done => { const config = { diff --git a/test/file.ts b/test/file.ts index 963d58673..dec1a2968 100644 --- a/test/file.ts +++ b/test/file.ts @@ -1725,6 +1725,53 @@ describe('File', () => { writable.write('data'); }); + it('should emit progress via resumable upload', done => { + const progress = {}; + + resumableUploadOverride = { + upload() { + const uploadStream = new stream.PassThrough(); + setImmediate(() => { + uploadStream.emit('progress', progress); + }); + + return uploadStream; + }, + }; + + const writable = file.createWriteStream(); + + writable.on('progress', (evt: {}) => { + assert.strictEqual(evt, progress); + done(); + }); + + writable.write('data'); + }); + + it('should emit progress via simple upload', done => { + const progress = {}; + + makeWritableStreamOverride = (dup: duplexify.Duplexify) => { + const uploadStream = new stream.PassThrough(); + uploadStream.on('progress', evt => dup.emit('progress', evt)); + + dup.setWritable(uploadStream); + setImmediate(() => { + uploadStream.emit('progress', progress); + }); + }; + + const writable = file.createWriteStream({resumable: false}); + + writable.on('progress', (evt: {}) => { + assert.strictEqual(evt, progress); + done(); + }); + + writable.write('data'); + }); + it('should start a simple upload if specified', done => { const options = { metadata: METADATA, @@ -3782,6 +3829,21 @@ describe('File', () => { file.save(DATA, assert.ifError); }); + it('should register the progress listener if onUploadProgress is passed', done => { + const onUploadProgress = util.noop; + file.createWriteStream = () => { + const writeStream = new stream.PassThrough(); + setImmediate(() => { + const [listener] = writeStream.listeners('progress'); + assert.strictEqual(listener, onUploadProgress); + done(); + }); + return writeStream; + }; + + file.save(DATA, {onUploadProgress}, assert.ifError); + }); + it('should write the data', done => { file.createWriteStream = () => { const writeStream = new stream.PassThrough(); @@ -4080,6 +4142,28 @@ describe('File', () => { file.startResumableUpload_(dup); }); + + it('should emit progress event', done => { + const progress = {}; + const dup = duplexify(); + dup.on('progress', evt => { + assert.strictEqual(evt, progress); + done(); + }); + + resumableUploadOverride = { + upload() { + const uploadStream = new stream.Transform(); + setImmediate(() => { + uploadStream.emit('progress', progress); + }); + + return uploadStream; + }, + }; + + file.startResumableUpload_(dup); + }); }); });