Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable bytes read tracking #1074

Merged
merged 31 commits on May 13, 2020
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
818f150
feat: enable bytesrRead tracking on uploads
AVaksman Jan 7, 2020
72dbea6
refactor: only attach listener if flagged by user
AVaksman Jan 14, 2020
00aea08
lint: lint
AVaksman Feb 13, 2020
edf63fa
refactor: forward progress event from resumable or simple upload to user
AVaksman Mar 2, 2020
9131ec5
lint: lint
AVaksman Mar 2, 2020
a19deee
fix: add listener on a condition
AVaksman Mar 2, 2020
32c1637
Merge branch 'master' into enable_bytesRead_tracking
AVaksman Mar 2, 2020
039a768
Merge branch 'master' into enable_bytesRead_tracking
jkwlui Mar 2, 2020
f063ae9
refactor: only register progress listener if onUploadProgress param i…
AVaksman Mar 4, 2020
f5496a1
feat: add onUploadProgress to file.save()
AVaksman Mar 4, 2020
1116127
Merge branch 'master' into enable_bytesRead_tracking
AVaksman Mar 4, 2020
52dcc37
lint: lint
AVaksman Mar 4, 2020
aa0fcdd
Merge branch 'master' into enable_bytesRead_tracking
AVaksman Mar 11, 2020
04d9c15
Merge branch 'master' into enable_bytesRead_tracking
jkwlui Mar 13, 2020
b74521d
Merge branch 'master' into enable_bytesRead_tracking
AVaksman Mar 16, 2020
2c515c6
Merge branch 'master' into enable_bytesRead_tracking
AVaksman Mar 23, 2020
e55cf34
refactor: readability
AVaksman Mar 23, 2020
bfe3e3b
chore: move event handler assignment on it own line
AVaksman Apr 2, 2020
84c08a1
fix(deps): update dependency @google-cloud/common to v3
AVaksman Apr 2, 2020
f960bcb
Merge branch 'master' into enable_bytesRead_tracking
AVaksman Apr 2, 2020
f6a9c1e
Merge branch 'master' into enable_bytesRead_tracking
AVaksman Apr 7, 2020
ed711f7
Merge branch 'master' into enable_bytesRead_tracking
AVaksman Apr 20, 2020
c80bf75
Merge branch 'master' into enable_bytesRead_tracking
AVaksman Apr 27, 2020
5fc5975
Merge branch 'master' into enable_bytesRead_tracking
AVaksman Apr 27, 2020
ae665f9
test: typo
AVaksman Apr 27, 2020
38841f7
Merge branch 'master' into enable_bytesRead_tracking
AVaksman May 4, 2020
67bacf2
Merge branch 'master' into enable_bytesRead_tracking
AVaksman May 4, 2020
2f9055f
Merge branch 'master' into enable_bytesRead_tracking
AVaksman May 7, 2020
512352d
Merge branch 'master' into enable_bytesRead_tracking
stephenplusplus May 12, 2020
66a76e0
Merge branch 'master' into enable_bytesRead_tracking
stephenplusplus May 12, 2020
195cd46
Merge branch 'master' into enable_bytesRead_tracking
stephenplusplus May 12, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/bucket.ts
Expand Up @@ -347,6 +347,8 @@ export interface UploadOptions
encryptionKey?: string | Buffer;
kmsKeyName?: string;
resumable?: boolean;
// tslint:disable-next-line:no-any
onUploadProgress?: (progressEvent: any) => void;
}

