diff --git a/package.json b/package.json index 5ebc5383..920cc228 100644 --- a/package.json +++ b/package.json @@ -85,6 +85,7 @@ "mocha": "^7.0.0", "mv": "^2.1.1", "ncp": "^2.0.0", + "p-reflect": "^2.1.0", "proxyquire": "^2.1.0", "sinon": "^9.0.0", "tmp": "0.1.0", diff --git a/src/table.ts b/src/table.ts index a5b8e0f5..cd6021b4 100644 --- a/src/table.ts +++ b/src/table.ts @@ -73,6 +73,7 @@ export type RowMetadata = any; export type InsertRowsOptions = bigquery.ITableDataInsertAllRequest & { createInsertId?: boolean; + partialRetries?: number; raw?: boolean; schema?: string | {}; }; @@ -136,6 +137,12 @@ export type FormattedMetadata = bigquery.ITable; export type TableSchema = bigquery.ITableSchema; export type TableField = bigquery.ITableFieldSchema; +export interface PartialInsertFailure { + message: string; + reason: string; + row: RowMetadata; +} + /** * The file formats accepted by BigQuery. * @@ -1733,6 +1740,11 @@ class Table extends common.ServiceObject { * If you need to create an entire table from a file, consider using * {@link Table#load} instead. * + * Note, if a table was recently created, inserts may fail until the table + * is consistent within BigQuery. If a `schema` is supplied, this method will + * automatically retry those failed inserts, and it will even create the + * table with the provided schema if it does not exist. + * * @see [Tabledata: insertAll API Documentation]{@link https://cloud.google.com/bigquery/docs/reference/v2/tabledata/insertAll} * @see [Streaming Insert Limits]{@link https://cloud.google.com/bigquery/quotas#streaming_inserts} * @see [Troubleshooting Errors]{@link https://developers.google.com/bigquery/troubleshooting-errors} @@ -1743,12 +1755,15 @@ class Table extends common.ServiceObject { * default row id when one is not provided. * @param {boolean} [options.ignoreUnknownValues=false] Accept rows that contain * values that do not match the schema. The unknown values are ignored. + * @param {number} [options.partialRetries=3] Number of times to retry + * inserting rows for cases of partial failures. * @param {boolean} [options.raw] If `true`, the `rows` argument is expected to * be formatted as according to the * [specification](https://cloud.google.com/bigquery/docs/reference/v2/tabledata/insertAll). - * @param {string|object} [options.schema] If provided will atomatically create - * a table if it doesn't already exist. Note that this can take longer - * than 2 minutes to complete. A comma-separated list of name:type pairs. + * @param {string|object} [options.schema] If provided will automatically + * create a table if it doesn't already exist. Note that this can take + * longer than 2 minutes to complete. A comma-separated list of + * name:type pairs. * Valid types are "string", "integer", "float", "boolean", and * "timestamp". If the type is omitted, it is assumed to be "string". * Example: "name:string, age:integer". Schemas can also be specified as a @@ -1870,6 +1885,102 @@ class Table extends common.ServiceObject { ? optionsOrCallback : (cb as InsertRowsCallback); + this._insertAndCreateTable(rows, options).then( + resp => callback(null, resp), + err => callback(err, null) + ); + } + + /** + * Insert rows with retries, but will create the table if not exists. + * + * @param {RowMetadata | RowMetadata[]} rows + * @param {InsertRowsOptions} options + * @returns {Promise} + * @private + */ + private async _insertAndCreateTable( + rows: RowMetadata | RowMetadata[], + options: InsertRowsOptions + ): Promise { + const {schema} = options; + const delay = 60000; + + try { + return await this._insertWithRetry(rows, options); + } catch (err) { + if ((err as common.ApiError).code !== 404 || !schema) { + throw err; + } + } + + try { + await this.create({schema}); + } catch (err) { + if ((err as common.ApiError).code !== 409) { + throw err; + } + } + + // table creation after failed access is subject to failure caching and + // eventual consistency, see: + // https://github.com/googleapis/google-cloud-python/issues/4553#issuecomment-350110292 + await new Promise(resolve => setTimeout(resolve, delay)); + return this._insertAndCreateTable(rows, options); + } + + /** + * This method will attempt to insert rows while retrying any partial failures + * that occur along the way. Because partial insert failures are returned + * differently, we can't depend on our usual retry strategy. + * + * @private + * + * @param {RowMetadata|RowMetadata[]} rows The rows to insert. + * @param {InsertRowsOptions} options Insert options. + * @returns {Promise} + */ + private async _insertWithRetry( + rows: RowMetadata | RowMetadata[], + options: InsertRowsOptions + ): Promise { + const {partialRetries = 3} = options; + let error: Error; + + const maxAttempts = Math.max(partialRetries, 0) + 1; + + for (let attempts = 0; attempts < maxAttempts; attempts++) { + try { + return await this._insert(rows, options); + } catch (e) { + error = e; + rows = ((e.errors || []) as PartialInsertFailure[]) + .filter(err => !!err.row) + .map(err => err.row); + + if (!rows.length) { + break; + } + } + } + + throw error!; + } + + /** + * This method does the bulk of the work for processing options and making the + * network request. + * + * @private + * + * @param {RowMetadata|RowMetadata[]} rows The rows to insert. + * @param {InsertRowsOptions} options Insert options. + * @returns {Promise} + */ + private async _insert( + rows: RowMetadata | RowMetadata[], + options: InsertRowsOptions + ): Promise { rows = arrify(rows) as RowMetadata[]; if (!rows.length) { @@ -1893,74 +2004,39 @@ class Table extends common.ServiceObject { } delete json.createInsertId; + delete json.partialRetries; delete json.raw; + delete json.schema; - let schema: string | {}; - - if (options.schema) { - schema = options.schema; - delete json.schema; - } - - const createTableAndRetry = () => { - this.create( - { - schema, - }, - (err, table, resp) => { - if (err && err.code !== 409) { - callback!(err, resp); - return; - } - - setTimeout(() => { - this.insert(rows, options, callback!); - }, 60000); - } - ); - }; - - this.request( - { - method: 'POST', - uri: '/insertAll', - json, - }, - (err, resp) => { - if (err) { - if ((err as common.ApiError).code === 404 && schema) { - setTimeout(createTableAndRetry, Math.random() * 60000); - } else { - callback!(err, resp); - } - return; - } + const [resp] = await this.request({ + method: 'POST', + uri: '/insertAll', + json, + }); - const partialFailures = (resp.insertErrors || []).map( - (insertError: GoogleErrorBody) => { + const partialFailures = (resp.insertErrors || []).map( + (insertError: GoogleErrorBody) => { + return { + errors: insertError.errors!.map(error => { return { - errors: insertError.errors!.map(error => { - return { - message: error.message, - reason: error.reason, - }; - }), - // eslint-disable-next-line @typescript-eslint/no-explicit-any - row: rows[(insertError as any).index], + message: error.message, + reason: error.reason, }; - } - ); - - if (partialFailures.length > 0) { - err = new common.util.PartialFailureError({ - errors: partialFailures, - response: resp, - } as GoogleErrorBody); - } - - callback!(err, resp); + }), + // eslint-disable-next-line @typescript-eslint/no-explicit-any + row: rows[(insertError as any).index], + }; } ); + + if (partialFailures.length > 0) { + throw new common.util.PartialFailureError({ + errors: partialFailures, + response: resp, + } as GoogleErrorBody); + } + + return resp; } load( diff --git a/system-test/bigquery.ts b/system-test/bigquery.ts index d403d047..fe7007e5 100644 --- a/system-test/bigquery.ts +++ b/system-test/bigquery.ts @@ -975,7 +975,6 @@ describe('BigQuery', () => { }; const options = { - autoCreate: true, schema: SCHEMA, }; diff --git a/test/table.ts b/test/table.ts index 33a5b365..b60b5471 100644 --- a/test/table.ts +++ b/test/table.ts @@ -27,6 +27,7 @@ import {describe, it, afterEach, beforeEach, before, after} from 'mocha'; import Big from 'big.js'; import {EventEmitter} from 'events'; import * as extend from 'extend'; +import * as pReflect from 'p-reflect'; import * as proxyquire from 'proxyquire'; import * as sinon from 'sinon'; import * as stream from 'stream'; @@ -36,10 +37,10 @@ import {BigQuery, Query} from '../src/bigquery'; import {Job, JobOptions} from '../src/job'; import { CopyTableMetadata, + InsertRowsResponse, JobLoadMetadata, // eslint-disable-next-line @typescript-eslint/no-unused-vars Table, - TableMetadata, ViewDefinition, } from '../src/table'; import bigquery from '../src/types'; @@ -2020,12 +2021,61 @@ describe('BigQuery/Table', () => { }), }; + const OPTIONS = { + schema: SCHEMA_STRING, + }; + + let clock: sinon.SinonFakeTimers; + let insertSpy: sinon.SinonSpy; + let requestStub: sinon.SinonStub; + + before(() => { + clock = sinon.useFakeTimers() as sinon.SinonFakeTimers; + }); + beforeEach(() => { + insertSpy = sinon.spy(table, '_insert'); + requestStub = sinon.stub(table, 'request').resolves([{}]); fakeUuid.v4 = () => { return fakeInsertId; }; }); + afterEach(() => { + clock.reset(); + insertSpy.restore(); + }); + + after(() => { + clock.restore(); + }); + + /** + * Only use this method when NOT directly awaiting on `table.insert`, i.e. + * when relying on any of the fake timer async helpers. + * Tests should assert isRejected or isFulfilled. + * @ignore + * @param fn + * @returns {Promise>} + */ + async function reflectAfterTimer( + fn: () => Promise + ): Promise> { + // When `fn` rejects/throws, we need to capture this and test + // for it as needed. Using reflection avoids try/catch's potential for + // false-positives. + // Also, defer capturing the settled promise until _after_ the + // internal timer (delay) has been completed. + + const fnPromise: Promise = fn(); + const reflectedPromise: Promise> = pReflect(fnPromise); + + await clock.runAllAsync(); + return reflectedPromise; + } + it('should throw an error if rows is empty', async () => { await assert.rejects( async () => table.insert([]), @@ -2033,42 +2083,41 @@ describe('BigQuery/Table', () => { ); }); - it('should save data', done => { - table.request = (reqOpts: DecorateRequestOptions) => { - assert.strictEqual(reqOpts.method, 'POST'); - assert.strictEqual(reqOpts.uri, '/insertAll'); - assert.deepStrictEqual(reqOpts.json, dataApiFormat); - done(); - }; - - table.insert(data, done); + it('should save data', async () => { + await table.insert(data); + assert( + requestStub.calledOnceWithExactly({ + method: 'POST', + uri: '/insertAll', + json: dataApiFormat, + }) + ); }); - it('should generate insertId', done => { - table.request = (reqOpts: DecorateRequestOptions) => { - assert.strictEqual(reqOpts.json.rows[0].insertId, fakeInsertId); - done(); - }; - - table.insert([data[0]], done); + it('should generate insertId', async () => { + await table.insert([data[0]]); + assert( + requestStub.calledOnceWith( + sinon.match.hasNested('json.rows[0].insertId', fakeInsertId) + ) + ); }); - it('should omit the insertId if createInsertId is false', done => { - table.request = ({json}: DecorateRequestOptions) => { - assert.strictEqual(json.rows[0].insertId, undefined); - assert.strictEqual(json.createInsertId, undefined); - done(); - }; - - table.insert([data[0]], {createInsertId: false}, done); + it('should omit the insertId if createInsertId is false', async () => { + await table.insert([data[0]], {createInsertId: false}); + assert(requestStub.calledOnce); + assert( + requestStub.calledWithMatch( + ({json}: DecorateRequestOptions) => + json.rows[0].insertId === undefined && + json.createInsertId === undefined + ) + ); }); it('should execute callback with API response', done => { const apiResponse = {insertErrors: []}; - - table.request = (reqOpts: DecorateRequestOptions, callback: Function) => { - callback(null, apiResponse); - }; + requestStub.resolves([apiResponse]); table.insert(data, (err: Error, apiResponse_: {}) => { assert.ifError(err); @@ -2079,215 +2128,351 @@ describe('BigQuery/Table', () => { it('should execute callback with error & API response', done => { const error = new Error('Error.'); - const apiResponse = {}; - - table.request = (reqOpts: DecorateRequestOptions, callback: Function) => { - callback(error, apiResponse); - }; + requestStub.rejects(error); table.insert(data, (err: Error, apiResponse_: {}) => { assert.strictEqual(err, error); - assert.strictEqual(apiResponse_, apiResponse); + assert.strictEqual(apiResponse_, null); done(); }); }); - it('should return partial failures', done => { + it('should reject with API error', async () => { + const error = new Error('Error.'); + requestStub.rejects(error); + await assert.rejects(async () => table.insert(data), error); + }); + + it('should return partial failures', async () => { const row0Error = {message: 'Error.', reason: 'notFound'}; const row1Error = {message: 'Error.', reason: 'notFound'}; - - table.request = (reqOpts: DecorateRequestOptions, callback: Function) => { - callback(null, { + requestStub.resolves([ + { insertErrors: [ {index: 0, errors: [row0Error]}, {index: 1, errors: [row1Error]}, ], - }); - }; + }, + ]); - table.insert(data, (err: Error) => { - assert.strictEqual(err.name, 'PartialFailureError'); + const reflection = await reflectAfterTimer(() => table.insert(data)); + assert(reflection.isRejected); + const {reason} = reflection as pReflect.PromiseRejectedResult; + assert.deepStrictEqual((reason as GoogleErrorBody).errors, [ + { + row: dataApiFormat.rows[0].json, + errors: [row0Error], + }, + { + row: dataApiFormat.rows[1].json, + errors: [row1Error], + }, + ]); + }); - assert.deepStrictEqual(((err as {}) as GoogleErrorBody).errors, [ - { - row: dataApiFormat.rows[0].json, - errors: [row0Error], - }, - { - row: dataApiFormat.rows[1].json, - errors: [row1Error], - }, - ]); + it('should retry partials default max 3', async () => { + const rowError = {message: 'Error.', reason: 'try again plz'}; + requestStub.resetBehavior(); + requestStub.resolves([ + { + insertErrors: [ + {index: 0, errors: [rowError]}, + {index: 1, errors: [rowError]}, + {index: 2, errors: [rowError]}, + {index: 3, errors: [rowError]}, + ], + }, + ]); - done(); - }); + const reflection = await reflectAfterTimer(() => + table.insert(data, OPTIONS) + ); + assert(reflection.isRejected); + assert.strictEqual(insertSpy.callCount, 4); }); - it('should insert raw data', done => { - table.request = (reqOpts: DecorateRequestOptions) => { - assert.strictEqual(reqOpts.method, 'POST'); - assert.strictEqual(reqOpts.uri, '/insertAll'); - assert.deepStrictEqual(reqOpts.json, {rows: rawData}); - assert.strictEqual(reqOpts.json.raw, undefined); - done(); - }; + it('should retry partials with optional max', async () => { + const partialRetries = 6; + const rowError = {message: 'Error.', reason: 'try again plz'}; + requestStub.resetBehavior(); + requestStub.resolves([ + { + insertErrors: [ + {index: 0, errors: [rowError]}, + {index: 1, errors: [rowError]}, + {index: 2, errors: [rowError]}, + {index: 3, errors: [rowError]}, + ], + }, + ]); + + const reflection = await reflectAfterTimer(() => + table.insert(data, {...OPTIONS, partialRetries}) + ); + assert(reflection.isRejected); + assert.strictEqual(insertSpy.callCount, partialRetries + 1); + }); + + it('should allow 0 partial retries, but still do it once', async () => { + const rowError = {message: 'Error.', reason: 'try again plz'}; + requestStub.resetBehavior(); + requestStub.resolves([ + { + insertErrors: [ + {index: 0, errors: [rowError]}, + {index: 1, errors: [rowError]}, + {index: 2, errors: [rowError]}, + {index: 3, errors: [rowError]}, + ], + }, + ]); + + const reflection = await reflectAfterTimer(() => + table.insert(data, {...OPTIONS, partialRetries: 0}) + ); + assert(reflection.isRejected); + assert.strictEqual(insertSpy.callCount, 1); + }); + + it('should keep partial retries option non-negative', async () => { + const rowError = {message: 'Error.', reason: 'try again plz'}; + requestStub.resetBehavior(); + requestStub.resolves([ + { + insertErrors: [ + {index: 0, errors: [rowError]}, + {index: 1, errors: [rowError]}, + {index: 2, errors: [rowError]}, + {index: 3, errors: [rowError]}, + ], + }, + ]); + + const reflection = await reflectAfterTimer(() => + table.insert(data, {...OPTIONS, partialRetries: -1}) + ); + assert(reflection.isRejected); + assert.strictEqual(insertSpy.callCount, 1); + }); + it('should retry partial inserts deltas', async () => { + const rowError = {message: 'Error.', reason: 'try again plz'}; + requestStub.resetBehavior(); + requestStub.onCall(0).resolves([ + { + insertErrors: [ + {index: 0, errors: [rowError]}, + {index: 1, errors: [rowError]}, + {index: 2, errors: [rowError]}, + {index: 3, errors: [rowError]}, + ], + }, + ]); + + requestStub.onCall(1).resolves([ + { + insertErrors: [ + {index: 0, errors: [rowError]}, + {index: 1, errors: [rowError]}, + {index: 2, errors: [rowError]}, + ], + }, + ]); + + requestStub.onCall(2).resolves([ + { + insertErrors: [ + {index: 1, errors: [rowError]}, + {index: 2, errors: [rowError]}, + ], + }, + ]); + + const goodResponse = [{foo: 'bar'}]; + requestStub.onCall(3).resolves(goodResponse); + + const reflection = await reflectAfterTimer(() => + table.insert(data, OPTIONS) + ); + assert(reflection.isFulfilled); + + assert.deepStrictEqual( + requestStub.getCall(0).args[0].json, + dataApiFormat, + 'first call: try all 5' + ); + assert.deepStrictEqual( + requestStub.getCall(1).args[0].json, + {rows: dataApiFormat.rows.slice(0, 4)}, + 'second call: previous failures were 4/5' + ); + assert.deepStrictEqual( + requestStub.getCall(2).args[0].json, + {rows: dataApiFormat.rows.slice(0, 3)}, + 'third call: previous failures were 3/5' + ); + assert.deepStrictEqual( + requestStub.getCall(3).args[0].json, + {rows: dataApiFormat.rows.slice(1, 3)}, + 'fourth call: previous failures were 2/5' + ); + assert(!requestStub.getCall(4), 'fifth call: should not have happened'); + + const {value} = reflection as pReflect.PromiseFulfilledResult< + InsertRowsResponse + >; + assert(value); + }); + + it('should insert raw data', async () => { const opts = {raw: true}; - table.insert(rawData, opts, done); + await table.insert(rawData, opts); + assert(requestStub.calledOnce); + + const [reqOpts]: DecorateRequestOptions[] = requestStub.firstCall.args; + assert.strictEqual(reqOpts.method, 'POST'); + assert.strictEqual(reqOpts.uri, '/insertAll'); + assert.deepStrictEqual(reqOpts.json, {rows: rawData}); + assert.strictEqual(reqOpts.json.raw, undefined); }); - it('should accept options', done => { + it('should accept options', async () => { const opts = { ignoreUnknownValues: true, skipInvalidRows: true, templateSuffix: 'test', }; - table.request = (reqOpts: DecorateRequestOptions) => { - assert.strictEqual(reqOpts.method, 'POST'); - assert.strictEqual(reqOpts.uri, '/insertAll'); + await table.insert(data, opts); + assert(requestStub.calledOnce); - assert.strictEqual( - reqOpts.json.ignoreUnknownValues, - opts.ignoreUnknownValues - ); - assert.strictEqual(reqOpts.json.skipInvalidRows, opts.skipInvalidRows); - assert.strictEqual(reqOpts.json.templateSuffix, opts.templateSuffix); + const [reqOpts]: DecorateRequestOptions[] = requestStub.firstCall.args; + assert.strictEqual(reqOpts.method, 'POST'); + assert.strictEqual(reqOpts.uri, '/insertAll'); - assert.deepStrictEqual(reqOpts.json.rows, dataApiFormat.rows); - done(); - }; + assert.strictEqual( + reqOpts.json.ignoreUnknownValues, + opts.ignoreUnknownValues + ); + assert.strictEqual(reqOpts.json.skipInvalidRows, opts.skipInvalidRows); + assert.strictEqual(reqOpts.json.templateSuffix, opts.templateSuffix); - table.insert(data, opts, done); + assert.deepStrictEqual(reqOpts.json.rows, dataApiFormat.rows); }); describe('create table and retry', () => { - const OPTIONS = { - schema: SCHEMA_STRING, - }; - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - let _setTimeout: any; - // eslint-disable-next-line @typescript-eslint/no-explicit-any - let _random: any; - - before(() => { - _setTimeout = global.setTimeout; - _random = Math.random; - }); + let createStub: sinon.SinonStub; + let insertCreateSpy: sinon.SinonSpy; beforeEach(() => { - sandbox.stub(global, 'setTimeout').callsFake(cb => { - cb(); - return {} as NodeJS.Timeout; - }); - Math.random = _random; - table.request = ( - reqOpts: DecorateRequestOptions, - callback: Function - ) => { - callback({code: 404}); - }; - table.create = (reqOpts: TableMetadata, callback: Function) => { - callback(null); - }; + insertCreateSpy = sinon.spy(table, '_insertAndCreateTable'); + createStub = sinon.stub(table, 'create').resolves([{}]); + requestStub.onFirstCall().rejects({code: 404}); }); - after(() => { - global.setTimeout = _setTimeout; - Math.random = _random; + afterEach(() => { + insertCreateSpy.restore(); + createStub.restore(); }); - it('should not include the schema in the insert request', done => { - table.request = (reqOpts: DecorateRequestOptions) => { - assert.strictEqual(reqOpts.json.schema, undefined); - done(); - }; + it('should not include the schema in the insert request', async () => { + requestStub.reset(); + requestStub.resolves([{}]); - table.insert(data, OPTIONS, assert.ifError); + await table.insert(data, OPTIONS); + assert(requestStub.calledOnce); + assert.strictEqual( + requestStub.firstCall.lastArg.json.schema, + undefined + ); }); - it('should set a timeout to create the table', done => { - const fakeRandomValue = Math.random(); + it('should attempt to create table if not created', async () => { + const reflection = await reflectAfterTimer(() => + table.insert(data, OPTIONS) + ); + assert(reflection.isFulfilled); + assert(createStub.calledOnce); + assert.strictEqual(createStub.firstCall.lastArg.schema, SCHEMA_STRING); + }); - Math.random = () => { - return fakeRandomValue; - }; + it('should set a timeout to insert rows in the created table', async () => { + // the implementation uses an explicit 60s delay + // so this tests at various intervals + const expectedDelay = 60000; + const firstCheckDelay = 50000; + const remainingCheckDelay = expectedDelay - firstCheckDelay; - sandbox.restore(); - sandbox.stub(global, 'setTimeout').callsFake((callback, delay) => { - assert.strictEqual(delay, fakeRandomValue * 60000); - callback(); - return {} as NodeJS.Timeout; - }); + pReflect(table.insert(data, OPTIONS)); // gracefully handle async errors + assert(insertCreateSpy.calledOnce); // just called `insert`, that's 1 so far - table.create = (reqOpts: TableMetadata) => { - assert.strictEqual(reqOpts.schema, SCHEMA_STRING); - done(); - }; + await clock.tickAsync(firstCheckDelay); // first 50s + assert(insertCreateSpy.calledOnce); + assert(createStub.calledOnce, 'must create table before inserting'); + + await clock.tickAsync(remainingCheckDelay); // first 50s + 10s = 60s + assert(insertCreateSpy.calledTwice); + assert.strictEqual(insertCreateSpy.secondCall.args[0], data); + assert.strictEqual(insertCreateSpy.secondCall.args[1], OPTIONS); - table.insert(data, OPTIONS, assert.ifError); + await clock.runAllAsync(); // for good measure + assert( + insertCreateSpy.calledTwice, + 'should not have called insert again' + ); }); - it('should return table creation errors', done => { + it('should reject on table creation errors', async () => { const error = new Error('err.'); - const response = {}; + createStub.rejects(error); - table.create = (reqOpts: TableMetadata, callback: Function) => { - callback(error, null, response); - }; + const reflection = await reflectAfterTimer(() => + table.insert(data, OPTIONS) + ); + assert(reflection.isRejected); - table.insert(data, OPTIONS, (err: Error, resp: {}) => { - assert.strictEqual(err, error); - assert.strictEqual(resp, response); - done(); - }); + const {reason} = reflection as pReflect.PromiseRejectedResult; + assert.strictEqual(reason, error); }); - it('should ignore 409 errors', done => { - table.create = (reqOpts: TableMetadata, callback: Function) => { - callback({code: 409}); - }; + it('should ignore 409 errors', async () => { + createStub.rejects({code: 409}); - let timeouts = 0; - sandbox.restore(); - sandbox.stub(global, 'setTimeout').callsFake((callback, delay) => { - if (++timeouts === 2) { - assert.strictEqual(delay, 60000); - done(); - } - callback(null); - return {} as NodeJS.Timeout; - }); - - table.insert(data, OPTIONS, assert.ifError); + const reflection = await reflectAfterTimer(() => + table.insert(data, OPTIONS) + ); + assert(reflection.isFulfilled); + assert(createStub.calledOnce); + assert(insertCreateSpy.calledTwice); + assert.strictEqual(insertCreateSpy.secondCall.args[0], data); + assert.strictEqual(insertCreateSpy.secondCall.args[1], OPTIONS); }); - it('should retry the insert', done => { - const response = {}; - let attempts = 0; + it('should retry the insert', async () => { + const errorResponse = {code: 404}; + requestStub.onFirstCall().rejects(errorResponse); + requestStub.onSecondCall().rejects(errorResponse); - table.request = ( - reqOpts: DecorateRequestOptions, - callback: Function - ) => { - assert.strictEqual(reqOpts.method, 'POST'); - assert.strictEqual(reqOpts.uri, '/insertAll'); - assert.deepStrictEqual(reqOpts.json, dataApiFormat); - - if (++attempts === 2) { - callback(null, response); - return; - } + const goodResponse = [{foo: 'bar'}]; + requestStub.onThirdCall().resolves(goodResponse); - callback({code: 404}); - }; + const reflection = await reflectAfterTimer(() => + table.insert(data, OPTIONS) + ); + assert(reflection.isFulfilled); + assert(requestStub.calledThrice); + assert( + requestStub.alwaysCalledWithMatch({ + method: 'POST', + uri: '/insertAll', + json: dataApiFormat, + }) + ); - table.insert(data, OPTIONS, (err: Error, resp: {}) => { - assert.ifError(err); - assert.strictEqual(resp, response); - done(); - }); + const {value} = reflection as pReflect.PromiseFulfilledResult< + InsertRowsResponse + >; + assert.deepStrictEqual(value, goodResponse); }); }); });