Skip to content

Commit

Permalink
refactor!: don't return Stream from createLoadJob (#647)
Browse files Browse the repository at this point in the history
* fix!(table): createLoadJobStream sync returns a stream, createLoadJob always returns a job #640

* chore(table): remove createLoadJobStream, createLoadJob test refactor for promises #640

* chore(table): remove never encountered callback noop in createLoadJob given promisifyAll

* test(biqquery): add tests to increase codecov as a result of #647 refactor

Co-authored-by: Benjamin E. Coe <bencoe@google.com>
Co-authored-by: Steffany Brown <30247553+steffnay@users.noreply.github.com>
  • Loading branch information
3 people committed Mar 31, 2020
1 parent 712b029 commit 8e26fb5
Show file tree
Hide file tree
Showing 4 changed files with 248 additions and 176 deletions.
1 change: 1 addition & 0 deletions package.json
Expand Up @@ -57,6 +57,7 @@
"duplexify": "^4.0.0",
"extend": "^3.0.2",
"is": "^3.3.0",
"p-event": "^4.1.0",
"stream-events": "^1.0.5",
"string-format-obj": "^1.1.1",
"uuid": "^7.0.0"
Expand Down
57 changes: 33 additions & 24 deletions src/table.ts
Expand Up @@ -20,6 +20,7 @@ import {promisifyAll} from '@google-cloud/promisify';
import arrify = require('arrify');
import Big from 'big.js';
import * as extend from 'extend';
import pEvent from 'p-event';

const format = require('string-format-obj');
import * as fs from 'fs';
Expand Down Expand Up @@ -1102,20 +1103,16 @@ class Table extends common.ServiceObject {
this.bigQuery.createJob(body, callback!);
}

createLoadJob(source: string, metadata?: JobLoadMetadata): Writable;
createLoadJob(source: File, metadata?: JobLoadMetadata): Promise<JobResponse>;
createLoadJob(
source: string,
metadata: JobLoadMetadata,
callback: JobCallback
): Writable;
source: string | File,
metadata?: JobLoadMetadata
): Promise<JobResponse>;
createLoadJob(
source: File,
source: string | File,
metadata: JobLoadMetadata,
callback: JobCallback
): void;
createLoadJob(source: string, callback: JobCallback): Writable;
createLoadJob(source: File, callback: JobCallback): void;
createLoadJob(source: string | File, callback: JobCallback): void;
/**
* Load data from a local file or Storage {@link
* https://cloud.google.com/nodejs/docs/reference/storage/latest/File File}.
Expand All @@ -1129,10 +1126,10 @@ class Table extends common.ServiceObject {
*
* @see [Jobs: insert API Documentation]{@link https://cloud.google.com/bigquery/docs/reference/v2/jobs/insert}
*
* @param {string|File} source The source file to load. A string or a
* {@link
* @param {string|File|File[]} source The source file to load. A string (path)
* to a local file, or one or more {@link
* https://cloud.google.com/nodejs/docs/reference/storage/latest/File File}
* object.
* objects.
* @param {object} [metadata] Metadata to set with the load operation. The
* metadata object should be in the format of the
* [`configuration.load`](http://goo.gl/BVcXk4) property of a Jobs
Expand Down Expand Up @@ -1203,17 +1200,31 @@ class Table extends common.ServiceObject {
* });
*/
createLoadJob(
source: string | File,
source: string | File | File[],
metadataOrCallback?: JobLoadMetadata | JobCallback,
cb?: JobCallback
): void | Promise<JobResponse> | Writable {
): void | Promise<JobResponse> {
const metadata =
typeof metadataOrCallback === 'object' ? metadataOrCallback : {};
const callback =
typeof metadataOrCallback === 'function'
? metadataOrCallback
: cb || common.util.noop;
typeof metadataOrCallback === 'function' ? metadataOrCallback : cb;

this._createLoadJob(source, metadata).then(
([resp]) => callback!(null, resp, resp.metadata),
err => callback!(err)
);
}

/**
* @param {string | File | File[]} source
* @param {JobLoadMetadata} metadata
* @returns {Promise<JobResponse>}
* @private
*/
async _createLoadJob(
source: string | File | File[],
metadata: JobLoadMetadata
): Promise<JobResponse> {
if (metadata.format) {
metadata.sourceFormat = FORMATS[metadata.format.toLowerCase()];
delete metadata.format;
Expand All @@ -1238,13 +1249,11 @@ class Table extends common.ServiceObject {
}

// Read the file into a new write stream.
return fs
const jobWritable = fs
.createReadStream(source)
.pipe(this.createWriteStream_(metadata))
.on('error', callback)
.on('job', job => {
callback(null, job, job.metadata);
});
.pipe(this.createWriteStream_(metadata));
const jobResponse = (await pEvent(jobWritable, 'job')) as Job;
return [jobResponse, jobResponse.metadata];
}

// tslint:disable-next-line no-any
Expand Down Expand Up @@ -1298,7 +1307,7 @@ class Table extends common.ServiceObject {
}),
});

this.bigQuery.createJob(body, callback);
return this.bigQuery.createJob(body);
}

createQueryJob(options: Query): Promise<JobResponse>;
Expand Down
62 changes: 58 additions & 4 deletions test/bigquery.ts
Expand Up @@ -37,6 +37,7 @@ import {
JobOptions,
TableField,
} from '../src';
import {SinonStub} from 'sinon';

const fakeUuid = extend(true, {}, uuid);

Expand Down Expand Up @@ -981,6 +982,39 @@ describe('BigQuery', () => {
assert.strictEqual(structValues.key, expectedParameterValue);
});

it('should format a struct with provided type', () => {
const struct = {a: 1};
const providedType = {a: 'INT64'};

const getTypeStub = sandbox.stub(
BigQuery,
'getTypeDescriptorFromProvidedType_'
);
getTypeStub.onFirstCall().returns({
type: 'STRUCT',
structTypes: [
{
name: 'a',
type: {
type: 'INT64',
},
},
],
});
getTypeStub.onSecondCall().returns({type: 'INT64'});

const queryParameter = BigQuery.valueToQueryParameter_(
struct,
providedType
);
const structValues = queryParameter.parameterValue.structValues;
assert.deepStrictEqual(structValues, {
a: {
value: 1,
},
});
});

it('should format an array of structs', () => {
const structs = [{name: 'Stephen'}];
const expectedParam = {
Expand Down Expand Up @@ -2101,14 +2135,34 @@ describe('BigQuery', () => {
});

describe('queryAsStream_', () => {
let queryStub: SinonStub;

beforeEach(() => {
queryStub = sandbox.stub(bq, 'query').callsArgAsync(2);
});

it('should call query correctly', done => {
const query = 'SELECT';
bq.query = (query_: {}, options: {}, callback: Function) => {
assert.strictEqual(query_, query);
assert.deepStrictEqual(options, {autoPaginate: false});
callback(); // done()
bq.queryAsStream_(query, done);
assert(
queryStub.calledOnceWithExactly(
query,
{autoPaginate: false},
sinon.match.func
)
);
});

it('should query as job if supplied', done => {
const cbStub = sinon.stub().callsArgAsync(1);
const query = {
job: {
getQueryResults: cbStub,
},
};
bq.queryAsStream_(query, done);
assert(cbStub.calledOnceWithExactly(query, sinon.match.func));
assert(queryStub.notCalled);
});
});
});

0 comments on commit 8e26fb5

Please sign in to comment.