Skip to content

Commit

Permalink
fix save load state:
Browse files Browse the repository at this point in the history
- ensure seen URLs that were done are still added to the 'doneUrls' list, fixes #491
- ensure extraSeeds added from redirects are also added to Redis and serialized
  • Loading branch information
ikreymer committed Mar 14, 2024
1 parent fa37f62 commit 34fda06
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 17 deletions.
15 changes: 15 additions & 0 deletions src/crawler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,8 @@ export class Crawler {
os.hostname(),
);

await this.initExtraSeeds();

// clear any pending URLs from this instance
await this.crawlState.clearOwnPendingLocks();

Expand All @@ -348,6 +350,15 @@ export class Crawler {
return this.crawlState;
}

/**
 * Re-creates the seeds that were added at runtime for redirected seed pages.
 *
 * Reads the extra-seed records from the crawl state and, for each record,
 * derives a new scoped seed from the seed found at `seedId` and appends it
 * to `params.scopedSeeds`.
 *
 * NOTE(review): the redirect handling in this file appears to persist the
 * index of the *newly added* seed as `seedId`. On a fresh process that index
 * may not be populated yet when it is looked up here — confirm which id is
 * meant to be stored.
 */
async initExtraSeeds() {
  const extraSeeds = await this.crawlState.getExtraSeeds();

  for (const { seedId, url } of extraSeeds) {
    const seed = this.params.scopedSeeds[seedId];

    // A persisted record may point at an index that is not populated.
    // Skip it rather than failing start-up with a TypeError on
    // `undefined.newScopedSeed`.
    if (!seed) {
      logger.warn("Extra seed refers to unknown seed, skipping", {
        seedId,
        url,
      });
      continue;
    }

    this.params.scopedSeeds.push(seed.newScopedSeed(url));
  }
}