export interface MakeAllFilesPublicPrivateOptions {
Expand Down Expand Up @@ -3461,9 +3463,13 @@ class Bucket extends ServiceObject {
}

function upload() {
const writable = newFile.createWriteStream(options);
if (options.onUploadProgress) {
writable.on('progress', options.onUploadProgress);
}
fs.createReadStream(pathString)
.on('error', callback!)
.pipe(newFile.createWriteStream(options))
.pipe(writable)
.on('error', callback!)
.on('finish', () => {
callback!(null, newFile, newFile.metadata);
Expand Down
24 changes: 18 additions & 6 deletions src/file.ts
Expand Up @@ -37,7 +37,7 @@ import * as os from 'os';
// eslint-disable-next-line @typescript-eslint/no-var-requires
const pumpify = require('pumpify');
import * as resumableUpload from 'gcs-resumable-upload';
import {Duplex, Writable, Readable} from 'stream';
import {Duplex, Writable, Readable, Transform} from 'stream';
import * as streamEvents from 'stream-events';
import * as through from 'through2';
import * as xdgBasedir from 'xdg-basedir';
Expand Down Expand Up @@ -349,7 +349,10 @@ export interface CreateReadStreamOptions {
decompress?: boolean;
}

export type SaveOptions = CreateWriteStreamOptions;
/**
 * Options accepted by File#save().
 * Extends CreateWriteStreamOptions with an optional upload-progress callback.
 */
export interface SaveOptions extends CreateWriteStreamOptions {
  /**
   * Invoked with each progress event forwarded from the underlying
   * write stream's 'progress' event during the upload.
   */
  // tslint:disable-next-line:no-any
  onUploadProgress?: (progressEvent: any) => void;
}

export interface SaveCallback {
(err?: Error | null): void;
Expand Down Expand Up @@ -1736,6 +1739,10 @@ class File extends ServiceObject<File> {

const fileWriteStream = duplexify();

fileWriteStream.on('progress', evt => {
stream.emit('progress', evt);
});

const stream = streamEvents(
pumpify([
gzip ? zlib.createGzip() : through(),
Expand Down Expand Up @@ -3380,10 +3387,14 @@ class File extends ServiceObject<File> {
const options =
typeof optionsOrCallback === 'object' ? optionsOrCallback : {};

this.createWriteStream(options)
const writable = this.createWriteStream(options)
.on('error', callback!)
.on('finish', callback!)
.end(data);
.on('finish', callback!);
if (options.onUploadProgress) {
writable.on('progress', options.onUploadProgress);
}

writable.end(data);
}
setStorageClass(
storageClass: string,
Expand Down Expand Up @@ -3542,7 +3553,8 @@ class File extends ServiceObject<File> {
})
.on('finish', () => {
dup.emit('complete');
});
})
.on('progress', evt => dup.emit('progress', evt));

dup.setWritable(uploadStream);
}
Expand Down
36 changes: 36 additions & 0 deletions system-test/storage.ts
Expand Up @@ -86,6 +86,7 @@ import {
DeleteNotificationCallback,
} from '../src';
import * as nock from 'nock';
import * as readline from 'readline';

interface ErrorCallbackFunction {
(err: Error | null): void;
Expand Down Expand Up @@ -2811,6 +2812,41 @@ describe('storage', () => {
});
});

describe('bucket upload with progress', () => {
  /**
   * Builds a progress callback plus a flag recording whether it ran.
   * Each event must carry a numeric bytesWritten within [0, file size].
   */
  const makeProgressTracker = () => {
    const expectedSize = fs.statSync(FILES.big.path).size;
    const state = {invoked: false};
    const onUploadProgress = (evt: {bytesWritten: number}) => {
      state.invoked = true;
      assert.strictEqual(typeof evt.bytesWritten, 'number');
      assert.ok(evt.bytesWritten >= 0 && evt.bytesWritten <= expectedSize);
    };
    return {state, onUploadProgress};
  };

  it('show bytes sent with resumable upload', async () => {
    const {state, onUploadProgress} = makeProgressTracker();

    await bucket.upload(FILES.big.path, {
      resumable: true,
      onUploadProgress,
    });

    assert.strictEqual(state.invoked, true);
  });

  it('show bytes sent with simple upload', async () => {
    const {state, onUploadProgress} = makeProgressTracker();

    await bucket.upload(FILES.big.path, {
      resumable: false,
      onUploadProgress,
    });

    assert.strictEqual(state.invoked, true);
  });
});

describe('channels', () => {
it('should create a channel', done => {
const config = {
Expand Down
84 changes: 84 additions & 0 deletions test/file.ts
Expand Up @@ -1725,6 +1725,53 @@ describe('File', () => {
writable.write('data');
});

it('should emit progress via resumable upload', done => {
  const expectedEvent = {};

  // Stub the resumable-upload module: the inner stream fires a
  // 'progress' event asynchronously after it is handed back.
  resumableUploadOverride = {
    upload: () => {
      const innerStream = new stream.PassThrough();
      setImmediate(() => innerStream.emit('progress', expectedEvent));
      return innerStream;
    },
  };

  const writable = file.createWriteStream();

  // The user-facing stream must re-emit the exact same event object.
  writable.on('progress', (evt: {}) => {
    assert.strictEqual(evt, expectedEvent);
    done();
  });

  writable.write('data');
});

it('should emit progress via simple upload', done => {
  const expectedEvent = {};

  // Stub makeWritableStream: pipe the inner stream's 'progress' events
  // through to the duplexify instance, then emit one asynchronously.
  makeWritableStreamOverride = (dup: duplexify.Duplexify) => {
    const innerStream = new stream.PassThrough();
    innerStream.on('progress', evt => dup.emit('progress', evt));

    dup.setWritable(innerStream);
    setImmediate(() => innerStream.emit('progress', expectedEvent));
  };

  const writable = file.createWriteStream({resumable: false});

  // The user-facing stream must re-emit the exact same event object.
  writable.on('progress', (evt: {}) => {
    assert.strictEqual(evt, expectedEvent);
    done();
  });

  writable.write('data');
});

it('should start a simple upload if specified', done => {
const options = {
metadata: METADATA,
Expand Down Expand Up @@ -3752,6 +3799,21 @@ describe('File', () => {
file.save(DATA, assert.ifError);
});

it('should register the progress listener if onUploadProgress is passed', done => {
  const onUploadProgress = util.noop;

  file.createWriteStream = () => {
    const writeStream = new stream.PassThrough();
    // Inspect on the next tick, after save() has attached its listeners.
    setImmediate(() => {
      const registeredListener = writeStream.listeners('progress')[0];
      assert.strictEqual(registeredListener, onUploadProgress);
      done();
    });
    return writeStream;
  };

  file.save(DATA, {onUploadProgress}, assert.ifError);
});

it('should write the data', done => {
file.createWriteStream = () => {
const writeStream = new stream.PassThrough();
Expand Down Expand Up @@ -4050,6 +4112,28 @@ describe('File', () => {

file.startResumableUpload_(dup);
});

it('should emit progress event', done => {
  const expectedEvent = {};

  // Stub the resumable-upload module so the upload stream fires a
  // 'progress' event asynchronously.
  resumableUploadOverride = {
    upload: () => {
      const uploadStream = new stream.Transform();
      setImmediate(() => uploadStream.emit('progress', expectedEvent));
      return uploadStream;
    },
  };

  const dup = duplexify();

  // The duplexify wrapper must forward the exact same event object.
  dup.on('progress', evt => {
    assert.strictEqual(evt, expectedEvent);
    done();
  });

  file.startResumableUpload_(dup);
});
});
});

Expand Down