refactor: replaces job#getQueryResults with table#getRows in query method #454

Closed
wants to merge 21 commits into from
27 changes: 21 additions & 6 deletions src/index.ts
@@ -1022,7 +1022,6 @@ export class BigQuery extends common.Service {
if ((!options || !options.query) && !options.pageToken) {
throw new Error('A SQL query string is required.');
}

// tslint:disable-next-line no-any
const query: any = extend(
true,
@@ -1463,8 +1462,7 @@ export class BigQuery extends common.Service {
return new Job(this, id, options);
}

query(query: string, options?: QueryOptions): Promise<QueryRowsResponse>;
query(query: Query, options?: QueryOptions): Promise<SimpleQueryRowsResponse>;
query(query: string, options?: QueryOptions): Promise<RowsResponse>;
query(
query: string,
options: QueryOptions,
@@ -1571,9 +1569,10 @@ export class BigQuery extends common.Service {
optionsOrCallback?:
| QueryOptions
| SimpleQueryRowsCallback
| RowsCallback
| QueryRowsCallback,
cb?: SimpleQueryRowsCallback | QueryRowsCallback
): void | Promise<SimpleQueryRowsResponse> | Promise<QueryRowsResponse> {
cb?: SimpleQueryRowsCallback | RowsCallback
Contributor

It surprised me that QueryRowsCallback had to change to RowsCallback, since in the case of model queries we still return the original response:

 job!.getQueryResults(options, callback as QueryRowsCallback);

☝️ This is the original slow path, right? The one we returned as a QueryRowsCallback?

Mainly just making sure code-paths return the same shaped object.

): void | Promise<RowsResponse> {
let options =
typeof optionsOrCallback === 'object' ? optionsOrCallback : {};
const callback =
@@ -1590,7 +1589,23 @@ export class BigQuery extends common.Service {
// The Job is important for the `queryAsStream_` method, so a new query
// isn't created each time results are polled for.
options = extend({job}, options);
job!.getQueryResults(options, callback as QueryRowsCallback);

// table#getRows uses listTableData endpoint, which is a faster method
// to read rows of results.

job!.getQueryResults(options, (err, rows) => {
Contributor

So getQueryResults is going to get called continuously until the job is done and all the rows have been fetched. That doesn't sound like something we want here, so we'll probably need to set autoPaginate to false.

Contributor

I think I pieced this together. I have a feeling we are going to need to manually poll this method until we see the jobComplete value set to true. If there are 0 rows, we execute the user's callback with an empty array and the response object. If a page was returned, then we should call listTableData.

@shollyman does that sound right?

Contributor

Yeah. Typical flow for query processing should look something like this in terms of backend interactions:

  1. Call bigquery.jobs.insert() to insert the query job into the system,
    which returns a job resource (and job reference) for future polling
    operations.

  2. Call bigquery.jobs.getQueryResults to poll (possibly repeatedly) for
    job completion. Typically you do this with a nonzero timeoutMs param (which
    moves polling to the server side as a hanging get), and zero maxRows
    param (as we're not consuming row data here).

  3. When job is complete, you can consume schema from the getQueryResults,
    and then set up the Node equivalent of a row iterator if the getQueryResults
    response indicates rows were returned. Many queries don't return results,
    such as DML or DDL operations (UPDATE TABLE, ALTER TABLE, etc).

  4. Row iteration should use bigquery.tabledata.list to fetch pages of row data.
    Each page optionally returns a next page token, which is passed to the next
    invocation to advance the cursor (don't use explicit row offsets) until no further
    page token is provided.
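
The four steps above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the library's implementation: the `QueryBackend` interface and its methods (`insertJob`, `getQueryResults`, `listTableData`) are hypothetical stand-ins for the `jobs.insert`, `jobs.getQueryResults`, and `tabledata.list` calls, the response shapes are trimmed to the fields the flow needs, and the 10-second `timeoutMs` is an arbitrary value.

```typescript
// Hypothetical, trimmed-down shapes; none of these are library types.
interface TableRef {
  datasetId: string;
  tableId: string;
}

interface JobRef {
  jobId: string;
  destinationTable: TableRef;
}

interface QueryResultsResponse {
  jobComplete: boolean;
  totalRows: number;
}

interface RowPage<Row> {
  rows: Row[];
  pageToken?: string;
}

interface QueryBackend<Row> {
  insertJob(sql: string): Promise<JobRef>;
  getQueryResults(
    job: JobRef,
    params: {timeoutMs: number; maxRows: number}
  ): Promise<QueryResultsResponse>;
  listTableData(table: TableRef, pageToken?: string): Promise<RowPage<Row>>;
}

async function runQuery<Row>(
  backend: QueryBackend<Row>,
  sql: string
): Promise<Row[]> {
  // Step 1: insert the query job and keep its reference for polling.
  const job = await backend.insertJob(sql);

  // Step 2: poll for completion. A nonzero timeoutMs lets the server hold
  // the request open, and maxRows of 0 means no row data is consumed here.
  const pollParams = {timeoutMs: 10000, maxRows: 0};
  let status = await backend.getQueryResults(job, pollParams);
  while (!status.jobComplete) {
    status = await backend.getQueryResults(job, pollParams);
  }

  // Step 3: statements such as DML or DDL may return no rows at all.
  if (status.totalRows === 0) {
    return [];
  }

  // Step 4: read pages of row data, passing each page token to the next
  // call until no further token is returned.
  const rows: Row[] = [];
  let pageToken: string | undefined;
  do {
    const page = await backend.listTableData(job.destinationTable, pageToken);
    rows.push(...page.rows);
    pageToken = page.pageToken;
  } while (pageToken);
  return rows;
}
```

Collecting every page into one array mirrors auto-pagination; a caller who wants manual paging would instead return after the first `listTableData` call along with the page token.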

if (!err) {
if (rows!.length !== 0) {
Contributor

This throws me off a little bit. Does this imply that the best approach is to poll the getQueryResults API until we get a single page of results, and only then call listTableData?

Contributor Author

Yeah, that's what Seth told me they do in the other languages and how I should handle it to be consistent.

const datasetId = job!.metadata.configuration.query.destinationTable
.datasetId;
const tableId = job!.metadata.configuration.query.destinationTable
.tableId;
const dataset = this.dataset(datasetId);
const table = dataset.table(tableId);
Contributor

We're probably going to need to cache the destinationTable in case users don't want to autoPaginate the results.
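
One way to read this suggestion, sketched under assumptions: carry the destination table reference alongside the page token, so a follow-up request can go straight to the table without looking the job up again. The `NextQuery` shape and the `nextQueryFor` helper are hypothetical names for illustration; only the `configuration.query.destinationTable` metadata path comes from the diff above.

```typescript
// Hypothetical shapes for illustration; not library types.
interface TableRef {
  datasetId: string;
  tableId: string;
}

interface QueryJobMetadata {
  configuration: {query: {destinationTable: TableRef}};
}

interface NextQuery {
  destinationTable: TableRef;
  pageToken: string;
}

// Builds the object a caller would pass back to request the next page.
// Returns null when there is no further page to fetch.
function nextQueryFor(
  metadata: QueryJobMetadata,
  pageToken?: string
): NextQuery | null {
  if (!pageToken) {
    return null;
  }
  const {datasetId, tableId} = metadata.configuration.query.destinationTable;
  // Copy the reference so later changes to the metadata object do not
  // affect a cursor that was already handed to the caller.
  return {destinationTable: {datasetId, tableId}, pageToken};
}
```

With a cursor like this, manual pagination needs only the cached table reference and the token from the previous page.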

table.getRows(options, callback as RowsCallback);
}
}
});
});
}

225 changes: 144 additions & 81 deletions test/index.ts
@@ -29,6 +29,7 @@ import * as extend from 'extend';
import * as proxyquire from 'proxyquire';
import * as sinon from 'sinon';
import * as uuid from 'uuid';
import {EventEmitter} from 'events';

import {BigQueryDate, Dataset, Job, Table} from '../src';
import {JobOptions} from '../src/job';
@@ -66,52 +67,6 @@ const fakeUtil = extend({}, util, {
});
const originalFakeUtil = extend(true, {}, fakeUtil);

class FakeDataset {
calledWith_: IArguments;
constructor() {
this.calledWith_ = arguments;
}
}

class FakeTable extends Table {
constructor(a: Dataset, b: string) {
super(a, b);
}
}

class FakeJob {
calledWith_: IArguments;
constructor() {
this.calledWith_ = arguments;
}
}

let extended = false;
const fakePaginator = {
paginator: {
extend: (c: Function, methods: string[]) => {
if (c.name !== 'BigQuery') {
return;
}
methods = arrify(methods);
assert.strictEqual(c.name, 'BigQuery');
assert.deepStrictEqual(methods, ['getDatasets', 'getJobs']);
extended = true;
},
streamify: (methodName: string) => {
return methodName;
},
},
};

class FakeService extends Service {
calledWith_: IArguments;
constructor(config: ServiceConfig, options: ServiceOptions) {
super(config, options);
this.calledWith_ = arguments;
}
}

const sandbox = sinon.createSandbox();
afterEach(() => sandbox.restore());

@@ -127,6 +82,57 @@ describe('BigQuery', () => {
// tslint:disable-next-line no-any
let bq: any;

class FakeTable extends Table {
constructor(a: Dataset, b: string) {
super(a, b);
}
}

class FakeDataset extends Dataset {
calledWith_: IArguments;
constructor() {
super(bq, '1');
this.calledWith_ = arguments;
}

table(id: string): FakeTable {
return new FakeTable(this, id);
}
}

class FakeJob {
calledWith_: IArguments;
constructor() {
this.calledWith_ = arguments;
}
}

let extended = false;
const fakePaginator = {
paginator: {
extend: (c: Function, methods: string[]) => {
if (c.name !== 'BigQuery') {
return;
}
methods = arrify(methods);
assert.strictEqual(c.name, 'BigQuery');
assert.deepStrictEqual(methods, ['getDatasets', 'getJobs']);
extended = true;
},
streamify: (methodName: string) => {
return methodName;
},
},
};

class FakeService extends Service {
calledWith_: IArguments;
constructor(config: ServiceConfig, options: ServiceOptions) {
super(config, options);
this.calledWith_ = arguments;
}
}

before(() => {
BigQuery = proxyquire('../src', {
uuid: fakeUuid,
@@ -1758,21 +1764,19 @@ describe('BigQuery', () => {
const FAKE_ROWS = [{}, {}, {}];
const FAKE_RESPONSE = {};
const QUERY_STRING = 'SELECT * FROM [dataset.table]';

it('should return any errors from createQueryJob', done => {
const error = new Error('err');

bq.createQueryJob = (query: {}, callback: Function) => {
callback(error, null, FAKE_RESPONSE);
};

bq.query(QUERY_STRING, (err: Error, rows: {}, resp: {}) => {
assert.strictEqual(err, error);
assert.strictEqual(rows, null);
assert.strictEqual(resp, FAKE_RESPONSE);
done();
});
});
const MODEL_QUERY_STRING = `CREATE OR REPLACE MODEL \`dataset.model\``;
const TABLE_ID = 'bq_table';
const DATASET_ID = 'bq_dataset';
const METADATA = {
configuration: {
query: {
destinationTable: {
datasetId: DATASET_ID,
tableId: TABLE_ID,
},
},
},
};

it('should exit early if dryRun is set', done => {
const options = {
@@ -1793,59 +1797,101 @@ describe('BigQuery', () => {
});
});

it('should call job#getQueryResults', done => {
it('should call table#getRows', done => {
// const fakeJob = new EventEmitter();
const fakeJob = {
getQueryResults: (options: {}, callback: Function) => {
callback(null, FAKE_ROWS, FAKE_RESPONSE);
},
};
// tslint:disable-next-line: no-any
(fakeJob as any).metadata = METADATA;

bq.createQueryJob = (query: {}, callback: Function) => {
callback(null, fakeJob, FAKE_RESPONSE);
};

bq.query(QUERY_STRING, (err: Error, rows: {}, resp: {}) => {
assert.ifError(err);
assert.strictEqual(rows, FAKE_ROWS);
assert.strictEqual(resp, FAKE_RESPONSE);
done();
});
const fakeTable = {
getRows(options: {}, cb: Function) {
assert.deepStrictEqual(options, {job: fakeJob});
cb(); // done()
},
};

const fakeDataset = {
table(id: string) {
assert.strictEqual(id, TABLE_ID);
return fakeTable;
},
};

bq.dataset = (id: string) => {
assert.strictEqual(id, DATASET_ID);
return fakeDataset;
};

bq.query(QUERY_STRING, done);
// fakeJob.emit('complete', METADATA);
});

it('should assign Job on the options', done => {
it('should call job#getQueryResults for model query', done => {
const fakeJob = {
getQueryResults: (options: {}, callback: Function) => {
assert.deepStrictEqual(options, {job: fakeJob});
done();
callback(null, FAKE_ROWS, FAKE_RESPONSE);
},
};

bq.createQueryJob = (query: {}, callback: Function) => {
callback(null, fakeJob, FAKE_RESPONSE);
};

bq.query(QUERY_STRING, assert.ifError);
bq.query(MODEL_QUERY_STRING, (err: Error, rows: {}, resp: {}) => {
assert.ifError(err);
assert.strictEqual(rows, FAKE_ROWS);
assert.strictEqual(resp, FAKE_RESPONSE);
done();
});
});

it('should optionally accept options', done => {
const fakeOptions = {};
it('should return any errors from createQueryJob', done => {
const error = new Error('err');

bq.createQueryJob = (query: {}, callback: Function) => {
callback(error, null, FAKE_RESPONSE);
};

bq.query(QUERY_STRING, (err: Error, rows: {}, resp: {}) => {
assert.strictEqual(err, error);
assert.strictEqual(rows, null);
assert.strictEqual(resp, FAKE_RESPONSE);
done();
});
});

it('should return any errors from job', done => {
const fakeJob = {
getQueryResults: (options: {}) => {
assert.notStrictEqual(options, fakeOptions);
assert.deepStrictEqual(options, {job: fakeJob});
done();
getQueryResults: (options: {}, callback: Function) => {
callback(null, FAKE_ROWS, FAKE_RESPONSE);
},
};

bq.createQueryJob = (query: {}, callback: Function) => {
callback(null, fakeJob, FAKE_RESPONSE);
const error = new Error('Error.');

bq.createQueryJob = ({}, callback: Function) => {
callback(null, fakeJob, error);
};

bq.query(QUERY_STRING, fakeOptions, assert.ifError);
bq.query(QUERY_STRING, (err: Error) => {
assert.strictEqual(err, error);
done();
});
});
});

describe('queryAsStream_', () => {
const FAKE_ROWS = [{}, {}, {}];
const FAKE_RESPONSE = {};

it('should call query correctly', done => {
const query = 'SELECT';
bq.query = (query_: {}, options: {}, callback: Function) => {
@@ -1855,5 +1901,22 @@ describe('BigQuery', () => {
};
bq.queryAsStream_(query, done);
});

it('should call query correctly with a job', done => {
const fakeJob = {
getQueryResults: (query: {}, callback: Function) => {
assert.strictEqual(query, query);
callback(null, FAKE_ROWS, FAKE_RESPONSE);
},
};

const query = {job: fakeJob};

bq.query = (query_: {}, options: {}, callback: Function) => {
assert.strictEqual(query_, query);
callback(); // done()
};
bq.queryAsStream_(query, done);
});
});
});