initScreenCaster() {
let transport;

Expand Down Expand Up @@ -1580,6 +1591,10 @@ self.__bx_behaviors.selectMainBehavior();
const seed = this.params.scopedSeeds[data.seedId];
this.params.scopedSeeds.push(seed.newScopedSeed(respUrl));
data.seedId = this.params.scopedSeeds.length - 1;
await this.crawlState.addExtraSeed({
seedId: data.seedId,
url: respUrl,
});
logger.info("Seed page redirected, adding redirected seed", {
origUrl: url,
newUrl: respUrl,
Expand Down
102 changes: 85 additions & 17 deletions src/util/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ export type QueueEntry = {
extraHops: number;
};

// ============================================================================
/**
 * Record persisted for a seed that was added at runtime after a seed page
 * redirected.
 */
export type ExtraRedirectSeed = {
  /** Numeric seed id stored alongside the URL. */
  seedId: number;
  /** URL the extra seed is created for. */
  url: string;
};

// ============================================================================
export type PageCallbacks = {
addLink?: (url: string) => Promise<void>;
Expand Down Expand Up @@ -127,6 +133,16 @@ declare module "ioredis" {
}
}

// ============================================================================
/**
 * Shape of the crawl-state snapshot returned by `serialize()` and accepted
 * by `load()`.
 */
type SaveState = {
  /** Either a count or a list of strings. */
  done: number | string[];
  /** URLs from the seen set that are neither queued nor failed. */
  doneUrls: string[];

  /** Queue entries, each a JSON-encoded string. */
  queued: string[];
  /** Entries currently pending. */
  pending: string[];

  /** Failed entries, each a JSON-encoded string. */
  failed: string[];
  /** Recorded error entries. */
  errors: string[];
};

// ============================================================================
export class RedisCrawlState {
redis: Redis;
Expand All @@ -143,6 +159,7 @@ export class RedisCrawlState {
fkey: string;
ekey: string;
pageskey: string;
esKey: string;

constructor(redis: Redis, key: string, maxPageTime: number, uid: string) {
this.redis = redis;
Expand All @@ -163,6 +180,8 @@ export class RedisCrawlState {
// pages
this.pageskey = this.key + ":pages";

this.esKey = this.key + ":extraSeeds";

this._initLuaCommands(this.redis);
}

Expand Down Expand Up @@ -492,22 +511,49 @@ return 0;
return !!(await this.redis.sismember(this.skey, url));
}

/**
 * Builds a snapshot of the current crawl state.
 *
 * Gathers the done count, the seen set, the queued, pending and failed
 * entries and the error list. URLs that remain in the seen set after the
 * queued and failed entries have been removed from it are returned as
 * `doneUrls`.
 *
 * @returns The collected state as a `SaveState` object.
 */
async serialize() {
async serialize(): Promise<SaveState> {
//const queued = await this._iterSortKey(this.qkey);
const done = await this.numDone();
const queued = await this._iterSortedKey(this.qkey);
// Start from every seen URL; the two iterators below delete the URL of each
// queued / failed entry from this set while collecting those entries.
const seen = await this._iterSet(this.skey);
const queued = await this._iterSortedKey(this.qkey, seen);
const pending = await this.getPendingList();
const failed = await this._iterListKeys(this.fkey);
const failed = await this._iterListKeys(this.fkey, seen);
const errors = await this.getErrorList();

return { done, queued, pending, failed, errors };
// Whatever is still left in the seen set is reported as done.
const doneUrls = [...seen.values()];

return { done, doneUrls, queued, pending, failed, errors };
}

/**
 * Computes the sorting score of a queue entry.
 *
 * The score is the entry's depth plus its extra-hop count scaled by
 * `MAX_DEPTH`; a missing (falsy) depth or hop count is treated as 0.
 *
 * @param data - Queue entry to score.
 * @returns The numeric score.
 */
_getScore(data: QueueEntry) {
  const depth = data.depth || 0;
  const hops = data.extraHops || 0;

  return depth + hops * MAX_DEPTH;
}

async _iterSortedKey(key: string, inc = 100) {
/**
 * Collects every member of the Redis set stored at `key` into an in-memory
 * `Set`.
 *
 * @param key - Redis key of the set to scan.
 * @param count - Value passed as the `count` option of the scan stream.
 * @returns A `Set` containing all members emitted by the scan stream.
 * @throws Rejects with the stream's error if the scan fails.
 */
async _iterSet(key: string, count = 100) {
  const stream = this.redis.sscanStream(key, { count });

  const results = new Set<string>();

  // All listeners are attached synchronously, before the stream can emit.
  await new Promise<void>((resolve, reject) => {
    stream.on("data", (someResults: string[]) => {
      for (const result of someResults) {
        results.add(result);
      }
    });

    stream.on("end", () => resolve());

    // Without an error listener a failed scan would surface as an unhandled
    // 'error' event or leave this promise pending forever.
    stream.on("error", reject);
  });

  return results;
}

async _iterSortedKey(key: string, seenSet: Set<string>, inc = 100) {
const results: string[] = [];

const len = await this.redis.zcard(key);
Expand All @@ -521,33 +567,35 @@ return 0;
i,
inc,
);
results.push(...someResults);

for (const result of someResults) {
const json = JSON.parse(result);
seenSet.delete(json.url);
results.push(result);
}
}

return results;
}

/**
 * Reads the Redis list stored at `key` in pages of `inc` entries and returns
 * all of its entries.
 *
 * Each entry is parsed as JSON and its `url` is deleted from `seenSet`, so
 * the caller's set is mutated as a side effect.
 *
 * @param key - Redis key of the list to read.
 * @param seenSet - Set of URLs from which the URL of every entry is removed.
 * @param inc - Number of entries fetched per `lrange` call.
 * @returns The raw (still JSON-encoded) list entries.
 */
async _iterListKeys(key: string, inc = 100) {
async _iterListKeys(key: string, seenSet: Set<string>, inc = 100) {
const results: string[] = [];

const len = await this.redis.llen(key);

for (let i = 0; i < len; i += inc) {
// LRANGE treats both bounds as inclusive, hence `i + inc - 1`.
const someResults = await this.redis.lrange(key, i, i + inc - 1);
results.push(...someResults);

for (const result of someResults) {
const json = JSON.parse(result);
// This URL is accounted for by the list, so drop it from the seen set.
seenSet.delete(json.url);
results.push(result);
}
}
return results;
}

async load(
// TODO: Fix this the next time the file is edited.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
state: Record<string, any>,
seeds: ScopedSeed[],
checkScope: boolean,
) {
const seen: string[] = [];

async load(state: SaveState, seeds: ScopedSeed[], checkScope: boolean) {
// need to delete existing keys, if exist to fully reset state
await this.redis.del(this.qkey);
await this.redis.del(this.pkey);
Expand All @@ -556,6 +604,12 @@ return 0;
await this.redis.del(this.skey);
await this.redis.del(this.ekey);

let seen: string[] = [];

if (state.doneUrls) {
seen = state.doneUrls;
}

for (const json of state.queued) {
const data = JSON.parse(json);
if (checkScope) {
Expand Down Expand Up @@ -698,4 +752,18 @@ return 0;
/**
 * Pushes `value` onto the head of the Redis list stored at `pageskey`.
 *
 * @param value - String to push.
 * @returns The result of the `lpush` call.
 */
async writeToPagesQueue(value: string) {
  const pushed = await this.redis.lpush(this.pageskey, value);
  return pushed;
}

/**
 * Persists an extra seed added from a redirect.
 *
 * The record is JSON-encoded and appended to the tail of the list at
 * `esKey`. `getExtraSeeds()` reads the list head-to-tail and seeds are
 * re-created positionally from that result, so records must be stored in
 * insertion order; pushing at the head would return them newest-first.
 *
 * @param seed - Extra seed record to store.
 * @returns The result of the `rpush` call (the list length after the push).
 */
async addExtraSeed(seed: ExtraRedirectSeed) {
  return await this.redis.rpush(this.esKey, JSON.stringify(seed));
}

/**
 * Loads all extra seed records stored in the list at `esKey`.
 *
 * The whole list is read with a single `lrange(0, -1)` and every entry is
 * JSON-decoded; the records are returned in the order the list yields them.
 *
 * @returns The decoded extra seed records.
 */
async getExtraSeeds() {
  const rawEntries = await this.redis.lrange(this.esKey, 0, -1);

  const seeds: ExtraRedirectSeed[] = rawEntries.map((entry) =>
    JSON.parse(entry),
  );

  return seeds;
}
}

0 comments on commit 34fda06

Please sign in to comment.