Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor!: don't return Stream from createLoadJob #647

Merged
merged 7 commits into from Mar 31, 2020
1 change: 1 addition & 0 deletions package.json
Expand Up @@ -57,6 +57,7 @@
"duplexify": "^4.0.0",
"extend": "^3.0.2",
"is": "^3.3.0",
"p-event": "^4.1.0",
"stream-events": "^1.0.5",
"string-format-obj": "^1.1.1",
"uuid": "^7.0.0"
Expand Down
57 changes: 33 additions & 24 deletions src/table.ts
Expand Up @@ -20,6 +20,7 @@ import {promisifyAll} from '@google-cloud/promisify';
import arrify = require('arrify');
import Big from 'big.js';
import * as extend from 'extend';
import pEvent from 'p-event';

const format = require('string-format-obj');
import * as fs from 'fs';
Expand Down Expand Up @@ -1102,20 +1103,16 @@ class Table extends common.ServiceObject {
this.bigQuery.createJob(body, callback!);
}

createLoadJob(source: string, metadata?: JobLoadMetadata): Writable;
createLoadJob(source: File, metadata?: JobLoadMetadata): Promise<JobResponse>;
createLoadJob(
source: string,
metadata: JobLoadMetadata,
callback: JobCallback
): Writable;
source: string | File,
metadata?: JobLoadMetadata
): Promise<JobResponse>;
createLoadJob(
source: File,
source: string | File,
metadata: JobLoadMetadata,
callback: JobCallback
): void;
createLoadJob(source: string, callback: JobCallback): Writable;
createLoadJob(source: File, callback: JobCallback): void;
createLoadJob(source: string | File, callback: JobCallback): void;
/**
* Load data from a local file or Storage {@link
* https://cloud.google.com/nodejs/docs/reference/storage/latest/File File}.
Expand All @@ -1129,10 +1126,10 @@ class Table extends common.ServiceObject {
*
* @see [Jobs: insert API Documentation]{@link https://cloud.google.com/bigquery/docs/reference/v2/jobs/insert}
*
* @param {string|File} source The source file to load. A string or a
* {@link
* @param {string|File|File[]} source The source file to load. A string (path)
* to a local file, or one or more {@link
* https://cloud.google.com/nodejs/docs/reference/storage/latest/File File}
* object.
* objects.
* @param {object} [metadata] Metadata to set with the load operation. The
* metadata object should be in the format of the
* [`configuration.load`](http://goo.gl/BVcXk4) property of a Jobs
Expand Down Expand Up @@ -1203,17 +1200,31 @@ class Table extends common.ServiceObject {
* });
*/
createLoadJob(
source: string | File,
source: string | File | File[],
metadataOrCallback?: JobLoadMetadata | JobCallback,
cb?: JobCallback
): void | Promise<JobResponse> | Writable {
): void | Promise<JobResponse> {
const metadata =
typeof metadataOrCallback === 'object' ? metadataOrCallback : {};
const callback =
typeof metadataOrCallback === 'function'
? metadataOrCallback
: cb || common.util.noop;
typeof metadataOrCallback === 'function' ? metadataOrCallback : cb;

this._createLoadJob(source, metadata).then(
stephenplusplus marked this conversation as resolved.
Show resolved Hide resolved
([resp]) => callback!(null, resp, resp.metadata),
err => callback!(err)
);
}

/**
* @param {string | File | File[]} source
* @param {JobLoadMetadata} metadata
* @returns {Promise<JobResponse>}
* @private
*/
async _createLoadJob(
source: string | File | File[],
metadata: JobLoadMetadata
): Promise<JobResponse> {
if (metadata.format) {
metadata.sourceFormat = FORMATS[metadata.format.toLowerCase()];
delete metadata.format;
Expand All @@ -1238,13 +1249,11 @@ class Table extends common.ServiceObject {
}

// Read the file into a new write stream.
return fs
const jobWritable = fs
.createReadStream(source)
.pipe(this.createWriteStream_(metadata))
.on('error', callback)
stephenplusplus marked this conversation as resolved.
Show resolved Hide resolved
.on('job', job => {
callback(null, job, job.metadata);
});
.pipe(this.createWriteStream_(metadata));
const jobResponse = (await pEvent(jobWritable, 'job')) as Job;
return [jobResponse, jobResponse.metadata];
}

// tslint:disable-next-line no-any
Expand Down Expand Up @@ -1298,7 +1307,7 @@ class Table extends common.ServiceObject {
}),
});

this.bigQuery.createJob(body, callback);
return this.bigQuery.createJob(body);
}

createQueryJob(options: Query): Promise<JobResponse>;
Expand Down
62 changes: 58 additions & 4 deletions test/bigquery.ts
Expand Up @@ -37,6 +37,7 @@ import {
JobOptions,
TableField,
} from '../src';
import {SinonStub} from 'sinon';

const fakeUuid = extend(true, {}, uuid);

Expand Down Expand Up @@ -981,6 +982,39 @@ describe('BigQuery', () => {
assert.strictEqual(structValues.key, expectedParameterValue);
});

it('should format a struct with provided type', () => {
const struct = {a: 1};
const providedType = {a: 'INT64'};

const getTypeStub = sandbox.stub(
BigQuery,
'getTypeDescriptorFromProvidedType_'
);
getTypeStub.onFirstCall().returns({
type: 'STRUCT',
structTypes: [
{
name: 'a',
type: {
type: 'INT64',
},
},
],
});
getTypeStub.onSecondCall().returns({type: 'INT64'});

const queryParameter = BigQuery.valueToQueryParameter_(
struct,
providedType
);
const structValues = queryParameter.parameterValue.structValues;
assert.deepStrictEqual(structValues, {
a: {
value: 1,
},
});
});

it('should format an array of structs', () => {
const structs = [{name: 'Stephen'}];
const expectedParam = {
Expand Down Expand Up @@ -2101,14 +2135,34 @@ describe('BigQuery', () => {
});

describe('queryAsStream_', () => {
let queryStub: SinonStub;

beforeEach(() => {
queryStub = sandbox.stub(bq, 'query').callsArgAsync(2);
});

it('should call query correctly', done => {
const query = 'SELECT';
bq.query = (query_: {}, options: {}, callback: Function) => {
assert.strictEqual(query_, query);
assert.deepStrictEqual(options, {autoPaginate: false});
callback(); // done()
bq.queryAsStream_(query, done);
assert(
queryStub.calledOnceWithExactly(
query,
{autoPaginate: false},
sinon.match.func
)
);
});

it('should query as job if supplied', done => {
const cbStub = sinon.stub().callsArgAsync(1);
const query = {
job: {
getQueryResults: cbStub,
},
};
bq.queryAsStream_(query, done);
assert(cbStub.calledOnceWithExactly(query, sinon.match.func));
assert(queryStub.notCalled);
});
});
});