diff --git a/src/crawler.ts b/src/crawler.ts
index 91a7d80c7..51178cd7c 100644
--- a/src/crawler.ts
+++ b/src/crawler.ts
@@ -329,6 +329,18 @@ export class Crawler {
       os.hostname(),
     );
 
+    // load full state from config
+    if (this.params.state) {
+      await this.crawlState.load(
+        this.params.state,
+        this.params.scopedSeeds,
+        true,
+      );
+      // otherwise, just load extra seeds
+    } else {
+      await this.loadExtraSeeds();
+    }
+
     // clear any pending URLs from this instance
     await this.crawlState.clearOwnPendingLocks();
 
@@ -348,6 +360,19 @@ export class Crawler {
     return this.crawlState;
   }
 
+  /**
+   * Re-register redirect seeds already recorded in the crawl state, appending
+   * one new scoped seed per entry in the order `getExtraSeeds()` returns them.
+   */
+  async loadExtraSeeds() {
+    const extraSeeds = await this.crawlState.getExtraSeeds();
+
+    for (const { origSeedId, newUrl } of extraSeeds) {
+      const seed = this.params.scopedSeeds[origSeedId];
+      this.params.scopedSeeds.push(seed.newScopedSeed(newUrl));
+    }
+  }
+
   initScreenCaster() {
     let transport;
 
@@ -1190,14 +1215,6 @@ self.__bx_behaviors.selectMainBehavior();
 
     await this.crawlState.setStatus("running");
 
-    if (this.params.state) {
-      await this.crawlState.load(
-        this.params.state,
-        this.params.scopedSeeds,
-        true,
-      );
-    }
-
     await this.initPages();
 
     this.adBlockRules = new AdBlockRules(
@@ -1577,9 +1594,11 @@ self.__bx_behaviors.selectMainBehavior();
     const isChromeError = page.url().startsWith("chrome-error://");
 
     if (depth === 0 && !isChromeError && respUrl !== url) {
-      const seed = this.params.scopedSeeds[data.seedId];
-      this.params.scopedSeeds.push(seed.newScopedSeed(respUrl));
-      data.seedId = this.params.scopedSeeds.length - 1;
+      data.seedId = await this.crawlState.addExtraSeed(
+        this.params.scopedSeeds,
+        data.seedId,
+        respUrl,
+      );
       logger.info("Seed page redirected, adding redirected seed", {
         origUrl: url,
         newUrl: respUrl,
diff --git a/src/util/state.ts b/src/util/state.ts
index c75e9487c..ba7bb58b6 100644
--- a/src/util/state.ts
+++ b/src/util/state.ts
@@ -35,6 +35,13 @@ export type QueueEntry = {
   extraHops: number;
 };
 
+// ============================================================================
+/** A seed added because seed `origSeedId` redirected to `newUrl`. */
+export type ExtraRedirectSeed = {
+  newUrl: string;
+  origSeedId: number;
+};
+
 // ============================================================================
 export type PageCallbacks = {
   addLink?: (url: string) => Promise;
@@ -127,6 +134,18 @@ declare module "ioredis" {
   }
 }
 
+// ============================================================================
+/** Serialized crawl state, as produced by `serialize()` and read by `load()`. */
+type SaveState = {
+  done?: number | string[];
+  finished?: string[];
+  queued: string[];
+  pending: string[];
+  failed: string[];
+  errors: string[];
+  extraSeeds?: string[];
+};
+
 // ============================================================================
 export class RedisCrawlState {
   redis: Redis;
@@ -143,6 +162,7 @@ export class RedisCrawlState {
   fkey: string;
   ekey: string;
   pageskey: string;
+  esKey: string;
 
   constructor(redis: Redis, key: string, maxPageTime: number, uid: string) {
     this.redis = redis;
@@ -163,6 +183,8 @@ export class RedisCrawlState {
     // pages
     this.pageskey = this.key + ":pages";
 
+    this.esKey = this.key + ":extraSeeds";
+
     this._initLuaCommands(this.redis);
   }
 
@@ -492,22 +514,50 @@ return 0;
     return !!(await this.redis.sismember(this.skey, url));
   }
 
-  async serialize() {
+  async serialize(): Promise<SaveState> {
     //const queued = await this._iterSortKey(this.qkey);
-    const done = await this.numDone();
-    const queued = await this._iterSortedKey(this.qkey);
+    // `done` is no longer written; `finished` below takes its place
+    const seen = await this._iterSet(this.skey);
+    const queued = await this._iterSortedKey(this.qkey, seen);
     const pending = await this.getPendingList();
-    const failed = await this._iterListKeys(this.fkey);
+    const failed = await this._iterListKeys(this.fkey, seen);
     const errors = await this.getErrorList();
+    const extraSeeds = await this._iterListKeys(this.esKey, seen);
+
+    const finished = [...seen.values()];
 
-    return { done, queued, pending, failed, errors };
+    return { extraSeeds, finished, queued, pending, failed, errors };
   }
 
   _getScore(data: QueueEntry) {
     return (data.depth || 0) + (data.extraHops || 0) * MAX_DEPTH;
   }
 
-  async _iterSortedKey(key: string, inc = 100) {
+  /**
+   * Read every member of the Redis set at `key` by streaming it with
+   * `sscanStream`; `count` is passed through as the stream's `count` option.
+   */
+  async _iterSet(key: string, count = 100) {
+    const stream = this.redis.sscanStream(key, { count });
+
+    const results = new Set<string>();
+
+    stream.on("data", (someResults: string[]) => {
+      for (const result of someResults) {
+        results.add(result);
+      }
+    });
+
+    // settle on "error" too, so a failed scan rejects instead of hanging
+    await new Promise<void>((resolve, reject) => {
+      stream.on("end", () => resolve());
+      stream.on("error", reject);
+    });
+
+    return results;
+  }
+
+  async _iterSortedKey(key: string, seenSet: Set<string>, inc = 100) {
     const results: string[] = [];
 
     const len = await this.redis.zcard(key);
@@ -521,33 +571,35 @@ return 0;
         i,
         inc,
       );
-      results.push(...someResults);
+
+      for (const result of someResults) {
+        const json = JSON.parse(result);
+        seenSet.delete(json.url);
+        results.push(result);
+      }
     }
 
     return results;
   }
 
-  async _iterListKeys(key: string, inc = 100) {
+  async _iterListKeys(key: string, seenSet: Set<string>, inc = 100) {
     const results: string[] = [];
 
     const len = await this.redis.llen(key);
 
     for (let i = 0; i < len; i += inc) {
       const someResults = await this.redis.lrange(key, i, i + inc - 1);
-      results.push(...someResults);
+
+      for (const result of someResults) {
+        const json = JSON.parse(result);
+        seenSet.delete(json.url);
+        results.push(result);
+      }
     }
     return results;
   }
 
-  async load(
-    // TODO: Fix this the next time the file is edited.
-    // eslint-disable-next-line @typescript-eslint/no-explicit-any
-    state: Record,
-    seeds: ScopedSeed[],
-    checkScope: boolean,
-  ) {
-    const seen: string[] = [];
-
+  async load(state: SaveState, seeds: ScopedSeed[], checkScope: boolean) {
     // need to delete existing keys, if exist to fully reset state
     await this.redis.del(this.qkey);
     await this.redis.del(this.pkey);
@@ -556,6 +608,23 @@ return 0;
     await this.redis.del(this.skey);
     await this.redis.del(this.ekey);
+    await this.redis.del(this.esKey);
 
+    let seen: string[] = [];
+
+    if (state.finished) {
+      // copy, so the pushes below never mutate the caller's saved state
+      seen = [...state.finished];
+
+      await this.redis.set(this.dkey, state.finished.length);
+    }
+
+    if (state.extraSeeds) {
+      for (const extraSeed of state.extraSeeds) {
+        const { newUrl, origSeedId }: ExtraRedirectSeed = JSON.parse(extraSeed);
+        await this.addExtraSeed(seeds, origSeedId, newUrl);
+      }
+    }
+
     for (const json of state.queued) {
       const data = JSON.parse(json);
       if (checkScope) {
@@ -580,19 +649,23 @@ return 0;
       seen.push(data.url);
     }
 
-    if (typeof state.done === "number") {
-      // done key is just an int counter
-      await this.redis.set(this.dkey, state.done);
-    } else if (state.done instanceof Array) {
-      // for backwards compatibility with old save states
-      for (const json of state.done) {
-        const data = JSON.parse(json);
-        if (data.failed) {
-          await this.redis.zadd(this.qkey, this._getScore(data), json);
-        } else {
-          await this.redis.incr(this.dkey);
+    // backwards compatibility: not using done, instead 'finished'
+    // contains list of finished URLs
+    if (state.done) {
+      if (typeof state.done === "number") {
+        // done key is just an int counter
+        await this.redis.set(this.dkey, state.done);
+      } else if (state.done instanceof Array) {
+        // for backwards compatibility with old save states
+        for (const json of state.done) {
+          const data = JSON.parse(json);
+          if (data.failed) {
+            await this.redis.zadd(this.qkey, this._getScore(data), json);
+          } else {
+            await this.redis.incr(this.dkey);
+          }
+          seen.push(data.url);
         }
-        seen.push(data.url);
       }
     }
 
@@ -698,4 +771,40 @@ return 0;
   async writeToPagesQueue(value: string) {
     return await this.redis.lpush(this.pageskey, value);
   }
+
+  /**
+   * Register the URL a seed redirected to as an additional seed: appends
+   * `seeds[origSeedId].newScopedSeed(newUrl)` to `seeds`, adds `newUrl` to
+   * the seen set and records the pair in the extra-seeds list.
+   *
+   * The record is appended with RPUSH so that `getExtraSeeds()` and
+   * `serialize()` read entries back in the order their seed ids were assigned.
+   *
+   * @returns index of the new seed in `seeds`
+   */
+  async addExtraSeed(seeds: ScopedSeed[], origSeedId: number, newUrl: string) {
+    if (!seeds[origSeedId]) {
+      logger.fatal(
+        "State load, original seed missing",
+        { origSeedId },
+        "state",
+      );
+    }
+    seeds.push(seeds[origSeedId].newScopedSeed(newUrl));
+    const newSeedId = seeds.length - 1;
+    const redirectSeed: ExtraRedirectSeed = { origSeedId, newUrl };
+    await this.redis.sadd(this.skey, newUrl);
+    await this.redis.rpush(this.esKey, JSON.stringify(redirectSeed));
+    return newSeedId;
+  }
+
+  /** Return all recorded redirect seeds, parsed, in list order. */
+  async getExtraSeeds() {
+    const seeds: ExtraRedirectSeed[] = [];
+    const res = await this.redis.lrange(this.esKey, 0, -1);
+    for (const key of res) {
+      seeds.push(JSON.parse(key));
+    }
+    return seeds;
+  }
 }
diff --git a/tests/saved-state.test.js b/tests/saved-state.test.js
index 7ba349f70..00b222749 100644
--- a/tests/saved-state.test.js
+++ b/tests/saved-state.test.js
@@ -1,37 +1,52 @@
-import { exec } from "child_process";
+import { execSync } from "child_process";
 import fs from "fs";
 import path from "path";
 import yaml from "js-yaml";
 import Redis from "ioredis";
 
-function waitForProcess() {
-  let callback = null;
-  const p = new Promise((resolve) => {
-    callback = (/*error, stdout, stderr*/) => {
-      //console.log(stdout);
-      resolve(0);
-    };
-  });
+function sleep(ms) {
+  return new Promise((resolve) => setTimeout(resolve, ms));
+}
 
-  return { p, callback };
+// Send SIGINT to the container, then poll `docker ps` until it is gone.
+// Returns right away if `docker kill` fails.
+async function waitContainer(containerId) {
+  try {
+    execSync(`docker kill -s SIGINT ${containerId}`);
+  } catch (e) {
+    return;
+  }
+
+  // containerId is initially the full id, but docker ps
+  // only prints the short id (first 12 characters)
+  containerId = containerId.slice(0, 12);
+
+  while (true) {
+    try {
+      const res = execSync("docker ps -q", { encoding: "utf-8" });
+      if (res.indexOf(containerId) < 0) {
+        return;
+      }
+    } catch (e) {
+      console.error(e);
+    }
+    await sleep(500);
+  }
 }
 
-var savedStateFile;
-var state;
-var numDone;
-var redis;
-var finishProcess;
+let savedStateFile;
+let state;
+let numDone;
+let numQueued;
+let finished;
 
 test("check crawl interrupted + saved state written", async () => {
-  let proc = null;
-
-  const wait = waitForProcess();
+  let containerId = null;
 
   try {
-    proc = exec(
-      "docker run -v $PWD/test-crawls:/crawls -v $PWD/tests/fixtures:/tests/fixtures webrecorder/browsertrix-crawler crawl --collection int-state-test --url https://webrecorder.net/ --limit 20",
-      { shell: "/bin/bash" },
-      wait.callback,
+    containerId = execSync(
+      "docker run -d -v $PWD/test-crawls:/crawls -v $PWD/tests/fixtures:/tests/fixtures webrecorder/browsertrix-crawler crawl --collection int-state-test --url https://www.webrecorder.net/ --limit 10",
+      { encoding: "utf-8" },
     );
   } catch (error) {
     console.log(error);
@@ -60,12 +75,10 @@ test("check crawl interrupted + saved state written", async () => {
       // ignore
     }
 
-    await new Promise((resolve) => setTimeout(resolve, 500));
+    await sleep(500);
   }
 
-  proc.kill("SIGINT");
-
-  await wait.p;
+  await waitContainer(containerId);
 
   const savedStates = fs.readdirSync(
     "test-crawls/collections/int-state-test/crawls",
@@ -87,31 +100,36 @@ test("check parsing saved state + page done + queue present", () => {
   expect(!!saved.state).toBe(true);
 
   state = saved.state;
+  numDone = state.finished.length;
+  numQueued = state.queued.length;
 
-  numDone = state.done;
+  expect(numDone > 0).toEqual(true);
+  expect(numQueued > 0).toEqual(true);
+  expect(numDone + numQueued).toEqual(10);
 
-  expect(state.done > 0).toEqual(true);
-  expect(state.queued.length > 0).toEqual(true);
+  // ensure extra seeds also set
+  expect(state.extraSeeds).toEqual([
+    `{"origSeedId":0,"newUrl":"https://webrecorder.net/"}`,
+  ]);
+
+  finished = state.finished;
 });
 
 test("check crawl restarted with saved state", async () => {
-  let proc = null;
-
-  const wait = waitForProcess();
+  let containerId = null;
 
   try {
-    proc = exec(
-      `docker run -p 36379:6379 -e CRAWL_ID=test -v $PWD/test-crawls:/crawls -v $PWD/tests/fixtures:/tests/fixtures webrecorder/browsertrix-crawler crawl --collection int-state-test --url https://webrecorder.net/ --config /crawls/collections/int-state-test/crawls/${savedStateFile} --debugAccessRedis --limit 5`,
-      { shell: "/bin/bash" },
-      wait.callback,
+    containerId = execSync(
+      `docker run -d -p 36379:6379 -e CRAWL_ID=test -v $PWD/test-crawls:/crawls -v $PWD/tests/fixtures:/tests/fixtures webrecorder/browsertrix-crawler crawl --collection int-state-test --url https://webrecorder.net/ --config /crawls/collections/int-state-test/crawls/${savedStateFile} --debugAccessRedis --limit 5`,
+      { encoding: "utf-8" },
     );
   } catch (error) {
     console.log(error);
   }
 
-  await new Promise((resolve) => setTimeout(resolve, 2000));
+  await sleep(2000);
 
-  redis = new Redis("redis://127.0.0.1:36379/0", { lazyConnect: true });
+  const redis = new Redis("redis://127.0.0.1:36379/0", { lazyConnect: true });
 
   try {
     await redis.connect({
@@ -121,20 +139,21 @@ test("check crawl restarted with saved state", async () => {
       },
     });
 
-    await new Promise((resolve) => setTimeout(resolve, 2000));
+    await sleep(2000);
 
     expect(await redis.get("test:d")).toBe(numDone + "");
-  } catch (e) {
-    console.log(e);
+
+    for (const url of finished) {
+      const res = await redis.sismember("test:s", url);
+      expect(res).toBe(1);
+    }
   } finally {
-    proc.kill("SIGINT");
-  }
-
-  finishProcess = wait.p;
-});
+    await waitContainer(containerId);
 
-test("interrupt crawl and exit", async () => {
-  const res = await Promise.allSettled([finishProcess, redis.quit()]);
-
-  expect(res[0].value).toBe(0);
+    try {
+      await redis.disconnect();
+    } catch (e) {
+      // ignore
+    }
+  }
 });