improvements to 'non-graceful' interrupt to ensure WARCs are still closed gracefully #504

Merged · 6 commits · Mar 21, 2024
Changes from 3 commits
30 changes: 29 additions & 1 deletion docs/docs/user-guide/common-options.md
@@ -115,7 +115,7 @@ Webhook notification JSON includes:

## Saving Crawl State: Interrupting and Restarting the Crawl

A crawl can be gracefully interrupted with Ctrl-C (SIGINT) or a SIGTERM (see below for more details).

When a crawl is interrupted, the current crawl state is written to the `crawls` subdirectory inside the collection directory. The crawl state includes the current YAML config, if any, plus the current state of the crawl.

@@ -128,3 +128,31 @@ By default, the crawl state is only written when a crawl is interrupted before c
### Periodic State Saving

When `--saveState` is set to `always`, Browsertrix Crawler will also save the state automatically during the crawl, at the interval set by the `--saveStateInterval` setting. The crawler will keep the last `--saveStateHistory` save states and delete older ones. This provides an extra backup: in the event that the crawl fails unexpectedly, or is not terminated via Ctrl-C, several previous crawl states are still available.
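
For example, periodic state saving might be enabled with an invocation like the following (a sketch: the URL and values are placeholders, and the `--saveStateInterval` value is assumed to be in seconds):

```sh
docker run -v $PWD/crawls:/crawls/ -it webrecorder/browsertrix-crawler crawl \
  --url https://example.com/ \
  --saveState always \
  --saveStateInterval 300 \
  --saveStateHistory 5
```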

## Crawl Interruption Options

Browsertrix Crawler supports several crawl interruption modes, and does everything it can to ensure that the WARC data already written remains valid when a crawl is interrupted. The three interruption scenarios are as follows:

### 1. Graceful Shutdown

Initiated when a single SIGINT (Ctrl+C) or SIGTERM (`docker kill -s SIGINT`, `docker kill -s SIGTERM`, `kill`) signal is received.

The crawler will attempt to finish the current pages, finish any pending async requests, write all WARCs, generate WACZ files and perform other post-processing, save the crawl state from Redis, and then exit.

### 2. Less-Graceful, Quick Shutdown

If a second SIGINT / SIGTERM is received, the crawler will close the browser immediately, interrupting any ongoing network requests. Pending network requests will not be awaited; however, anything already in the WARC queue will be written, and the WARC files will be flushed. WACZ files and other post-processing will not be generated, but the crawl state will still be saved in Redis. The WARC files should still be valid.

### 3. Violent / Immediate Shutdown

If the crawler is killed, e.g. with a SIGKILL signal (`docker kill`, `kill -9`), the crawler container or process will be shut down immediately. The crawler will exit right away, but it will not have a chance to finish any open WARC files, and there is no guarantee that the WARC files will be valid.


### Recommendations

It is recommended to stop the crawler gracefully by sending a SIGINT or SIGTERM signal, which can be done via Ctrl+C or `docker kill -s SIGINT <containerid>`. Repeating the command results in a faster, slightly less-graceful shutdown. Kubernetes also sends SIGTERM by default when shutting down container pods. Using SIGKILL is not recommended except as a last resort, and only when the data is to be discarded.
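
Putting this together, a typical stop sequence looks like the following sketch (`<containerid>` is a placeholder):

```sh
# First signal: graceful shutdown (finish pages, write WARCs and WACZ, save state)
docker kill -s SIGINT <containerid>

# Second signal, if needed: quick shutdown (flush WARC queue only, save state)
docker kill -s SIGINT <containerid>

# Last resort only -- `docker kill` sends SIGKILL by default; WARCs may be left invalid
docker kill <containerid>
```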
9 changes: 8 additions & 1 deletion src/crawler.ts
@@ -32,7 +32,12 @@ import { Screenshots } from "./util/screenshots.js";
import { parseArgs } from "./util/argParser.js";
import { initRedis } from "./util/redis.js";
import { logger, formatErr } from "./util/logger.js";
import {
WorkerOpts,
WorkerState,
closeWorkers,
runWorkers,
} from "./util/worker.js";
import { sleep, timedRun, secondsElapsed } from "./util/timing.js";
import { collectAllFileSources, getInfoString } from "./util/file_reader.js";

@@ -1117,6 +1122,8 @@ self.__bx_behaviors.selectMainBehavior();
await this.serializeConfig();

if (this.interrupted) {
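// on interrupt, close the browser and finalize workers with a wait time of 0,
// skipping pending fetches while still flushing queued WARC records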
await this.browser.close();
await closeWorkers(0);
await this.setStatusAndExit(13, "interrupted");
} else {
await this.setStatusAndExit(0, "done");
16 changes: 9 additions & 7 deletions src/util/recorder.ts
@@ -803,13 +803,15 @@ export class Recorder {
await this.fetcherQ.onIdle();
};

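// a timeout of 0 means an immediate shutdown: skip waiting on pending
// fetches entirely; WARC writing below is still awaited so files flush cleanly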
if (timeout > 0) {
await timedRun(
finishFetch(),
timeout,
"Finishing Fetch Timed Out",
this.logDetails,
"recorder",
);
}

logger.debug("Finishing WARC writing", this.logDetails, "recorder");
await this.warcQ.onIdle();
27 changes: 16 additions & 11 deletions src/util/warcwriter.ts
@@ -105,15 +105,6 @@ export class WARCWriter implements IndexerOffsetLength {
requestRecord: WARCRecord,
responseSerializer: WARCSerializer | undefined = undefined,
) {

const opts = { gzip: this.gzip };

if (!responseSerializer) {
Expand Down Expand Up @@ -147,6 +138,15 @@ export class WARCWriter implements IndexerOffsetLength {
}

private async _writeRecord(record: WARCRecord, serializer: WARCSerializer) {
if (this.done) {
logger.warn(
"Writer closed, not writing records",
this.logDetails,
"writer",
);
return 0;
}

let total = 0;
const url = record.warcTargetURI;

@@ -171,6 +171,11 @@ }
}

_writeCDX(record: WARCRecord | null) {
if (this.done) {
logger.warn("Writer closed, not writing CDX", this.logDetails, "writer");
return;
}

if (this.indexer && this.filename) {
const cdx = this.indexer.indexRecord(record, this, this.filename);

@@ -183,6 +188,8 @@ }
}

async flush() {
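// mark the writer as done before flushing, so that records arriving
// mid-flush are rejected by _writeRecord() / _writeCDX() rather than
// being written to a closing stream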
this.done = true;

if (this.fh) {
await streamFinish(this.fh);
this.fh = null;
@@ -194,8 +201,6 @@
await streamFinish(this.cdxFH);
this.cdxFH = null;
}
}
}

85 changes: 46 additions & 39 deletions src/util/worker.ts
@@ -14,43 +14,6 @@ const NEW_WINDOW_TIMEOUT = 20;
const TEARDOWN_TIMEOUT = 10;
const FINISHED_TIMEOUT = 60;

// ===========================================================================
export type WorkerOpts = {
page: Page;
@@ -372,9 +335,9 @@ export class PageWorker {
}
}

async finalize(waitTime?: number) {
if (this.recorder) {
await this.recorder.onDone(waitTime ?? this.maxPageTime);
}
}

@@ -433,3 +396,47 @@
}
}
}

// ===========================================================================
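// workers are kept at module level so that closeWorkers() can finalize
// them (flushing each worker's recorder) even after an interrupt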
const workers: PageWorker[] = [];

// ===========================================================================
export async function runWorkers(
crawler: Crawler,
numWorkers: number,
maxPageTime: number,
collDir: string,
) {
logger.info(`Creating ${numWorkers} workers`, {}, "worker");

let offset = 0;

// automatically set worker start by ordinal in k8s
// if hostname is "crawl-id-name-N"
// while CRAWL_ID is "crawl-id-name", then set starting
// worker index offset to N * numWorkers

if (process.env.CRAWL_ID) {
const rx = new RegExp(rxEscape(process.env.CRAWL_ID) + "\\-([\\d]+)$");
const m = os.hostname().match(rx);
if (m) {
offset = Number(m[1]) * numWorkers;
logger.info("Starting workerid index at " + offset, "worker");
}
}

for (let i = 0; i < numWorkers; i++) {
workers.push(new PageWorker(i + offset, crawler, maxPageTime, collDir));
}

await Promise.allSettled(workers.map((worker) => worker.run()));

await crawler.browser.close();

await closeWorkers();
}

// ===========================================================================
export function closeWorkers(waitTime?: number) {
return Promise.allSettled(workers.map((worker) => worker.finalize(waitTime)));
}