Skip to content

Commit

Permalink
[6.8] [kbn/es] auto retry snapshot download on unexpected errors (#38944
Browse files Browse the repository at this point in the history
) (#38994)

* [kbn/es] auto retry snapshot download on unexpected errors

* missed an await

* pass log to retry()
  • Loading branch information
Spencer committed Jun 14, 2019
1 parent 5caf5da commit 6d53abe
Showing 1 changed file with 52 additions and 26 deletions.
78 changes: 52 additions & 26 deletions packages/kbn-es/src/artifact.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const asyncPipeline = promisify(pipeline);
const V1_VERSIONS_API = 'https://artifacts-api.elastic.co/v1/versions';

const { cache } = require('./utils');
const { createCliError } = require('./errors');
const { createCliError, isCliError } = require('./errors');

const TEST_ES_SNAPSHOT_VERSION = process.env.TEST_ES_SNAPSHOT_VERSION
? process.env.TEST_ES_SNAPSHOT_VERSION
Expand Down Expand Up @@ -66,6 +66,25 @@ function headersToString(headers, indent = '') {
);
}

async function retry(log, fn) {
async function doAttempt(attempt) {
try {
return await fn();
} catch (error) {
if (isCliError(error) || attempt >= 5) {
throw error;
}

log.warning('...failure, retrying in 5 seconds:', error.message);
await new Promise(resolve => setTimeout(resolve, 5000));
log.info('...retrying');
return await doAttempt(attempt + 1);
}
}

return await doAttempt(1);
}

exports.Artifact = class Artifact {
/**
* Fetch an Artifact from the Artifact API for a license level and version
Expand All @@ -78,22 +97,27 @@ exports.Artifact = class Artifact {
const urlBuild = encodeURIComponent(TEST_ES_SNAPSHOT_VERSION);
const url = `${V1_VERSIONS_API}/${urlVersion}/builds/${urlBuild}/projects/elasticsearch`;

log.info('downloading artifact info from %s', chalk.bold(url));
const abc = new AbortController();
const resp = await fetch(url, { signal: abc.signal });
const json = await resp.text();
const json = await retry(log, async () => {
log.info('downloading artifact info from %s', chalk.bold(url));

if (resp.status === 404) {
abc.abort();
throw createCliError(
`Snapshots for ${version}/${TEST_ES_SNAPSHOT_VERSION} are not available`
);
}
const abc = new AbortController();
const resp = await fetch(url, { signal: abc.signal });
const json = await resp.text();

if (!resp.ok) {
abc.abort();
throw new Error(`Unable to read artifact info from ${url}: ${resp.statusText}\n ${json}`);
}
if (resp.status === 404) {
abc.abort();
throw createCliError(
`Snapshots for ${version}/${TEST_ES_SNAPSHOT_VERSION} are not available`
);
}

if (!resp.ok) {
abc.abort();
throw new Error(`Unable to read artifact info from ${url}: ${resp.statusText}\n ${json}`);
}

return json;
});

// parse the api response into an array of Artifact objects
const {
Expand Down Expand Up @@ -184,21 +208,23 @@ exports.Artifact = class Artifact {
* @return {Promise<void>}
*/
async download(dest) {
const cacheMeta = cache.readMeta(dest);
const tmpPath = `${dest}.tmp`;
await retry(this._log, async () => {
const cacheMeta = cache.readMeta(dest);
const tmpPath = `${dest}.tmp`;

const artifactResp = await this._download(tmpPath, cacheMeta.etag, cacheMeta.ts);
if (artifactResp.cached) {
return;
}
const artifactResp = await this._download(tmpPath, cacheMeta.etag, cacheMeta.ts);
if (artifactResp.cached) {
return;
}

await this._verifyChecksum(artifactResp);
await this._verifyChecksum(artifactResp);

// cache the etag for future downloads
cache.writeMeta(dest, { etag: artifactResp.etag });
// cache the etag for future downloads
cache.writeMeta(dest, { etag: artifactResp.etag });

// rename temp download to the destination location
fs.renameSync(tmpPath, dest);
// rename temp download to the destination location
fs.renameSync(tmpPath, dest);
});
}

/**
Expand Down

0 comments on commit 6d53abe

Please sign in to comment.