Skip to content

Commit

Permalink
chore(table): rename insert option maxAttempts to partialRetries, all…
Browse files Browse the repository at this point in the history
…ow retry of 0, do not allow negative retries #589
  • Loading branch information
zamnuts committed Apr 6, 2020
1 parent d10d6a4 commit 8496fb1
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 23 deletions.
16 changes: 10 additions & 6 deletions src/table.ts
Expand Up @@ -71,7 +71,7 @@ export type RowMetadata = any;

export type InsertRowsOptions = bigquery.ITableDataInsertAllRequest & {
createInsertId?: boolean;
maxAttempts?: number;
partialRetries?: number;
raw?: boolean;
schema?: string | {};
};
Expand Down Expand Up @@ -1757,8 +1757,8 @@ class Table extends common.ServiceObject {
* default row id when one is not provided.
* @param {boolean} [options.ignoreUnknownValues=false] Accept rows that contain
* values that do not match the schema. The unknown values are ignored.
* @param {number} [options.maxAttempts=3] Number of attempts to try and
* insert rows.
* @param {number} [options.partialRetries=3] Number of times to retry
* inserting rows for cases of partial failures.
* @param {boolean} [options.raw] If `true`, the `rows` argument is expected to
* be formatted as according to the
* [specification](https://cloud.google.com/bigquery/docs/reference/v2/tabledata/insertAll).
Expand Down Expand Up @@ -1946,9 +1946,11 @@ class Table extends common.ServiceObject {
rows: RowMetadata | RowMetadata[],
options: InsertRowsOptions
): Promise<bigquery.ITableDataInsertAllResponse> {
const maxAttempts = options.maxAttempts || 3;
const {partialRetries = 3} = options;
let error: Error;

const maxAttempts = Math.max(partialRetries, 0) + 1;

for (let attempts = 0; attempts < maxAttempts; attempts++) {
try {
return await this._insert(rows, options);
Expand All @@ -1958,7 +1960,9 @@ class Table extends common.ServiceObject {
.filter(err => !!err.row)
.map(err => err.row);

if (!rows.length) break;
if (!rows.length) {
break;
}
}
}

Expand Down Expand Up @@ -2002,7 +2006,7 @@ class Table extends common.ServiceObject {
}

delete json.createInsertId;
delete json.maxAttempts;
delete json.partialRetries;
delete json.raw;
delete json.schema;

Expand Down
194 changes: 177 additions & 17 deletions test/table.ts
Expand Up @@ -2022,20 +2022,26 @@ describe('BigQuery/Table', () => {
}),
};

const OPTIONS = {
schema: SCHEMA_STRING,
};

// HACK @types/sinon is missing the timer async methods
interface SinonFakeTimersShim extends sinon.SinonFakeTimers {
runAllAsync(): Promise<number>;
tickAsync(ms: number | string): Promise<number>;
}

let clock: SinonFakeTimersShim;
let insertSpy: sinon.SinonSpy;
let requestStub: sinon.SinonStub;

before(() => {
clock = sinon.useFakeTimers() as SinonFakeTimersShim;
});

beforeEach(() => {
insertSpy = sinon.spy(table, '_insert');
requestStub = sinon.stub(table, 'request').resolves([{}]);
fakeUuid.v4 = () => {
return fakeInsertId;
Expand All @@ -2044,6 +2050,7 @@ describe('BigQuery/Table', () => {

afterEach(() => {
clock.reset();
insertSpy.restore();
});

after(() => {
Expand Down Expand Up @@ -2110,7 +2117,7 @@ describe('BigQuery/Table', () => {
requestStub.calledWithMatch(
({json}: DecorateRequestOptions) =>
json.rows[0].insertId === undefined &&
json.createdInsertId === undefined
json.createInsertId === undefined
)
);
});
Expand Down Expand Up @@ -2170,6 +2177,160 @@ describe('BigQuery/Table', () => {
]);
});

it('should retry partials default max 3', async () => {
const rowError = {message: 'Error.', reason: 'try again plz'};
requestStub.resetBehavior();
requestStub.resolves([
{
insertErrors: [
{index: 0, errors: [rowError]},
{index: 1, errors: [rowError]},
{index: 2, errors: [rowError]},
{index: 3, errors: [rowError]},
],
},
]);

const reflection = await reflectAfterTimer(() =>
table.insert(data, OPTIONS)
);
assert(reflection.isRejected);
assert.strictEqual(insertSpy.callCount, 4);
});

it('should retry partials with optional max', async () => {
const partialRetries = 6;
const rowError = {message: 'Error.', reason: 'try again plz'};
requestStub.resetBehavior();
requestStub.resolves([
{
insertErrors: [
{index: 0, errors: [rowError]},
{index: 1, errors: [rowError]},
{index: 2, errors: [rowError]},
{index: 3, errors: [rowError]},
],
},
]);

const reflection = await reflectAfterTimer(() =>
table.insert(data, {...OPTIONS, partialRetries})
);
assert(reflection.isRejected);
assert.strictEqual(insertSpy.callCount, partialRetries + 1);
});

it('should allow 0 partial retries, but still do it once', async () => {
const rowError = {message: 'Error.', reason: 'try again plz'};
requestStub.resetBehavior();
requestStub.resolves([
{
insertErrors: [
{index: 0, errors: [rowError]},
{index: 1, errors: [rowError]},
{index: 2, errors: [rowError]},
{index: 3, errors: [rowError]},
],
},
]);

const reflection = await reflectAfterTimer(() =>
table.insert(data, {...OPTIONS, partialRetries: 0})
);
assert(reflection.isRejected);
assert.strictEqual(insertSpy.callCount, 1);
});

it('should keep partial retries option non-negative', async () => {
const rowError = {message: 'Error.', reason: 'try again plz'};
requestStub.resetBehavior();
requestStub.resolves([
{
insertErrors: [
{index: 0, errors: [rowError]},
{index: 1, errors: [rowError]},
{index: 2, errors: [rowError]},
{index: 3, errors: [rowError]},
],
},
]);

const reflection = await reflectAfterTimer(() =>
table.insert(data, {...OPTIONS, partialRetries: -1})
);
assert(reflection.isRejected);
assert.strictEqual(insertSpy.callCount, 1);
});

it('should retry partial inserts deltas', async () => {
const rowError = {message: 'Error.', reason: 'try again plz'};
requestStub.resetBehavior();
requestStub.onCall(0).resolves([
{
insertErrors: [
{index: 0, errors: [rowError]},
{index: 1, errors: [rowError]},
{index: 2, errors: [rowError]},
{index: 3, errors: [rowError]},
],
},
]);

requestStub.onCall(1).resolves([
{
insertErrors: [
{index: 0, errors: [rowError]},
{index: 1, errors: [rowError]},
{index: 2, errors: [rowError]},
],
},
]);

requestStub.onCall(2).resolves([
{
insertErrors: [
{index: 1, errors: [rowError]},
{index: 2, errors: [rowError]},
],
},
]);

const goodResponse = [{foo: 'bar'}];
requestStub.onCall(3).resolves(goodResponse);

const reflection = await reflectAfterTimer(() =>
table.insert(data, OPTIONS)
);
assert(reflection.isFulfilled);

assert.deepStrictEqual(
requestStub.getCall(0).args[0].json,
dataApiFormat,
'first call: try all 5'
);
assert.deepStrictEqual(
requestStub.getCall(1).args[0].json,
{rows: dataApiFormat.rows.slice(0, 4)},
'second call: previous failures were 4/5'
);
assert.deepStrictEqual(
requestStub.getCall(2).args[0].json,
{rows: dataApiFormat.rows.slice(0, 3)},
'third call: previous failures were 3/5'
);
assert.deepStrictEqual(
requestStub.getCall(3).args[0].json,
{rows: dataApiFormat.rows.slice(1, 3)},
'fourth call: previous failures were 2/5'
);
assert(!requestStub.getCall(4), 'fifth call: should not have happened');

const {value} = reflection as pReflect.PromiseFulfilledResult<
InsertRowsResponse
>;
assert(value);
});

it('should insert raw data', async () => {
const opts = {raw: true};
await table.insert(rawData, opts);
Expand Down Expand Up @@ -2207,21 +2368,17 @@ describe('BigQuery/Table', () => {
});

describe('create table and retry', () => {
const OPTIONS = {
schema: SCHEMA_STRING,
};

let createStub: sinon.SinonStub;
let insertSpy: sinon.SinonSpy;
let insertCreateSpy: sinon.SinonSpy;

beforeEach(() => {
insertSpy = sinon.spy(table, '_insertAndCreateTable');
insertCreateSpy = sinon.spy(table, '_insertAndCreateTable');
createStub = sinon.stub(table, 'create').resolves([{}]);
requestStub.onFirstCall().rejects({code: 404});
});

afterEach(() => {
insertSpy.restore();
insertCreateSpy.restore();
createStub.restore();
});

Expand Down Expand Up @@ -2254,19 +2411,22 @@ describe('BigQuery/Table', () => {
const remainingCheckDelay = expectedDelay - firstCheckDelay;

pReflect(table.insert(data, OPTIONS)); // gracefully handle async errors
assert(insertSpy.calledOnce); // just called `insert`, that's 1 so far
assert(insertCreateSpy.calledOnce); // just called `insert`, that's 1 so far

await clock.tickAsync(firstCheckDelay); // first 50s
assert(insertSpy.calledOnce);
assert(insertCreateSpy.calledOnce);
assert(createStub.calledOnce, 'must create table before inserting');

await clock.tickAsync(remainingCheckDelay); // first 50s + 10s = 60s
assert(insertSpy.calledTwice);
assert.strictEqual(insertSpy.secondCall.args[0], data);
assert.strictEqual(insertSpy.secondCall.args[1], OPTIONS);
assert(insertCreateSpy.calledTwice);
assert.strictEqual(insertCreateSpy.secondCall.args[0], data);
assert.strictEqual(insertCreateSpy.secondCall.args[1], OPTIONS);

await clock.runAllAsync(); // for good measure
assert(insertSpy.calledTwice, 'should not have called insert again');
assert(
insertCreateSpy.calledTwice,
'should not have called insert again'
);
});

it('should reject on table creation errors', async () => {
Expand All @@ -2290,9 +2450,9 @@ describe('BigQuery/Table', () => {
);
assert(reflection.isFulfilled);
assert(createStub.calledOnce);
assert(insertSpy.calledTwice);
assert.strictEqual(insertSpy.secondCall.args[0], data);
assert.strictEqual(insertSpy.secondCall.args[1], OPTIONS);
assert(insertCreateSpy.calledTwice);
assert.strictEqual(insertCreateSpy.secondCall.args[0], data);
assert.strictEqual(insertCreateSpy.secondCall.args[1], OPTIONS);
});

it('should retry the insert', async () => {
Expand Down

0 comments on commit 8496fb1

Please sign in to comment.