Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(table): add retries for insert partial failures #589

Merged
merged 9 commits into from Apr 13, 2020
1 change: 1 addition & 0 deletions package.json
Expand Up @@ -85,6 +85,7 @@
"mocha": "^7.0.0",
"mv": "^2.1.1",
"ncp": "^2.0.0",
"p-reflect": "^2.1.0",
"proxyquire": "^2.1.0",
"sinon": "^9.0.0",
"tmp": "0.1.0",
Expand Down
204 changes: 140 additions & 64 deletions src/table.ts
Expand Up @@ -73,6 +73,7 @@ export type RowMetadata = any;

export type InsertRowsOptions = bigquery.ITableDataInsertAllRequest & {
createInsertId?: boolean;
partialRetries?: number;
raw?: boolean;
schema?: string | {};
};
Expand Down Expand Up @@ -136,6 +137,12 @@ export type FormattedMetadata = bigquery.ITable;
export type TableSchema = bigquery.ITableSchema;
export type TableField = bigquery.ITableFieldSchema;

export interface PartialInsertFailure {
message: string;
reason: string;
row: RowMetadata;
}

/**
* The file formats accepted by BigQuery.
*
Expand Down Expand Up @@ -1733,6 +1740,11 @@ class Table extends common.ServiceObject {
* If you need to create an entire table from a file, consider using
* {@link Table#load} instead.
*
* Note, if a table was recently created, inserts may fail until the table
* is consistent within BigQuery. If a `schema` is supplied, this method will
* automatically retry those failed inserts, and it will even create the
* table with the provided schema if it does not exist.
*
* @see [Tabledata: insertAll API Documentation]{@link https://cloud.google.com/bigquery/docs/reference/v2/tabledata/insertAll}
* @see [Streaming Insert Limits]{@link https://cloud.google.com/bigquery/quotas#streaming_inserts}
* @see [Troubleshooting Errors]{@link https://developers.google.com/bigquery/troubleshooting-errors}
Expand All @@ -1743,12 +1755,15 @@ class Table extends common.ServiceObject {
* default row id when one is not provided.
* @param {boolean} [options.ignoreUnknownValues=false] Accept rows that contain
* values that do not match the schema. The unknown values are ignored.
* @param {number} [options.partialRetries=3] Number of times to retry
* inserting rows for cases of partial failures.
* @param {boolean} [options.raw] If `true`, the `rows` argument is expected to
* be formatted as according to the
* [specification](https://cloud.google.com/bigquery/docs/reference/v2/tabledata/insertAll).
* @param {string|object} [options.schema] If provided will atomatically create
* a table if it doesn't already exist. Note that this can take longer
* than 2 minutes to complete. A comma-separated list of name:type pairs.
* @param {string|object} [options.schema] If provided will automatically
* create a table if it doesn't already exist. Note that this can take
* longer than 2 minutes to complete. A comma-separated list of
* name:type pairs.
* Valid types are "string", "integer", "float", "boolean", and
* "timestamp". If the type is omitted, it is assumed to be "string".
* Example: "name:string, age:integer". Schemas can also be specified as a
Expand Down Expand Up @@ -1870,6 +1885,102 @@ class Table extends common.ServiceObject {
? optionsOrCallback
: (cb as InsertRowsCallback);

this._insertAndCreateTable(rows, options).then(
resp => callback(null, resp),
err => callback(err, null)
);
}

/**
* Insert rows with retries, but will create the table if not exists.
*
* @param {RowMetadata | RowMetadata[]} rows
* @param {InsertRowsOptions} options
* @returns {Promise<bigquery.ITableDataInsertAllResponse | bigquery.ITable>}
* @private
*/
private async _insertAndCreateTable(
rows: RowMetadata | RowMetadata[],
options: InsertRowsOptions
): Promise<bigquery.ITableDataInsertAllResponse | bigquery.ITable> {
const {schema} = options;
const delay = 60000;

try {
return await this._insertWithRetry(rows, options);
} catch (err) {
if ((err as common.ApiError).code !== 404 || !schema) {
throw err;
}
}

try {
await this.create({schema});
} catch (err) {
if ((err as common.ApiError).code !== 409) {
throw err;
}
}

// table creation after failed access is subject to failure caching and
// eventual consistency, see:
// https://github.com/googleapis/google-cloud-python/issues/4553#issuecomment-350110292
await new Promise(resolve => setTimeout(resolve, delay));
return this._insertAndCreateTable(rows, options);
}

