Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Save/Load State #495

Merged
merged 6 commits into from Mar 16, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
37 changes: 26 additions & 11 deletions src/crawler.ts
Expand Up @@ -329,6 +329,18 @@ export class Crawler {
os.hostname(),
);

// load full state from config
if (this.params.state) {
await this.crawlState.load(
this.params.state,
this.params.scopedSeeds,
true,
);
// otherwise, just load extra seeds
} else {
await this.loadExtraSeeds();
}

// clear any pending URLs from this instance
await this.crawlState.clearOwnPendingLocks();

Expand All @@ -348,6 +360,15 @@ export class Crawler {
return this.crawlState;
}

/**
 * Re-create the redirect-derived seeds recorded in the crawl state.
 *
 * Each stored entry names the seed it was derived from (`origSeedId`) and the
 * URL the seed page redirected to (`newUrl`); a new scoped seed is built from
 * the original one and appended to `this.params.scopedSeeds`.
 */
async loadExtraSeeds() {
  const redirects = await this.crawlState.getExtraSeeds();
  const { scopedSeeds } = this.params;

  for (const redirect of redirects) {
    const parentSeed = scopedSeeds[redirect.origSeedId];
    const derivedSeed = parentSeed.newScopedSeed(redirect.newUrl);
    scopedSeeds.push(derivedSeed);
  }
}

