From 8e26fb561a9595e0f05e0506cebb71aa1eaba432 Mon Sep 17 00:00:00 2001 From: Andrew Zammit Date: Mon, 30 Mar 2020 21:58:06 -0700 Subject: [PATCH] refactor!: don't return Stream from createLoadJob (#647) * fix!(table): createLoadJobStream sync returns a stream, createLoadJob always returns a job #640 * chore(table): remove createLoadJobStream, createLoadJob test refactor for promises #640 * chore(table): remove never encountered callback noop in createLoadJob given promisifyAll * test(biqquery): add tests to increase codecov as a result of #647 refactor Co-authored-by: Benjamin E. Coe Co-authored-by: Steffany Brown <30247553+steffnay@users.noreply.github.com> --- package.json | 1 + src/table.ts | 57 +++++---- test/bigquery.ts | 62 +++++++++- test/table.ts | 304 ++++++++++++++++++++++++----------------------- 4 files changed, 248 insertions(+), 176 deletions(-) diff --git a/package.json b/package.json index 187b6513..7274eb7c 100644 --- a/package.json +++ b/package.json @@ -57,6 +57,7 @@ "duplexify": "^4.0.0", "extend": "^3.0.2", "is": "^3.3.0", + "p-event": "^4.1.0", "stream-events": "^1.0.5", "string-format-obj": "^1.1.1", "uuid": "^7.0.0" diff --git a/src/table.ts b/src/table.ts index cbe2d59d..9e89b3df 100644 --- a/src/table.ts +++ b/src/table.ts @@ -20,6 +20,7 @@ import {promisifyAll} from '@google-cloud/promisify'; import arrify = require('arrify'); import Big from 'big.js'; import * as extend from 'extend'; +import pEvent from 'p-event'; const format = require('string-format-obj'); import * as fs from 'fs'; @@ -1102,20 +1103,16 @@ class Table extends common.ServiceObject { this.bigQuery.createJob(body, callback!); } - createLoadJob(source: string, metadata?: JobLoadMetadata): Writable; - createLoadJob(source: File, metadata?: JobLoadMetadata): Promise; createLoadJob( - source: string, - metadata: JobLoadMetadata, - callback: JobCallback - ): Writable; + source: string | File, + metadata?: JobLoadMetadata + ): Promise; createLoadJob( - source: File, + source: string | File, metadata: JobLoadMetadata, callback: JobCallback ): void; - createLoadJob(source: string, callback: JobCallback): Writable; - createLoadJob(source: File, callback: JobCallback): void; + createLoadJob(source: string | File, callback: JobCallback): void; /** * Load data from a local file or Storage {@link * https://cloud.google.com/nodejs/docs/reference/storage/latest/File File}. @@ -1129,10 +1126,10 @@ class Table extends common.ServiceObject { * * @see [Jobs: insert API Documentation]{@link https://cloud.google.com/bigquery/docs/reference/v2/jobs/insert} * - * @param {string|File} source The source file to load. A string or a - * {@link + * @param {string|File|File[]} source The source file to load. A string (path) + * to a local file, or one or more {@link * https://cloud.google.com/nodejs/docs/reference/storage/latest/File File} - * object. + * objects. * @param {object} [metadata] Metadata to set with the load operation. The * metadata object should be in the format of the * [`configuration.load`](http://goo.gl/BVcXk4) property of a Jobs @@ -1203,17 +1200,31 @@ class Table extends common.ServiceObject { * }); */ createLoadJob( - source: string | File, + source: string | File | File[], metadataOrCallback?: JobLoadMetadata | JobCallback, cb?: JobCallback - ): void | Promise | Writable { + ): void | Promise { const metadata = typeof metadataOrCallback === 'object' ? metadataOrCallback : {}; const callback = - typeof metadataOrCallback === 'function' - ? metadataOrCallback - : cb || common.util.noop; + typeof metadataOrCallback === 'function' ? metadataOrCallback : cb; + + this._createLoadJob(source, metadata).then( + ([resp]) => callback!(null, resp, resp.metadata), + err => callback!(err) + ); + } + /** + * @param {string | File | File[]} source + * @param {JobLoadMetadata} metadata + * @returns {Promise} + * @private + */ + async _createLoadJob( + source: string | File | File[], + metadata: JobLoadMetadata + ): Promise { if (metadata.format) { metadata.sourceFormat = FORMATS[metadata.format.toLowerCase()]; delete metadata.format; @@ -1238,13 +1249,11 @@ class Table extends common.ServiceObject { } // Read the file into a new write stream. - return fs + const jobWritable = fs .createReadStream(source) - .pipe(this.createWriteStream_(metadata)) - .on('error', callback) - .on('job', job => { - callback(null, job, job.metadata); - }); + .pipe(this.createWriteStream_(metadata)); + const jobResponse = (await pEvent(jobWritable, 'job')) as Job; + return [jobResponse, jobResponse.metadata]; } // tslint:disable-next-line no-any @@ -1298,7 +1307,7 @@ class Table extends common.ServiceObject { }), }); - this.bigQuery.createJob(body, callback); + return this.bigQuery.createJob(body); } createQueryJob(options: Query): Promise; diff --git a/test/bigquery.ts b/test/bigquery.ts index b2a5ddc6..ba1f017b 100644 --- a/test/bigquery.ts +++ b/test/bigquery.ts @@ -37,6 +37,7 @@ import { JobOptions, TableField, } from '../src'; +import {SinonStub} from 'sinon'; const fakeUuid = extend(true, {}, uuid); @@ -981,6 +982,39 @@ describe('BigQuery', () => { assert.strictEqual(structValues.key, expectedParameterValue); }); + it('should format a struct with provided type', () => { + const struct = {a: 1}; + const providedType = {a: 'INT64'}; + + const getTypeStub = sandbox.stub( + BigQuery, + 'getTypeDescriptorFromProvidedType_' + ); + getTypeStub.onFirstCall().returns({ + type: 'STRUCT', + structTypes: [ + { + name: 'a', + type: { + type: 'INT64', + }, + }, + ], + }); + getTypeStub.onSecondCall().returns({type: 'INT64'}); + + const queryParameter = BigQuery.valueToQueryParameter_( + struct, + providedType + ); + const structValues = queryParameter.parameterValue.structValues; + assert.deepStrictEqual(structValues, { + a: { + value: 1, + }, + }); + }); + it('should format an array of structs', () => { const structs = [{name: 'Stephen'}]; const expectedParam = { @@ -2101,14 +2135,34 @@ describe('BigQuery', () => { }); describe('queryAsStream_', () => { + let queryStub: SinonStub; + + beforeEach(() => { + queryStub = sandbox.stub(bq, 'query').callsArgAsync(2); + }); + it('should call query correctly', done => { const query = 'SELECT'; - bq.query = (query_: {}, options: {}, callback: Function) => { - assert.strictEqual(query_, query); - assert.deepStrictEqual(options, {autoPaginate: false}); - callback(); // done() + bq.queryAsStream_(query, done); + assert( + queryStub.calledOnceWithExactly( + query, + {autoPaginate: false}, + sinon.match.func + ) + ); + }); + + it('should query as job if supplied', done => { + const cbStub = sinon.stub().callsArgAsync(1); + const query = { + job: { + getQueryResults: cbStub, + }, }; bq.queryAsStream_(query, done); + assert(cbStub.calledOnceWithExactly(query, sinon.match.func)); + assert(queryStub.notCalled); }); }); }); diff --git a/test/table.ts b/test/table.ts index a20e4342..018c30d0 100644 --- a/test/table.ts +++ b/test/table.ts @@ -60,6 +60,7 @@ const fakePfy = extend({}, pfy, { if (c.name === 'Table') { promisified = true; } + pfy.promisifyAll(c); }, }); @@ -652,18 +653,21 @@ describe('BigQuery/Table', () => { DEST_TABLE = new Table(DATASET, 'destination-table'); }); - it('should throw if a destination is not a Table', () => { - assert.throws(() => { - table.createCopyJob(); - }, /Destination must be a Table/); + it('should throw if a destination is not a Table', async () => { + await assert.rejects( + async () => table.createCopyJob(), + /Destination must be a Table/ + ); - assert.throws(() => { - table.createCopyJob({}); - }, /Destination must be a Table/); + await assert.rejects( + async () => table.createCopyJob({}), + /Destination must be a Table/ + ); - assert.throws(() => { - table.createCopyJob(() => {}); - }, /Destination must be a Table/); + await assert.rejects( + async () => table.createCopyJob(() => {}), + /Destination must be a Table/ + ); }); it('should send correct request to the API', done => { @@ -765,22 +769,26 @@ describe('BigQuery/Table', () => { SOURCE_TABLE = new Table(DATASET, 'source-table'); }); - it('should throw if a source is not a Table', () => { - assert.throws(() => { - table.createCopyFromJob(['table']); - }, /Source must be a Table/); + it('should throw if a source is not a Table', async () => { + await assert.rejects( + async () => table.createCopyFromJob(['table']), + /Source must be a Table/ + ); - assert.throws(() => { - table.createCopyFromJob([SOURCE_TABLE, 'table']); - }, /Source must be a Table/); + await assert.rejects( + async () => table.createCopyFromJob([SOURCE_TABLE, 'table']), + /Source must be a Table/ + ); - assert.throws(() => { - table.createCopyFromJob({}); - }, /Source must be a Table/); + await assert.rejects( + async () => table.createCopyFromJob({}), + /Source must be a Table/ + ); - assert.throws(() => { - table.createCopyFromJob(() => {}); - }, /Source must be a Table/); + await assert.rejects( + async () => table.createCopyFromJob(() => {}), + /Source must be a Table/ + ); }); it('should send correct request to the API', done => { @@ -1154,12 +1162,21 @@ describe('BigQuery/Table', () => { metadata: {}, }; + let bqCreateJobStub: sinon.SinonStub; + beforeEach(() => { + bqCreateJobStub = sinon + .stub(table.bigQuery, 'createJob') + .resolves([JOB, JOB.metadata]); isCustomTypeOverride = () => { return true; }; }); + afterEach(() => { + bqCreateJobStub.restore(); + }); + it('should accept just a File and a callback', done => { table.createWriteStream_ = () => { const ws = new stream.Writable(); @@ -1178,11 +1195,6 @@ describe('BigQuery/Table', () => { }); }); - it('should return a stream when a string is given', () => { - sandbox.stub(table, 'createWriteStream_').returns(new stream.Writable()); - assert(table.createLoadJob(FILEPATH) instanceof stream.Stream); - }); - it('should infer the file format from the given filepath', done => { table.createWriteStream_ = (metadata: JobLoadMetadata) => { assert.strictEqual(metadata.sourceFormat, 'NEWLINE_DELIMITED_JSON'); @@ -1241,152 +1253,147 @@ describe('BigQuery/Table', () => { table.createLoadJob(FILE, assert.ifError); }); - it('should throw if a File object is not provided', () => { + it('should throw if a File object is not provided', async () => { isCustomTypeOverride = () => { return false; }; - - assert.throws(() => { - table.createLoadJob({}); - }, /Source must be a File object/); + await assert.rejects( + async () => table.createLoadJob({}), + /Source must be a File object/ + ); }); - it('should convert File objects to gs:// urls', done => { - table.bigQuery.createJob = (reqOpts: JobOptions) => { - const sourceUri = reqOpts.configuration!.load!.sourceUris![0]; - assert.strictEqual( - sourceUri, - 'gs://' + FILE.bucket.name + '/' + FILE.name - ); - done(); - }; - - table.createLoadJob(FILE, assert.ifError); + it('should convert File objects to gs:// urls', async () => { + await table.createLoadJob(FILE); + assert(bqCreateJobStub.calledOnce); + assert( + bqCreateJobStub.calledWithMatch({ + configuration: { + load: { + sourceUris: ['gs://' + FILE.bucket.name + '/' + FILE.name], + }, + }, + }) + ); }); - it('should infer the file format from a File object', done => { - table.bigQuery.createJob = (reqOpts: JobOptions) => { - const sourceFormat = reqOpts.configuration!.load!.sourceFormat; - assert.strictEqual(sourceFormat, 'NEWLINE_DELIMITED_JSON'); - done(); - }; - - table.createLoadJob(FILE, assert.ifError); + it('should infer the file format from a File object', async () => { + await table.createLoadJob(FILE); + assert(bqCreateJobStub.calledOnce); + assert( + bqCreateJobStub.calledWithMatch({ + configuration: { + load: { + sourceFormat: 'NEWLINE_DELIMITED_JSON', + }, + }, + }) + ); }); - it('should not override a provided format with a File', done => { - table.bigQuery.createJob = (reqOpts: JobOptions) => { - const sourceFormat = reqOpts.configuration!.load!.sourceFormat; - assert.strictEqual(sourceFormat, 'NEWLINE_DELIMITED_JSON'); - done(); - }; - - table.createLoadJob( - FILE, - { - sourceFormat: 'NEWLINE_DELIMITED_JSON', - }, - assert.ifError + it('should not override a provided format with a File', async () => { + await table.createLoadJob(FILE, {sourceFormat: 'AVRO'}); + assert(bqCreateJobStub.calledOnce); + assert( + bqCreateJobStub.calledWithMatch({ + configuration: { + load: { + sourceFormat: 'AVRO', + }, + }, + }) ); }); - it('should pass the callback to createJob', done => { - table.bigQuery.createJob = (reqOpts: JobOptions, callback: Function) => { - assert.strictEqual(done, callback); - callback(); // the done fn - }; - - table.createLoadJob(FILE, {}, done); + it('should use bigQuery.createJob', async () => { + await table.createLoadJob(FILE, {}); + assert(bqCreateJobStub.calledOnce); }); - it('should optionally accept options', done => { - table.bigQuery.createJob = (reqOpts: JobOptions, callback: Function) => { - assert.strictEqual(done, callback); - callback(); // the done fn - }; - - table.createLoadJob(FILE, done); + it('should optionally accept options', async () => { + await table.createLoadJob(FILE); + assert(bqCreateJobStub.calledOnce); }); - it('should set the job prefix', done => { - const fakeJobPrefix = 'abc'; - - table.bigQuery.createJob = (reqOpts: JobOptions) => { - assert.strictEqual(reqOpts.jobPrefix, fakeJobPrefix); - assert.strictEqual( - // tslint:disable-next-line no-any - (reqOpts.configuration!.load as any).jobPrefix, - undefined - ); - done(); - }; - - table.createLoadJob( - FILE, - { - jobPrefix: fakeJobPrefix, - }, - assert.ifError + it('should set the job prefix', async () => { + const jobPrefix = 'abc'; + await table.createLoadJob(FILE, {jobPrefix}); + assert(bqCreateJobStub.calledOnce); + assert( + bqCreateJobStub.calledWithMatch({ + jobPrefix, + configuration: { + load: { + jobPrefix: undefined, + }, + }, + }) ); }); - it('should use the default location', done => { + it('should use the default location', async () => { const table = new Table(DATASET, TABLE_ID, {location: LOCATION}); - - table.bigQuery.createJob = (reqOpts: JobOptions, callback: Function) => { - assert.strictEqual(reqOpts.location, LOCATION); - callback(); // the done fn - }; - - table.createLoadJob(FILE, done); + await table.createLoadJob(FILE); + assert(bqCreateJobStub.calledWithMatch({location: LOCATION})); }); - it('should accept a job id', done => { + it('should accept a job id', async () => { const jobId = 'job-id'; - const options = {jobId}; - - table.bigQuery.createJob = (reqOpts: JobOptions) => { - assert.strictEqual(reqOpts.jobId, jobId); - assert.strictEqual( - // tslint:disable-next-line no-any - (reqOpts.configuration!.load as any).jobId, - undefined - ); - done(); - }; - - table.createLoadJob(FILE, options, assert.ifError); + await table.createLoadJob(FILE, {jobId}); + assert(bqCreateJobStub.calledOnce); + assert( + bqCreateJobStub.calledWithMatch({ + jobId, + configuration: { + load: { + jobId: undefined, + }, + }, + }) + ); }); describe('formats', () => { - it('should accept csv', done => { - table.bigQuery.createJob = (reqOpts: JobOptions) => { - const load = reqOpts.configuration!.load!; - assert.strictEqual(load.sourceFormat, 'CSV'); - done(); - }; - - table.createLoadJob(FILE, {format: 'csv'}, assert.ifError); + it('should accept csv', async () => { + await table.createLoadJob(FILE, {format: 'csv'}); + assert(bqCreateJobStub.calledOnce); + assert( + bqCreateJobStub.calledWithMatch({ + configuration: { + load: { + sourceFormat: 'CSV', + }, + }, + }) + ); }); - it('should accept json', done => { - table.bigQuery.createJob = (reqOpts: JobOptions) => { - const load = reqOpts.configuration!.load!; - assert.strictEqual(load.sourceFormat, 'NEWLINE_DELIMITED_JSON'); - done(); - }; - - table.createLoadJob(FILE, {format: 'json'}, assert.ifError); + it('should accept json', async () => { + await table.createLoadJob(FILE, {format: 'json'}); + assert(bqCreateJobStub.calledOnce); + assert( + bqCreateJobStub.calledWithMatch({ + configuration: { + load: { + sourceFormat: 'NEWLINE_DELIMITED_JSON', + }, + }, + }) + ); }); - it('should accept avro', done => { - table.bigQuery.createJob = (reqOpts: JobOptions) => { - const load = reqOpts.configuration!.load!; - assert.strictEqual(load.sourceFormat, 'AVRO'); - done(); - }; - - table.createLoadJob(FILE, {format: 'avro'}, assert.ifError); + it('should accept avro', async () => { + await table.createLoadJob(FILE, {format: 'avro'}); + assert(bqCreateJobStub.calledOnce); + assert( + bqCreateJobStub.calledWithMatch({ + configuration: { + load: { + sourceFormat: 'AVRO', + }, + }, + }) + ); }); }); }); @@ -2020,10 +2027,11 @@ describe('BigQuery/Table', () => { }; }); - it('should throw an error if rows is empty', () => { - assert.throws(() => { - table.insert([]); - }, /You must provide at least 1 row to be inserted\./); + it('should throw an error if rows is empty', async () => { + await assert.rejects( + async () => table.insert([]), + /You must provide at least 1 row to be inserted/ + ); }); it('should save data', done => {