/**
* This method will attempt to insert rows while retrying any partial failures
* that occur along the way. Because partial insert failures are returned
* differently, we can't depend on our usual retry strategy.
*
* @private
*
* @param {RowMetadata|RowMetadata[]} rows The rows to insert.
* @param {InsertRowsOptions} options Insert options.
* @returns {Promise<bigquery.ITableDataInsertAllResponse>}
*/
private async _insertWithRetry(
rows: RowMetadata | RowMetadata[],
options: InsertRowsOptions
): Promise<bigquery.ITableDataInsertAllResponse> {
const {partialRetries = 3} = options;
let error: Error;

const maxAttempts = Math.max(partialRetries, 0) + 1;

for (let attempts = 0; attempts < maxAttempts; attempts++) {
try {
return await this._insert(rows, options);
} catch (e) {
error = e;
rows = ((e.errors || []) as PartialInsertFailure[])
.filter(err => !!err.row)
.map(err => err.row);

if (!rows.length) {
break;
}
}
}

throw error!;
}

/**
* This method does the bulk of the work for processing options and making the
* network request.
*
* @private
*
* @param {RowMetadata|RowMetadata[]} rows The rows to insert.
* @param {InsertRowsOptions} options Insert options.
* @returns {Promise<bigquery.ITableDataInsertAllResponse>}
*/
private async _insert(
rows: RowMetadata | RowMetadata[],
options: InsertRowsOptions
): Promise<bigquery.ITableDataInsertAllResponse> {
rows = arrify(rows) as RowMetadata[];

if (!rows.length) {
Expand All @@ -1893,74 +2004,39 @@ class Table extends common.ServiceObject {
}

delete json.createInsertId;
delete json.partialRetries;
delete json.raw;
delete json.schema;

let schema: string | {};

if (options.schema) {
schema = options.schema;
delete json.schema;
}

const createTableAndRetry = () => {
this.create(
{
schema,
},
(err, table, resp) => {
if (err && err.code !== 409) {
callback!(err, resp);
return;
}

setTimeout(() => {
this.insert(rows, options, callback!);
}, 60000);
}
);
};

this.request(
{
method: 'POST',
uri: '/insertAll',
json,
},
(err, resp) => {
if (err) {
if ((err as common.ApiError).code === 404 && schema) {
setTimeout(createTableAndRetry, Math.random() * 60000);
} else {
callback!(err, resp);
}
return;
}
const [resp] = await this.request({
zamnuts marked this conversation as resolved.
Show resolved Hide resolved
method: 'POST',
uri: '/insertAll',
json,
});

const partialFailures = (resp.insertErrors || []).map(
(insertError: GoogleErrorBody) => {
const partialFailures = (resp.insertErrors || []).map(
(insertError: GoogleErrorBody) => {
return {
errors: insertError.errors!.map(error => {
return {
errors: insertError.errors!.map(error => {
return {
message: error.message,
reason: error.reason,
};
}),
// eslint-disable-next-line @typescript-eslint/no-explicit-any
row: rows[(insertError as any).index],
message: error.message,
reason: error.reason,
};
}
);

if (partialFailures.length > 0) {
err = new common.util.PartialFailureError({
errors: partialFailures,
response: resp,
} as GoogleErrorBody);
}

callback!(err, resp);
}),
// eslint-disable-next-line @typescript-eslint/no-explicit-any
row: rows[(insertError as any).index],
};
}
);

if (partialFailures.length > 0) {
throw new common.util.PartialFailureError({
errors: partialFailures,
response: resp,
} as GoogleErrorBody);
}

return resp;
}

load(
Expand Down
1 change: 0 additions & 1 deletion system-test/bigquery.ts
Expand Up @@ -975,7 +975,6 @@ describe('BigQuery', () => {
};

const options = {
autoCreate: true,
schema: SCHEMA,
};

Expand Down