diff --git a/src/crawler.ts b/src/crawler.ts index 81c765a5b..272acf0ff 100644 --- a/src/crawler.ts +++ b/src/crawler.ts @@ -61,6 +61,7 @@ import { CDPSession, Frame, HTTPRequest, Page } from "puppeteer-core"; import { Recorder } from "./util/recorder.js"; import { SitemapReader } from "./util/sitemapper.js"; import { ScopedSeed } from "./util/seeds.js"; +import { WARCWriter } from "./util/warcwriter.js"; const HTTPS_AGENT = new HTTPSAgent({ rejectUnauthorized: false, @@ -149,6 +150,11 @@ export class Crawler { pagesFile: string; archivesDir: string; + tempdir: string; + tempCdxDir: string; + + screenshotWriter: WARCWriter | null; + textWriter: WARCWriter | null; blockRules: BlockRules | null; adBlockRules: AdBlockRules | null; @@ -177,8 +183,6 @@ export class Crawler { maxHeapUsed = 0; maxHeapTotal = 0; - warcPrefix: string; - driver!: (opts: { page: Page; data: PageState; @@ -271,6 +275,11 @@ export class Crawler { // archives dir this.archivesDir = path.join(this.collDir, "archive"); + this.tempdir = path.join(os.tmpdir(), "tmp-dl"); + this.tempCdxDir = path.join(this.collDir, "tmp-cdx"); + + this.screenshotWriter = null; + this.textWriter = null; this.blockRules = null; this.adBlockRules = null; @@ -288,12 +297,6 @@ export class Crawler { this.customBehaviors = ""; this.browser = new Browser(); - - this.warcPrefix = process.env.WARC_PREFIX || this.params.warcPrefix || ""; - - if (this.warcPrefix) { - this.warcPrefix += "-" + this.crawlId + "-"; - } } protected parseArgs() { @@ -447,14 +450,10 @@ export class Crawler { subprocesses.push(this.launchRedis()); - //const initRes = child_process.spawnSync("wb-manager", ["init", this.params.collection], {cwd: this.params.cwd}); - - //if (initRes.status) { - // logger.info("wb-manager init failed, collection likely already exists"); - //} - await fsp.mkdir(this.logDir, { recursive: true }); await fsp.mkdir(this.archivesDir, { recursive: true }); + await fsp.mkdir(this.tempdir, { recursive: true }); + await fsp.mkdir(this.tempCdxDir, { recursive: true }); this.logFH = fs.createWriteStream(this.logFilename); logger.setExternalLogStream(this.logFH); @@ -514,6 +513,13 @@ export class Crawler { { detached: RUN_DETACHED }, ); } + + if (this.params.screenshot) { + this.screenshotWriter = this.createExtraResourceWarcWriter("screenshots"); + } + if (this.params.text) { + this.textWriter = this.createExtraResourceWarcWriter("text"); + } } extraChromeArgs() { @@ -812,16 +818,15 @@ self.__bx_behaviors.selectMainBehavior(); const logDetails = { page: url, workerid }; - if (this.params.screenshot) { + if (this.params.screenshot && this.screenshotWriter) { if (!data.isHTMLPage) { logger.debug("Skipping screenshots for non-HTML page", logDetails); } const screenshots = new Screenshots({ - warcPrefix: this.warcPrefix, browser: this.browser, page, url, - directory: this.archivesDir, + writer: this.screenshotWriter, }); if (this.params.screenshot.includes("view")) { await screenshots.take("view", saveOutput ? data : null); @@ -836,11 +841,10 @@ self.__bx_behaviors.selectMainBehavior(); let textextract = null; - if (data.isHTMLPage) { + if (data.isHTMLPage && this.textWriter) { textextract = new TextExtractViaSnapshot(cdp, { - warcPrefix: this.warcPrefix, + writer: this.textWriter, url, - directory: this.archivesDir, skipDocs: this.skipTextDocs, }); const { text } = await textextract.extractAndStoreText( @@ -1151,6 +1155,7 @@ self.__bx_behaviors.selectMainBehavior(); if (this.interrupted) { await this.browser.close(); await closeWorkers(0); + await this.closeFiles(); await this.setStatusAndExit(13, "interrupted"); } else { await this.setStatusAndExit(0, "done"); @@ -1298,6 +1303,8 @@ self.__bx_behaviors.selectMainBehavior(); await this.pagesFH.close(); } + await this.closeFiles(); + await this.writeStats(); // if crawl has been stopped, mark as final exit for post-crawl tasks @@ -1308,6 +1315,15 @@ self.__bx_behaviors.selectMainBehavior(); await this.postCrawl(); } + async closeFiles() { + if (this.textWriter) { + await this.textWriter.flush(); + } + if (this.screenshotWriter) { + await this.screenshotWriter.flush(); + } + } + protected async _addInitialSeeds() { for (let i = 0; i < this.params.scopedSeeds.length; i++) { const seed = this.params.scopedSeeds[i]; @@ -2368,15 +2384,56 @@ self.__bx_behaviors.selectMainBehavior(); } } + getWarcPrefix(defaultValue = "") { + let warcPrefix = + process.env.WARC_PREFIX || this.params.warcPrefix || defaultValue; + + if (warcPrefix) { + warcPrefix += "-" + this.crawlId + "-"; + } + + return warcPrefix; + } + + createExtraResourceWarcWriter(resourceName: string, gzip = true) { + const filenameBase = `${this.getWarcPrefix()}${resourceName}`; + + return this.createWarcWriter(filenameBase, gzip, { resourceName }); + } + + createWarcWriter( + filenameBase: string, + gzip: boolean, + logDetails: Record, + ) { + const filenameTemplate = `${filenameBase}.warc${gzip ? ".gz" : ""}`; + + return new WARCWriter({ + archivesDir: this.archivesDir, + tempCdxDir: this.tempCdxDir, + filenameTemplate, + rolloverSize: this.params.rolloverSize, + gzip, + logDetails, + }); + } + createRecorder(id: number): Recorder | null { if (!this.recording) { return null; } + const filenameBase = `${this.getWarcPrefix("rec")}$ts-${id}`; + + const writer = this.createWarcWriter(filenameBase, true, { + id: id.toString(), + }); + const res = new Recorder({ workerid: id, - collDir: this.collDir, crawler: this, + writer, + tempdir: this.tempdir, }); this.browser.recorders.push(res); diff --git a/src/replaycrawler.ts b/src/replaycrawler.ts index 8ab8b8bfe..43b02d786 100644 --- a/src/replaycrawler.ts +++ b/src/replaycrawler.ts @@ -16,7 +16,6 @@ import { ZipRangeReader } from "@webrecorder/wabac/src/wacz/ziprangereader.js"; import { createLoader } from "@webrecorder/wabac/src/blockloaders.js"; import { AsyncIterReader } from "warcio"; -import { WARCResourceWriter } from "./util/warcresourcewriter.js"; import { parseArgs } from "./util/argParser.js"; import { PNG } from "pngjs"; @@ -25,6 +24,7 @@ import pixelmatch from "pixelmatch"; import levenshtein from "js-levenshtein"; import { MAX_URL_LENGTH } from "./util/reqresp.js"; import { openAsBlob } from "fs"; +import { WARCWriter } from "./util/warcwriter.js"; // RWP Replay Prefix const REPLAY_PREFIX = "http://localhost:9990/replay/w/replay/"; @@ -67,6 +67,7 @@ export class ReplayCrawler extends Crawler { qaSource: string; pageInfos: Map; + infoWriter: WARCWriter | null; reloadTimeouts: WeakMap; @@ -98,6 +99,14 @@ export class ReplayCrawler extends Crawler { this.params.serviceWorker = "enabled"; this.reloadTimeouts = new WeakMap(); + + this.infoWriter = null; + } + + async bootstrap(): Promise { + await super.bootstrap(); + + this.infoWriter = this.createExtraResourceWarcWriter("info"); } protected parseArgs() { @@ -666,18 +675,13 @@ export class ReplayCrawler extends Crawler { (state as ComparisonPageState).comparison = comparison; } - const writer = new WARCResourceWriter({ + await this.infoWriter?.writeNewResourceRecord({ + buffer: new TextEncoder().encode(JSON.stringify(pageInfo, null, 2)), + resourceType: "pageinfo", + contentType: "application/json", url: pageInfo.url, - directory: this.archivesDir, - warcPrefix: this.warcPrefix, - date: new Date(), - warcName: "info.warc.gz", }); - await writer.writeBufferToWARC( - new TextEncoder().encode(JSON.stringify(pageInfo, null, 2)), - "pageinfo", - "application/json", - ); + this.pageInfos.delete(page); } } diff --git a/src/util/recorder.ts b/src/util/recorder.ts index 0a47bb24d..17dca8919 100644 --- a/src/util/recorder.ts +++ b/src/util/recorder.ts @@ -1,6 +1,4 @@ -import fs from "fs"; import path from "path"; -import os from "os"; import { v4 as uuidv4 } from "uuid"; @@ -24,7 +22,6 @@ import { WARCWriter } from "./warcwriter.js"; import { RedisCrawlState, WorkerId } from "./state.js"; import { CDPSession, Protocol } from "puppeteer-core"; import { Crawler } from "../crawler.js"; -import { WARCResourceWriter } from "./warcresourcewriter.js"; const MAX_BROWSER_DEFAULT_FETCH_SIZE = 5_000_000; const MAX_BROWSER_TEXT_FETCH_SIZE = 25_000_000; @@ -70,7 +67,6 @@ export type PageInfoRecord = { // ================================================================= export class Recorder { workerid: WorkerId; - collDir: string; crawler: Crawler; @@ -94,9 +90,7 @@ export class Recorder { allowFull206 = false; - archivesDir: string; tempdir: string; - tempCdxDir: string; gzip = true; @@ -107,46 +101,26 @@ export class Recorder { constructor({ workerid, - collDir, + writer, crawler, + tempdir, }: { workerid: WorkerId; - collDir: string; + writer: WARCWriter; crawler: Crawler; + tempdir: string; }) { this.workerid = workerid; this.crawler = crawler; this.crawlState = crawler.crawlState; + this.writer = writer; + + this.tempdir = tempdir; + this.warcQ = new PQueue({ concurrency: 1 }); this.fetcherQ = new PQueue({ concurrency: 1 }); - - this.collDir = collDir; - - this.archivesDir = path.join(this.collDir, "archive"); - this.tempdir = path.join(os.tmpdir(), "tmp-dl"); - this.tempCdxDir = path.join(this.collDir, "tmp-cdx"); - - fs.mkdirSync(this.tempdir, { recursive: true }); - fs.mkdirSync(this.archivesDir, { recursive: true }); - // fs.mkdirSync(this.tempCdxDir, { recursive: true }); - - const prefix = - process.env.WARC_PREFIX || crawler.params.warcPrefix || "rec"; - const crawlId = process.env.CRAWL_ID || os.hostname(); - const filenameTemplate = `${prefix}-${crawlId}-$ts-${this.workerid}.warc${ - this.gzip ? ".gz" : "" - }`; - - this.writer = new WARCWriter({ - archivesDir: this.archivesDir, - // tempCdxDir: this.tempCdxDir, - filenameTemplate, - rolloverSize: crawler.params.rolloverSize, - gzip: this.gzip, - logDetails: this.logDetails, - }); } async onCreatePage({ cdp }: { cdp: CDPSession }) { @@ -733,18 +707,19 @@ export class Recorder { } } - async writePageInfoRecord() { + writePageInfoRecord() { const text = JSON.stringify(this.pageInfo, null, 2); - const resourceRecord = await WARCResourceWriter.createResourceRecord( - new TextEncoder().encode(text), - "pageinfo", - "application/json", - this.pageUrl, - new Date(), - ); + const url = this.pageUrl; - this.warcQ.add(() => this.writer.writeSingleRecord(resourceRecord)); + this.warcQ.add(() => + this.writer.writeNewResourceRecord({ + buffer: new TextEncoder().encode(text), + resourceType: "pageinfo", + contentType: "application/json", + url, + }), + ); return this.pageInfo.ts; } diff --git a/src/util/screenshots.ts b/src/util/screenshots.ts index c64d876de..5d3e2e178 100644 --- a/src/util/screenshots.ts +++ b/src/util/screenshots.ts @@ -1,10 +1,10 @@ import sharp from "sharp"; -import { WARCResourceWriter } from "./warcresourcewriter.js"; import { logger, formatErr } from "./logger.js"; import { Browser } from "./browser.js"; import { Page } from "puppeteer-core"; import { PageState } from "./state.js"; +import { WARCWriter } from "./warcwriter.js"; // ============================================================================ @@ -42,18 +42,20 @@ export type ScreenshotOpts = { browser: Browser; page: Page; url: string; - directory: string; - warcPrefix: string; + writer: WARCWriter; }; -export class Screenshots extends WARCResourceWriter { +export class Screenshots { browser: Browser; page: Page; + url: string; + writer: WARCWriter; - constructor(opts: ScreenshotOpts) { - super({ ...opts, warcName: "screenshots.warc.gz" }); - this.browser = opts.browser; - this.page = opts.page; + constructor({ browser, page, writer, url }: ScreenshotOpts) { + this.browser = browser; + this.page = page; + this.url = url; + this.writer = writer; } async take( @@ -72,13 +74,14 @@ export class Screenshots extends WARCResourceWriter { if (state && screenshotType === "view") { state.screenshotView = screenshotBuffer; } - await this.writeBufferToWARC( - screenshotBuffer, - screenshotType, - "image/" + options.type, - ); + await this.writer.writeNewResourceRecord({ + buffer: screenshotBuffer, + resourceType: screenshotType, + contentType: "image/" + options.type, + url: this.url, + }); logger.info( - `Screenshot (type: ${screenshotType}) for ${this.url} written to ${this.warcName}`, + `Screenshot (type: ${screenshotType}) for ${this.url} written to ${this.writer.filename}`, ); } catch (e) { logger.error( @@ -103,13 +106,14 @@ export class Screenshots extends WARCResourceWriter { // 16:9 thumbnail .resize(640, 360) .toBuffer(); - await this.writeBufferToWARC( - thumbnailBuffer, - screenshotType, - "image/" + options.type, - ); + await this.writer.writeNewResourceRecord({ + buffer: thumbnailBuffer, + resourceType: screenshotType, + contentType: "image/" + options.type, + url: this.url, + }); logger.info( - `Screenshot (type: thumbnail) for ${this.url} written to ${this.warcName}`, + `Screenshot (type: thumbnail) for ${this.url} written to ${this.writer.filename}`, ); } catch (e) { logger.error( diff --git a/src/util/textextract.ts b/src/util/textextract.ts index 876b35a24..ee9a545e8 100644 --- a/src/util/textextract.ts +++ b/src/util/textextract.ts @@ -1,26 +1,28 @@ -import { WARCResourceWriter } from "./warcresourcewriter.js"; import { logger } from "./logger.js"; import { CDPSession, Protocol } from "puppeteer-core"; +import { WARCWriter } from "./warcwriter.js"; // ============================================================================ type TextExtractOpts = { url: string; - directory: string; - warcPrefix: string; + writer: WARCWriter; skipDocs: number; }; // ============================================================================ -export abstract class BaseTextExtract extends WARCResourceWriter { +export abstract class BaseTextExtract { cdp: CDPSession; lastText: string | null = null; text: string | null = null; skipDocs: number = 0; + writer: WARCWriter; + url: string; - constructor(cdp: CDPSession, opts: TextExtractOpts) { - super({ ...opts, warcName: "text.warc.gz" }); + constructor(cdp: CDPSession, { writer, skipDocs, url }: TextExtractOpts) { + this.writer = writer; this.cdp = cdp; - this.skipDocs = opts.skipDocs || 0; + this.url = url; + this.skipDocs = skipDocs || 0; } async extractAndStoreText( @@ -41,13 +43,14 @@ export abstract class BaseTextExtract extends WARCResourceWriter { return { changed: false, text }; } if (saveToWarc) { - await this.writeBufferToWARC( - new TextEncoder().encode(text), + await this.writer.writeNewResourceRecord({ + buffer: new TextEncoder().encode(text), resourceType, - "text/plain", - ); + contentType: "text/plain", + url: this.url, + }); logger.debug( - `Text Extracted (type: ${resourceType}) for ${this.url} written to ${this.warcName}`, + `Text Extracted (type: ${resourceType}) for ${this.url} written to ${this.writer.filename}`, ); } diff --git a/src/util/warcresourcewriter.ts b/src/util/warcresourcewriter.ts deleted file mode 100644 index 769cb3d36..000000000 --- a/src/util/warcresourcewriter.ts +++ /dev/null @@ -1,78 +0,0 @@ -import fs from "fs"; -import path from "path"; -import * as warcio from "warcio"; - -// =========================================================================== -export type WARCResourceWriterOpts = { - url: string; - directory: string; - date?: Date; - warcName: string; - warcPrefix: string; -}; - -// =========================================================================== -export class WARCResourceWriter { - url: string; - directory: string; - warcName: string; - date: Date; - - constructor({ - url, - directory, - date, - warcPrefix, - warcName, - }: WARCResourceWriterOpts) { - this.url = url; - this.directory = directory; - this.warcName = path.join(this.directory, warcPrefix + warcName); - this.date = date ? date : new Date(); - } - - async writeBufferToWARC( - contents: Uint8Array, - resourceType: string, - contentType: string, - ) { - const warcRecord = await WARCResourceWriter.createResourceRecord( - contents, - resourceType, - contentType, - this.url, - this.date, - ); - const warcRecordBuffer = await warcio.WARCSerializer.serialize(warcRecord, { - gzip: true, - }); - fs.appendFileSync(this.warcName, warcRecordBuffer); - } - - static async createResourceRecord( - buffer: Uint8Array, - resourceType: string, - contentType: string, - url: string, - date: Date, - ) { - const warcVersion = "WARC/1.1"; - const warcRecordType = "resource"; - const warcHeaders = { "Content-Type": contentType }; - async function* content() { - yield buffer; - } - const resourceUrl = `urn:${resourceType}:${url}`; - - return warcio.WARCRecord.create( - { - url: resourceUrl, - date: date.toISOString(), - type: warcRecordType, - warcVersion, - warcHeaders, - }, - content(), - ); - } -} diff --git a/src/util/warcwriter.ts b/src/util/warcwriter.ts index b30f886dd..5214028e3 100644 --- a/src/util/warcwriter.ts +++ b/src/util/warcwriter.ts @@ -2,14 +2,22 @@ import fs from "fs"; import { Writable } from "stream"; import path from "path"; -import { CDXIndexer } from "warcio"; +import { CDXIndexer, WARCRecord } from "warcio"; import { WARCSerializer } from "warcio/node"; import { logger, formatErr } from "./logger.js"; -import type { IndexerOffsetLength, WARCRecord } from "warcio"; +import type { IndexerOffsetLength } from "warcio"; import { timestampNow } from "./timing.js"; const DEFAULT_ROLLOVER_SIZE = 1_000_000_000; +export type ResourceRecordData = { + buffer: Uint8Array; + resourceType: string; + contentType: string; + url: string; + date?: Date; +}; + // ================================================================= export class WARCWriter implements IndexerOffsetLength { archivesDir: string; @@ -47,6 +55,8 @@ export class WARCWriter implements IndexerOffsetLength { }) { this.archivesDir = archivesDir; this.tempCdxDir = tempCdxDir; + // for now, disabling CDX + this.tempCdxDir = undefined; this.logDetails = logDetails; this.gzip = gzip; this.rolloverSize = rolloverSize; @@ -137,6 +147,39 @@ export class WARCWriter implements IndexerOffsetLength { this._writeCDX(record); } + async writeNewResourceRecord({ + buffer, + resourceType, + contentType, + url, + date, + }: ResourceRecordData) { + const warcVersion = "WARC/1.1"; + const warcRecordType = "resource"; + const warcHeaders = { "Content-Type": contentType }; + async function* content() { + yield buffer; + } + const resourceUrl = `urn:${resourceType}:${url}`; + + if (!date) { + date = new Date(); + } + + return await this.writeSingleRecord( + WARCRecord.create( + { + url: resourceUrl, + date: date.toISOString(), + type: warcRecordType, + warcVersion, + warcHeaders, + }, + content(), + ), + ); + } + private async _writeRecord(record: WARCRecord, serializer: WARCSerializer) { if (this.done) { logger.warn( @@ -188,8 +231,6 @@ export class WARCWriter implements IndexerOffsetLength { } async flush() { - this.done = true; - if (this.fh) { await streamFinish(this.fh); this.fh = null; @@ -201,6 +242,8 @@ export class WARCWriter implements IndexerOffsetLength { await streamFinish(this.cdxFH); this.cdxFH = null; } + + this.done = true; } } diff --git a/src/util/worker.ts b/src/util/worker.ts index 749eec12b..9b99c3590 100644 --- a/src/util/worker.ts +++ b/src/util/worker.ts @@ -290,7 +290,7 @@ export class PageWorker { } finally { try { if (this.recorder) { - opts.data.ts = await this.recorder.writePageInfoRecord(); + opts.data.ts = this.recorder.writePageInfoRecord(); } } catch (e) { logger.error(