initScreenCaster() {
let transport;

Expand Down Expand Up @@ -1190,14 +1211,6 @@ self.__bx_behaviors.selectMainBehavior();

await this.crawlState.setStatus("running");

if (this.params.state) {
await this.crawlState.load(
this.params.state,
this.params.scopedSeeds,
true,
);
}

await this.initPages();

this.adBlockRules = new AdBlockRules(
Expand Down Expand Up @@ -1577,9 +1590,11 @@ self.__bx_behaviors.selectMainBehavior();
const isChromeError = page.url().startsWith("chrome-error://");

if (depth === 0 && !isChromeError && respUrl !== url) {
const seed = this.params.scopedSeeds[data.seedId];
this.params.scopedSeeds.push(seed.newScopedSeed(respUrl));
data.seedId = this.params.scopedSeeds.length - 1;
data.seedId = await this.crawlState.addExtraSeed(
this.params.scopedSeeds,
data.seedId,
respUrl,
);
logger.info("Seed page redirected, adding redirected seed", {
origUrl: url,
newUrl: respUrl,
Expand Down
155 changes: 125 additions & 30 deletions src/util/state.ts
Expand Up @@ -35,6 +35,12 @@ export type QueueEntry = {
extraHops: number;
};

// ============================================================================
/**
 * Record of a seed that was added because a seed page redirected.
 * Stored as a JSON string in the crawl state's extra-seeds list.
 */
export type ExtraRedirectSeed = {
  /** Index (in the scoped seeds array) of the seed the new one derives from. */
  origSeedId: number;
  /** URL the original seed page redirected to. */
  newUrl: string;
};

// ============================================================================
export type PageCallbacks = {
addLink?: (url: string) => Promise<void>;
Expand Down Expand Up @@ -127,6 +133,17 @@ declare module "ioredis" {
}
}

// ============================================================================
/**
 * Shape of a serialized crawl state, as produced by `serialize()` and
 * consumed by `load()`.
 */
type SaveState = {
  /**
   * Legacy field kept for backwards compatibility with old save states:
   * either a plain counter of finished pages or a list of JSON entries.
   */
  done?: number | string[];

  /** URLs that have been seen and are not queued or failed. */
  // NOTE(review): load() guards against this and `extraSeeds` being absent,
  // which suggests older state files may omit them — confirm before relying
  // on them being present.
  finished: string[];
  /** Redirect-derived seeds, each a JSON-encoded `ExtraRedirectSeed`. */
  extraSeeds: string[];

  /** JSON-encoded queue entries still waiting to be crawled. */
  queued: string[];
  /** Entries that were in progress when the state was saved. */
  pending: string[];
  /** JSON-encoded entries read from the failed list. */
  failed: string[];
  /** Recorded error entries. */
  errors: string[];
};

// ============================================================================
export class RedisCrawlState {
redis: Redis;
Expand All @@ -143,6 +160,7 @@ export class RedisCrawlState {
fkey: string;
ekey: string;
pageskey: string;
esKey: string;

constructor(redis: Redis, key: string, maxPageTime: number, uid: string) {
this.redis = redis;
Expand All @@ -163,6 +181,8 @@ export class RedisCrawlState {
// pages
this.pageskey = this.key + ":pages";

this.esKey = this.key + ":extraSeeds";

this._initLuaCommands(this.redis);
}

Expand Down Expand Up @@ -492,22 +512,50 @@ return 0;
return !!(await this.redis.sismember(this.skey, url));
}

async serialize() {
async serialize(): Promise<SaveState> {
//const queued = await this._iterSortKey(this.qkey);
const done = await this.numDone();
const queued = await this._iterSortedKey(this.qkey);
// const done = await this.numDone();
const seen = await this._iterSet(this.skey);
const queued = await this._iterSortedKey(this.qkey, seen);
const pending = await this.getPendingList();
const failed = await this._iterListKeys(this.fkey);
const failed = await this._iterListKeys(this.fkey, seen);
const errors = await this.getErrorList();
const extraSeeds = await this._iterListKeys(this.esKey, seen);

const finished = [...seen.values()];

return { done, queued, pending, failed, errors };
return { extraSeeds, finished, queued, pending, failed, errors };
}

/**
 * Compute the sorted-set score for a queue entry: its depth plus its extra
 * hops weighted by MAX_DEPTH. Missing (falsy) values count as 0.
 */
_getScore(data: QueueEntry) {
  const depth = data.depth || 0;
  const extraHops = data.extraHops || 0;
  return depth + extraHops * MAX_DEPTH;
}

async _iterSortedKey(key: string, inc = 100) {
/**
 * Read every member of the Redis set stored at `key` into an in-memory Set.
 *
 * The set is scanned incrementally with SSCAN (via `sscanStream`) rather than
 * fetched in a single call.
 *
 * @param key - Redis key of the set to read.
 * @param count - COUNT hint passed to each SSCAN iteration.
 * @returns All members of the set that the scan produced.
 * @throws Rejects with the stream's error if the scan fails, instead of
 *   waiting forever for an "end" event that will never arrive.
 */
async _iterSet(key: string, count = 100): Promise<Set<string>> {
  const stream = this.redis.sscanStream(key, { count });

  const results = new Set<string>();

  await new Promise<void>((resolve, reject) => {
    // Adding to a Set is synchronous, so no pause()/resume() back-pressure
    // handling is needed around the batch.
    stream.on("data", (someResults: string[]) => {
      for (const result of someResults) {
        results.add(result);
      }
    });

    stream.once("end", () => resolve());
    // Without this listener a failed scan would leave the promise pending
    // and surface as an unhandled "error" event.
    stream.once("error", reject);
  });

  return results;
}

async _iterSortedKey(key: string, seenSet: Set<string>, inc = 100) {
const results: string[] = [];

const len = await this.redis.zcard(key);
Expand All @@ -521,33 +569,35 @@ return 0;
i,
inc,
);
results.push(...someResults);

for (const result of someResults) {
const json = JSON.parse(result);
seenSet.delete(json.url);
results.push(result);
}
}

return results;
}

async _iterListKeys(key: string, inc = 100) {
/**
 * Read the Redis list at `key` in chunks and return its raw entries.
 *
 * Every entry is parsed as JSON and its `url` is removed from `seenSet`, so
 * that after the call `seenSet` no longer contains URLs present in this list.
 *
 * @param key - Redis key of the list to read.
 * @param seenSet - Set of seen URLs; mutated in place.
 * @param inc - Number of list entries fetched per LRANGE call.
 * @returns The list entries (unparsed JSON strings), each exactly once, in
 *   list order.
 */
async _iterListKeys(key: string, seenSet: Set<string>, inc = 100) {
  const results: string[] = [];

  const len = await this.redis.llen(key);

  for (let i = 0; i < len; i += inc) {
    // LRANGE bounds are inclusive, hence the - 1.
    const someResults = await this.redis.lrange(key, i, i + inc - 1);

    // Entries are appended only here; also appending the whole chunk in bulk
    // would return every entry twice.
    for (const result of someResults) {
      const json = JSON.parse(result);
      seenSet.delete(json.url);
      results.push(result);
    }
  }
  return results;
}

async load(
// TODO: Fix this the next time the file is edited.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
state: Record<string, any>,
seeds: ScopedSeed[],
checkScope: boolean,
) {
const seen: string[] = [];

async load(state: SaveState, seeds: ScopedSeed[], checkScope: boolean) {
// need to delete existing keys, if exist to fully reset state
await this.redis.del(this.qkey);
await this.redis.del(this.pkey);
Expand All @@ -556,6 +606,21 @@ return 0;
await this.redis.del(this.skey);
await this.redis.del(this.ekey);

let seen: string[] = [];

if (state.finished) {
seen = state.finished;

await this.redis.set(this.dkey, state.finished.length);
}

if (state.extraSeeds) {
for (const extraSeed of state.extraSeeds) {
const { newUrl, origSeedId }: ExtraRedirectSeed = JSON.parse(extraSeed);
await this.addExtraSeed(seeds, origSeedId, newUrl);
}
}

for (const json of state.queued) {
const data = JSON.parse(json);
if (checkScope) {
Expand All @@ -580,19 +645,23 @@ return 0;
seen.push(data.url);
}

if (typeof state.done === "number") {
// done key is just an int counter
await this.redis.set(this.dkey, state.done);
} else if (state.done instanceof Array) {
// for backwards compatibility with old save states
for (const json of state.done) {
const data = JSON.parse(json);
if (data.failed) {
await this.redis.zadd(this.qkey, this._getScore(data), json);
} else {
await this.redis.incr(this.dkey);
// backwards compatibility: not using done, instead 'finished'
// contains list of finished URLs
if (state.done) {
if (typeof state.done === "number") {
// done key is just an int counter
await this.redis.set(this.dkey, state.done);
} else if (state.done instanceof Array) {
// for backwards compatibility with old save states
for (const json of state.done) {
const data = JSON.parse(json);
if (data.failed) {
await this.redis.zadd(this.qkey, this._getScore(data), json);
} else {
await this.redis.incr(this.dkey);
}
seen.push(data.url);
}
seen.push(data.url);
}
}

Expand Down Expand Up @@ -698,4 +767,30 @@ return 0;
async writeToPagesQueue(value: string) {
return await this.redis.lpush(this.pageskey, value);
}

// add extra seeds from redirect
/**
 * Add an extra seed created because an existing seed page redirected.
 *
 * A new scoped seed derived from `seeds[origSeedId]` is appended to `seeds`,
 * `newUrl` is added to the seen set, and the redirect is recorded in the
 * extra-seeds list so it can be re-created later.
 *
 * @param seeds - Scoped seeds array; mutated by appending the new seed.
 * @param origSeedId - Index of the seed that redirected.
 * @param newUrl - URL the seed page redirected to.
 * @returns Index of the newly appended seed in `seeds`.
 */
async addExtraSeed(
  seeds: ScopedSeed[],
  origSeedId: number,
  newUrl: string,
): Promise<number> {
  if (!seeds[origSeedId]) {
    // NOTE(review): presumably logger.fatal terminates the process; if it
    // returns, the next statement throws a TypeError — confirm.
    logger.fatal(
      "State load, original seed missing",
      { origSeedId },
      "state",
    );
  }
  seeds.push(seeds[origSeedId].newScopedSeed(newUrl));
  const newSeedId = seeds.length - 1;
  const redirectSeed: ExtraRedirectSeed = { origSeedId, newUrl };
  await this.redis.sadd(this.skey, newUrl);
  // Append (RPUSH) rather than prepend: readers walk this list head-to-tail
  // and re-add seeds in that order, assigning ids by position. Prepending
  // would replay seeds in reverse and give them different ids than the ones
  // returned here.
  await this.redis.rpush(this.esKey, JSON.stringify(redirectSeed));
  return newSeedId;
}

/**
 * Fetch all recorded redirect seeds from the extra-seeds list.
 *
 * @returns The parsed entries, in the order they are stored in the list.
 */
async getExtraSeeds() {
  const rawEntries = await this.redis.lrange(this.esKey, 0, -1);
  return rawEntries.map((raw): ExtraRedirectSeed => JSON.parse(raw));
}
}