Skip to content

Commit

Permalink
fix(table): add retries for insert partial failures (#589)
Browse files Browse the repository at this point in the history
* fix(table): add retries for insert partial failures

* refactoring

* moar refactors

* fix(table): working test cases for insert partial failure and retries (#589)

* chore(table): explicit insert/create recursion, elaborated on method signatures/types

* chore(table): rename insert option maxAttempts to partialRetries, allow retry of 0, do not allow negative retries #589

* test(table): remove sinon fake timers hack now that v9 is available #655

Co-authored-by: Andrew Zammit <zammit.andrew@gmail.com>
  • Loading branch information
callmehiphop and zamnuts committed Apr 13, 2020
1 parent 7984e0b commit b8639c2
Show file tree
Hide file tree
Showing 4 changed files with 504 additions and 243 deletions.
1 change: 1 addition & 0 deletions package.json
Expand Up @@ -85,6 +85,7 @@
"mocha": "^7.0.0",
"mv": "^2.1.1",
"ncp": "^2.0.0",
"p-reflect": "^2.1.0",
"proxyquire": "^2.1.0",
"sinon": "^9.0.0",
"tmp": "0.1.0",
Expand Down
204 changes: 140 additions & 64 deletions src/table.ts
Expand Up @@ -73,6 +73,7 @@ export type RowMetadata = any;

/**
 * Options accepted when inserting rows into a table.
 *
 * Extends the raw `insertAll` request body with client-side settings. The
 * four members declared here are removed from the request JSON before it is
 * sent.
 */
export type InsertRowsOptions = bigquery.ITableDataInsertAllRequest & {
  /**
   * Schema used to automatically create the table if it does not already
   * exist: either a comma-separated list of `name:type` pairs or a schema
   * object.
   */
  schema?: string | {};
  /**
   * If `true`, the rows are expected to already be formatted according to the
   * `insertAll` specification.
   */
  raw?: boolean;
  /**
   * Number of times to retry inserting rows for cases of partial failures.
   * Documented default: 3.
   */
  partialRetries?: number;
  /**
   * Controls whether a default row id is supplied when one is not provided.
   * NOTE(review): the exact default is documented on `Table#insert`; confirm
   * there.
   */
  createInsertId?: boolean;
};
Expand Down Expand Up @@ -136,6 +137,12 @@ export type FormattedMetadata = bigquery.ITable;
export type TableSchema = bigquery.ITableSchema;
export type TableField = bigquery.ITableFieldSchema;

/**
 * One entry of the `errors` list carried by a partial insert failure.
 *
 * The retry logic only relies on `row`: entries that carry a row are
 * re-submitted, entries without one are ignored.
 *
 * NOTE(review): `_insert` appears to build entries shaped as
 * `{errors: [{message, reason}], row}`; confirm whether `message` and
 * `reason` are actually populated at this level.
 */
export interface PartialInsertFailure {
  /** The row that failed to insert, as originally passed by the caller. */
  row: RowMetadata;
  /** Short reason code reported for the failure. */
  reason: string;
  /** Human-readable description of the failure. */
  message: string;
}

/**
* The file formats accepted by BigQuery.
*
Expand Down Expand Up @@ -1733,6 +1740,11 @@ class Table extends common.ServiceObject {
* If you need to create an entire table from a file, consider using
* {@link Table#load} instead.
*
* Note, if a table was recently created, inserts may fail until the table
* is consistent within BigQuery. If a `schema` is supplied, this method will
* automatically retry those failed inserts, and it will even create the
* table with the provided schema if it does not exist.
*
* @see [Tabledata: insertAll API Documentation]{@link https://cloud.google.com/bigquery/docs/reference/v2/tabledata/insertAll}
* @see [Streaming Insert Limits]{@link https://cloud.google.com/bigquery/quotas#streaming_inserts}
* @see [Troubleshooting Errors]{@link https://developers.google.com/bigquery/troubleshooting-errors}
Expand All @@ -1743,12 +1755,15 @@ class Table extends common.ServiceObject {
* default row id when one is not provided.
* @param {boolean} [options.ignoreUnknownValues=false] Accept rows that contain
* values that do not match the schema. The unknown values are ignored.
* @param {number} [options.partialRetries=3] Number of times to retry
* inserting rows for cases of partial failures.
* @param {boolean} [options.raw] If `true`, the `rows` argument is expected to
* be formatted according to the
* [specification](https://cloud.google.com/bigquery/docs/reference/v2/tabledata/insertAll).
* @param {string|object} [options.schema] If provided will atomatically create
* a table if it doesn't already exist. Note that this can take longer
* than 2 minutes to complete. A comma-separated list of name:type pairs.
* @param {string|object} [options.schema] If provided will automatically
* create a table if it doesn't already exist. Note that this can take
* longer than 2 minutes to complete. A comma-separated list of
* name:type pairs.
* Valid types are "string", "integer", "float", "boolean", and
* "timestamp". If the type is omitted, it is assumed to be "string".
* Example: "name:string, age:integer". Schemas can also be specified as a
Expand Down Expand Up @@ -1870,6 +1885,102 @@ class Table extends common.ServiceObject {
? optionsOrCallback
: (cb as InsertRowsCallback);

this._insertAndCreateTable(rows, options).then(
resp => callback(null, resp),
err => callback(err, null)
);
}

/**
 * Inserts rows (retrying partial failures) and, when the insert fails with a
 * 404 and a `schema` option was supplied, creates the table, waits, and then
 * starts over.
 *
 * Any other insert error, or a 404 without a schema, is rethrown unchanged.
 * A 409 from table creation is ignored; every other creation error is
 * rethrown.
 *
 * @private
 *
 * @param {RowMetadata|RowMetadata[]} rows The rows to insert.
 * @param {InsertRowsOptions} options Insert options; `options.schema` enables
 *     table creation.
 * @returns {Promise<bigquery.ITableDataInsertAllResponse | bigquery.ITable>}
 */
private async _insertAndCreateTable(
  rows: RowMetadata | RowMetadata[],
  options: InsertRowsOptions
): Promise<bigquery.ITableDataInsertAllResponse | bigquery.ITable> {
  // Pause between creating the table and the next insert attempt.
  const waitMs = 60000;
  const tableSchema = options.schema;

  try {
    const response = await this._insertWithRetry(rows, options);
    return response;
  } catch (insertErr) {
    const tableMissing = (insertErr as common.ApiError).code === 404;
    const canCreate = tableMissing && tableSchema;
    if (!canCreate) {
      throw insertErr;
    }
  }

  try {
    await this.create({schema: tableSchema});
  } catch (createErr) {
    // A 409 is tolerated here so the insert can still be retried below.
    const alreadyExists = (createErr as common.ApiError).code === 409;
    if (!alreadyExists) {
      throw createErr;
    }
  }

  // Table creation after failed access is subject to failure caching and
  // eventual consistency, see:
  // https://github.com/googleapis/google-cloud-python/issues/4553#issuecomment-350110292
  await new Promise(done => setTimeout(done, waitMs));

  // Explicit recursion: run the whole insert/create cycle again.
  return this._insertAndCreateTable(rows, options);
}

/**
 * This method will attempt to insert rows while retrying any partial failures
 * that occur along the way. Because partial insert failures are returned
 * differently, we can't depend on our usual retry strategy.
 *
 * After a failed attempt only the rows reported on the error's `errors`
 * entries are re-submitted. If the error carries no rows, it is not treated
 * as a partial failure and is rethrown immediately.
 *
 * At least one attempt is always made: negative `partialRetries` values are
 * clamped to 0, and a value that is not a number (e.g. `NaN`) falls back to
 * the default of 3 retries.
 *
 * @private
 *
 * @param {RowMetadata|RowMetadata[]} rows The rows to insert.
 * @param {InsertRowsOptions} options Insert options.
 * @returns {Promise<bigquery.ITableDataInsertAllResponse>}
 * @throws The error from the last attempt, once retries are exhausted or the
 *     failure has no rows to retry.
 */
private async _insertWithRetry(
  rows: RowMetadata | RowMetadata[],
  options: InsertRowsOptions
): Promise<bigquery.ITableDataInsertAllResponse> {
  const defaultRetries = 3;
  const {partialRetries = defaultRetries} = options;
  let error: Error;

  // `Math.max` propagates NaN. An unguarded NaN would make the loop below run
  // zero times and this method would throw `undefined`, which callers treat
  // as "no error" even though nothing was inserted.
  const clampedRetries = Math.max(partialRetries, 0);
  const retries = Number.isNaN(clampedRetries)
    ? defaultRetries
    : clampedRetries;
  const maxAttempts = retries + 1;

  for (let attempts = 0; attempts < maxAttempts; attempts++) {
    try {
      return await this._insert(rows, options);
    } catch (e) {
      error = e;

      // Narrow the next attempt to the rows that were reported as failed.
      rows = ((e.errors || []) as PartialInsertFailure[])
        .filter(err => !!err.row)
        .map(err => err.row);

      // No failed rows attached: not a partial failure, so do not retry.
      if (!rows.length) {
        break;
      }
    }
  }

  // The loop always runs at least once, so `error` is assigned by now.
  throw error!;
}

/**
* This method does the bulk of the work for processing options and making the
* network request.
*
* @private
*
* @param {RowMetadata|RowMetadata[]} rows The rows to insert.
* @param {InsertRowsOptions} options Insert options.
* @returns {Promise<bigquery.ITableDataInsertAllResponse>}
*/
private async _insert(
rows: RowMetadata | RowMetadata[],
options: InsertRowsOptions
): Promise<bigquery.ITableDataInsertAllResponse> {
rows = arrify(rows) as RowMetadata[];

if (!rows.length) {
Expand All @@ -1893,74 +2004,39 @@ class Table extends common.ServiceObject {
}

delete json.createInsertId;
delete json.partialRetries;
delete json.raw;
delete json.schema;

let schema: string | {};

if (options.schema) {
schema = options.schema;
delete json.schema;
}

const createTableAndRetry = () => {
this.create(
{
schema,
},
(err, table, resp) => {
if (err && err.code !== 409) {
callback!(err, resp);
return;
}

setTimeout(() => {
this.insert(rows, options, callback!);
}, 60000);
}
);
};

this.request(
{
method: 'POST',
uri: '/insertAll',
json,
},
(err, resp) => {
if (err) {
if ((err as common.ApiError).code === 404 && schema) {
setTimeout(createTableAndRetry, Math.random() * 60000);
} else {
callback!(err, resp);
}
return;
}
const [resp] = await this.request({
method: 'POST',
uri: '/insertAll',
json,
});

const partialFailures = (resp.insertErrors || []).map(
(insertError: GoogleErrorBody) => {
const partialFailures = (resp.insertErrors || []).map(
(insertError: GoogleErrorBody) => {
return {
errors: insertError.errors!.map(error => {
return {
errors: insertError.errors!.map(error => {
return {
message: error.message,
reason: error.reason,
};
}),
// eslint-disable-next-line @typescript-eslint/no-explicit-any
row: rows[(insertError as any).index],
message: error.message,
reason: error.reason,
};
}
);

if (partialFailures.length > 0) {
err = new common.util.PartialFailureError({
errors: partialFailures,
response: resp,
} as GoogleErrorBody);
}

callback!(err, resp);
}),
// eslint-disable-next-line @typescript-eslint/no-explicit-any
row: rows[(insertError as any).index],
};
}
);

if (partialFailures.length > 0) {
throw new common.util.PartialFailureError({
errors: partialFailures,
response: resp,
} as GoogleErrorBody);
}

return resp;
}

load(
Expand Down
1 change: 0 additions & 1 deletion system-test/bigquery.ts
Expand Up @@ -975,7 +975,6 @@ describe('BigQuery', () => {
};

const options = {
autoCreate: true,
schema: SCHEMA,
};

Expand Down

0 comments on commit b8639c2

Please sign in to comment.