From 21ae38362e0a47b4c0cba89781bb334d0fd73091 Mon Sep 17 00:00:00 2001 From: Tessa Walsh Date: Tue, 19 Mar 2024 17:14:21 -0400 Subject: [PATCH] Add endpoints to read pages from older crawl WACZs into database (#1562) Fixes #1597 New endpoints (replacing old migration) to re-add crawl pages to db from WACZs. After a few implementation attempts, we settled on using [remotezip](https://github.com/gtsystem/python-remotezip) to handle parsing of the zip files and streaming their contents line-by-line for pages. I've also modified the sync log streaming to use remotezip as well, which allows us to remove our own zip module and let remotezip handle the complexity of parsing zip files. Database inserts for pages from WACZs are batched 100 at a time to help speed up the endpoint, and the task is kicked off using asyncio.create_task so as not to block before giving a response. StorageOps now contains a method for streaming the bytes of any file in a remote WACZ, requiring only the presigned URL for the WACZ and the name of the file to stream. --- backend/btrixcloud/crawls.py | 15 +-- backend/btrixcloud/pages.py | 117 ++++++++++++++----- backend/btrixcloud/storages.py | 177 +++++++++++++---------------- backend/btrixcloud/utils.py | 11 -- backend/btrixcloud/zip.py | 200 --------------------------------- backend/requirements.txt | 1 + backend/test/test_run_crawl.py | 41 +++++++ chart/templates/configmap.yaml | 2 + chart/values.yaml | 2 + 9 files changed, 221 insertions(+), 345 deletions(-) delete mode 100644 backend/btrixcloud/zip.py diff --git a/backend/btrixcloud/crawls.py b/backend/btrixcloud/crawls.py index e45ae1ac27..025ab70d86 100644 --- a/backend/btrixcloud/crawls.py +++ b/backend/btrixcloud/crawls.py @@ -222,16 +222,6 @@ async def delete_crawls( return {"deleted": True, "storageQuotaReached": quota_reached} - async def get_wacz_files(self, crawl_id: str, org: Organization): - """Return list of WACZ files associated with crawl.""" - wacz_files = [] - crawl_raw = await self.get_crawl_raw(crawl_id, org) - crawl = Crawl.from_dict(crawl_raw) - for file_ in crawl.files: - if file_.filename.endswith(".wacz"): - wacz_files.append(file_) - return wacz_files - # pylint: disable=too-many-arguments async def add_new_crawl( self, @@ -889,11 +879,10 @@ async def stream_crawl_logs( if context: contexts = context.split(",") - # If crawl is finished, stream logs from WACZ files + # If crawl is finished, stream logs from WACZ files using presigned urls if crawl.finished: - wacz_files = await ops.get_wacz_files(crawl_id, org) resp = await ops.storage_ops.sync_stream_wacz_logs( - org, wacz_files, log_levels, contexts + crawl.resources, log_levels, contexts ) return StreamingResponse( resp, diff --git a/backend/btrixcloud/pages.py b/backend/btrixcloud/pages.py index 91c5c3bb7e..b3af72b3d4 100644 --- a/backend/btrixcloud/pages.py +++ b/backend/btrixcloud/pages.py @@ -1,5 +1,7 @@ """crawl pages""" +import asyncio +import traceback from datetime import datetime from typing import TYPE_CHECKING, Optional, Tuple, List, Dict, Any, Union from uuid import UUID, uuid4 @@ -42,51 +44,83 @@ class PageOps: def __init__(self, mdb, crawl_ops, org_ops, storage_ops): self.pages = mdb["pages"] + self.crawls = mdb["crawls"] self.crawl_ops = crawl_ops self.org_ops = org_ops self.storage_ops = storage_ops - async def add_crawl_pages_to_db_from_wacz(self, crawl_id: str): + async def add_crawl_pages_to_db_from_wacz(self, crawl_id: str, batch_size=100): """Add pages to database from WACZ files""" + pages_buffer: List[Page] = [] try: 
crawl = await self.crawl_ops.get_crawl(crawl_id, None) - org = await self.org_ops.get_org_by_id(crawl.oid) - wacz_files = await self.crawl_ops.get_wacz_files(crawl_id, org) - stream = await self.storage_ops.sync_stream_pages_from_wacz(org, wacz_files) + stream = await self.storage_ops.sync_stream_wacz_pages(crawl.resources) for page_dict in stream: if not page_dict.get("url"): continue - await self.add_page_to_db(page_dict, crawl_id, crawl.oid) + if len(pages_buffer) > batch_size: + await self._add_pages_to_db(pages_buffer) + + pages_buffer.append( + self._get_page_from_dict(page_dict, crawl_id, crawl.oid) + ) + + # Add any remaining pages in buffer to db + if pages_buffer: + await self._add_pages_to_db(pages_buffer) + + print(f"Added pages for crawl {crawl_id} to db", flush=True) # pylint: disable=broad-exception-caught, raise-missing-from except Exception as err: + traceback.print_exc() print(f"Error adding pages for crawl {crawl_id} to db: {err}", flush=True) - async def add_page_to_db(self, page_dict: Dict[str, Any], crawl_id: str, oid: UUID): - """Add page to database""" + def _get_page_from_dict(self, page_dict: Dict[str, Any], crawl_id: str, oid: UUID): + """Return Page object from dict""" page_id = page_dict.get("id") if not page_id: print(f'Page {page_dict.get("url")} has no id - assigning UUID', flush=True) page_id = uuid4() + status = page_dict.get("status") + if not status and page_dict.get("loadState"): + status = 200 + + return Page( + id=page_id, + oid=oid, + crawl_id=crawl_id, + url=page_dict.get("url"), + title=page_dict.get("title"), + load_state=page_dict.get("loadState"), + status=status, + timestamp=( + from_k8s_date(page_dict.get("ts")) + if page_dict.get("ts") + else datetime.now() + ), + ) + + async def _add_pages_to_db(self, pages: List[Page]): + """Add batch of pages to db in one insert""" + result = await self.pages.insert_many( + [ + page.to_dict( + exclude_unset=True, exclude_none=True, exclude_defaults=True + ) + for page in pages + ] + ) + if not result.inserted_ids: + # pylint: disable=broad-exception-raised + raise Exception("No pages inserted") + + async def add_page_to_db(self, page_dict: Dict[str, Any], crawl_id: str, oid: UUID): + """Add page to database""" + page = self._get_page_from_dict(page_dict, crawl_id, oid) + try: - status = page_dict.get("status") - if not status and page_dict.get("loadState"): - status = 200 - page = Page( - id=page_id, - oid=oid, - crawl_id=crawl_id, - url=page_dict.get("url"), - title=page_dict.get("title"), - load_state=page_dict.get("loadState"), - status=status, - timestamp=( - from_k8s_date(page_dict.get("ts")) - if page_dict.get("ts") - else datetime.now() - ), - ) await self.pages.insert_one( page.to_dict( exclude_unset=True, exclude_none=True, exclude_defaults=True @@ -97,7 +131,7 @@ async def add_page_to_db(self, page_dict: Dict[str, Any], crawl_id: str, oid: UU # pylint: disable=broad-except except Exception as err: print( - f"Error adding page {page_id} from crawl {crawl_id} to db: {err}", + f"Error adding page {page.id} from crawl {crawl_id} to db: {err}", flush=True, ) @@ -360,6 +394,20 @@ async def list_pages( return pages, total + async def re_add_crawl_pages(self, crawl_id: str, oid: UUID): + """Delete existing pages for crawl and re-add from WACZs.""" + await self.delete_crawl_pages(crawl_id, oid) + print(f"Deleted pages for crawl {crawl_id}", flush=True) + await self.add_crawl_pages_to_db_from_wacz(crawl_id) + + async def re_add_all_crawl_pages(self, oid: UUID): + """Re-add pages for all crawls in org""" + 
crawl_ids = await self.crawls.distinct( + "_id", {"type": "crawl", "finished": {"$ne": None}} + ) + for crawl_id in crawl_ids: + await self.re_add_crawl_pages(crawl_id, oid) + # ============================================================================ # pylint: disable=too-many-arguments, too-many-locals, invalid-name, fixme @@ -371,6 +419,25 @@ def init_pages_api(app, mdb, crawl_ops, org_ops, storage_ops, user_dep): org_crawl_dep = org_ops.org_crawl_dep + @app.post("/orgs/{oid}/crawls/all/pages/reAdd", tags=["pages"]) + async def re_add_all_crawl_pages( + org: Organization = Depends(org_crawl_dep), user: User = Depends(user_dep) + ): + """Re-add pages for all crawls in org (superuser only)""" + if not user.is_superuser: + raise HTTPException(status_code=403, detail="Not Allowed") + + asyncio.create_task(ops.re_add_all_crawl_pages(org.id)) + return {"started": True} + + @app.post("/orgs/{oid}/crawls/{crawl_id}/pages/reAdd", tags=["pages"]) + async def re_add_crawl_pages( + crawl_id: str, org: Organization = Depends(org_crawl_dep) + ): + """Re-add pages for crawl""" + asyncio.create_task(ops.re_add_crawl_pages(crawl_id, org.id)) + return {"started": True} + @app.get( "/orgs/{oid}/crawls/{crawl_id}/pages/{page_id}", tags=["pages"], diff --git a/backend/btrixcloud/storages.py b/backend/btrixcloud/storages.py index 08225b316f..a28878fc4b 100644 --- a/backend/btrixcloud/storages.py +++ b/backend/btrixcloud/storages.py @@ -14,18 +14,20 @@ ) from urllib.parse import urlsplit from contextlib import asynccontextmanager +from itertools import chain import asyncio import heapq import zlib import json -import itertools import os from datetime import datetime +from zipfile import ZipInfo from fastapi import Depends, HTTPException from stream_zip import stream_zip, NO_COMPRESSION_64 +from remotezip import RemoteZip import aiobotocore.session import boto3 @@ -43,10 +45,6 @@ S3StorageIn, OrgStorageRefs, ) -from .zip import ( - sync_get_zip_file, - sync_get_filestream, -) from .utils import is_bool, slug_from_name @@ -80,6 +78,12 @@ def __init__(self, org_ops, crawl_manager) -> None: self.is_local_minio = is_bool(os.environ.get("IS_LOCAL_MINIO")) + self.frontend_alias = os.environ.get( + "FRONTEND_ALIAS", "http://browsertrix-cloud-frontend" + ) + self.default_namespace = os.environ.get("DEFAULT_NAMESPACE", "default") + self.frontend_url = f"{self.frontend_alias}.{self.default_namespace}" + with open(os.environ["STORAGES_JSON"], encoding="utf-8") as fh: storage_list = json.loads(fh.read()) @@ -512,75 +516,53 @@ async def _delete_file( return status_code == 204 - async def sync_stream_pages_from_wacz( - self, - org: Organization, - wacz_files: List[CrawlFile], + async def sync_stream_wacz_pages( + self, wacz_files: List[CrawlFileOut] ) -> Iterator[Dict[Any, Any]]: - """Return stream of page dicts from last of WACZs""" - async with self.get_sync_client(org) as (client, bucket, key): - loop = asyncio.get_event_loop() + """Return stream of pages specified WACZ""" + loop = asyncio.get_event_loop() - stream = await loop.run_in_executor( - None, - self._sync_get_pages, - wacz_files, - client, - bucket, - key, - ) + resp = await loop.run_in_executor(None, self._sync_get_pages, wacz_files) - return stream + return resp async def sync_stream_wacz_logs( self, - org: Organization, - wacz_files: List[CrawlFile], + wacz_files: List[CrawlFileOut], log_levels: List[str], contexts: List[str], ) -> Iterator[bytes]: """Return filtered stream of logs from specified WACZs sorted by timestamp""" - async with 
self.get_sync_client(org) as (client, bucket, key): - loop = asyncio.get_event_loop() - - resp = await loop.run_in_executor( - None, - self._sync_get_logs, - wacz_files, - log_levels, - contexts, - client, - bucket, - key, - ) + loop = asyncio.get_event_loop() + + resp = await loop.run_in_executor( + None, + self._sync_get_logs, + wacz_files, + log_levels, + contexts, + ) - return resp + return resp def _sync_get_logs( self, - wacz_files: List[CrawlFile], + wacz_files: List[CrawlFileOut], log_levels: List[str], contexts: List[str], - client, - bucket: str, - key: str, ) -> Iterator[bytes]: """Generate filtered stream of logs from specified WACZs sorted by timestamp""" # pylint: disable=too-many-function-args def stream_log_lines( - wacz_key, wacz_filename, cd_start, log_zipinfo + log_zipinfo: ZipInfo, wacz_url: str, wacz_filename: str ) -> Iterator[dict]: """Pass lines as json objects""" + filename = log_zipinfo.filename - print( - f"Fetching log {log_zipinfo.filename} from {wacz_filename}", flush=True - ) - - line_iter: Iterator[bytes] = sync_get_filestream( - client, bucket, wacz_key, log_zipinfo, cd_start - ) + print(f"Fetching log {filename} from {wacz_filename}", flush=True) + line_iter: Iterator[bytes] = self._sync_get_filestream(wacz_url, filename) for line in line_iter: yield _parse_json(line.decode("utf-8", errors="ignore")) @@ -597,14 +579,14 @@ def stream_json_lines( yield json_str.encode("utf-8") def organize_based_on_instance_number( - wacz_files: List[CrawlFile], - ) -> List[List[CrawlFile]]: + wacz_files: List[CrawlFileOut], + ) -> List[List[CrawlFileOut]]: """Place wacz_files into their own list based on instance number""" - wacz_files.sort(key=lambda file: file.filename) - waczs_groups: Dict[str, List[CrawlFile]] = {} + wacz_files.sort(key=lambda file: file.name) + waczs_groups: Dict[str, List[CrawlFileOut]] = {} for file in wacz_files: - instance_number = file.filename[ - file.filename.rfind("-") + 1 : file.filename.rfind(".") + instance_number = file.name[ + file.name.rfind("-") + 1 : file.name.rfind(".") ] if instance_number in waczs_groups: waczs_groups[instance_number].append(file) @@ -619,24 +601,24 @@ def organize_based_on_instance_number( wacz_log_streams: List[Iterator[dict]] = [] for wacz_file in instance_list: - wacz_key = key + wacz_file.filename - cd_start, zip_file = sync_get_zip_file(client, bucket, wacz_key) - - log_files = [ - f - for f in zip_file.filelist - if f.filename.startswith("logs/") and not f.is_dir() - ] - log_files.sort(key=lambda log_zipinfo: log_zipinfo.filename) - - for log_zipinfo in log_files: - wacz_log_streams.append( - stream_log_lines( - wacz_key, wacz_file.filename, cd_start, log_zipinfo + wacz_url = wacz_file.path + if wacz_url.startswith("/data"): + wacz_url = f"{self.frontend_url}{wacz_url}" + + with RemoteZip(wacz_url) as remote_zip: + log_files: List[ZipInfo] = [ + f + for f in remote_zip.infolist() + if f.filename.startswith("logs/") and not f.is_dir() + ] + log_files.sort(key=lambda log_zipinfo: log_zipinfo.filename) + + for log_zipinfo in log_files: + wacz_log_streams.append( + stream_log_lines(log_zipinfo, wacz_url, wacz_file.name) ) - ) - log_generators.append(itertools.chain(*wacz_log_streams)) + log_generators.append(chain(*wacz_log_streams)) heap_iter = heapq.merge(*log_generators, key=lambda entry: entry["timestamp"]) @@ -644,50 +626,53 @@ def organize_based_on_instance_number( def _sync_get_pages( self, - wacz_files: List[CrawlFile], - client, - bucket: str, - key: str, + wacz_files: List[CrawlFileOut], ) -> 
Iterator[Dict[Any, Any]]: """Generate stream of page dicts from specified WACZs""" # pylint: disable=too-many-function-args def stream_page_lines( - wacz_key, wacz_filename, cd_start, pagefile_zipinfo + pagefile_zipinfo: ZipInfo, wacz_url: str, wacz_filename: str ) -> Iterator[Dict[Any, Any]]: """Pass lines as json objects""" + filename = pagefile_zipinfo.filename + print( - f"Fetching JSON lines from {pagefile_zipinfo.filename} in {wacz_filename}", + f"Fetching JSON lines from {filename} in {wacz_filename}", flush=True, ) - line_iter: Iterator[bytes] = sync_get_filestream( - client, bucket, wacz_key, pagefile_zipinfo, cd_start - ) + line_iter: Iterator[bytes] = self._sync_get_filestream(wacz_url, filename) for line in line_iter: yield _parse_json(line.decode("utf-8", errors="ignore")) page_generators: List[Iterator[Dict[Any, Any]]] = [] for wacz_file in wacz_files: - wacz_key = key + wacz_file.filename - cd_start, zip_file = sync_get_zip_file(client, bucket, wacz_key) - - page_files = [ - f - for f in zip_file.filelist - if f.filename.startswith("pages/") - and f.filename.endswith(".jsonl") - and not f.is_dir() - ] - for pagefile_zipinfo in page_files: - page_generators.append( - stream_page_lines( - wacz_key, wacz_file.filename, cd_start, pagefile_zipinfo + wacz_url = wacz_file.path + if wacz_url.startswith("/data"): + wacz_url = f"{self.frontend_url}{wacz_url}" + + with RemoteZip(wacz_url) as remote_zip: + page_files: List[ZipInfo] = [ + f + for f in remote_zip.infolist() + if f.filename.startswith("pages/") + and f.filename.endswith(".jsonl") + and not f.is_dir() + ] + for pagefile_zipinfo in page_files: + page_generators.append( + stream_page_lines(pagefile_zipinfo, wacz_url, wacz_file.name) ) - ) - return itertools.chain(*page_generators) + return chain.from_iterable(page_generators) + + def _sync_get_filestream(self, wacz_url: str, filename: str) -> Iterator[bytes]: + """Return iterator of lines in remote file as bytes""" + with RemoteZip(wacz_url) as remote_zip: + with remote_zip.open(filename) as file_stream: + yield from file_stream def _sync_dl( self, all_files: List[CrawlFileOut], client: S3Client, bucket: str, key: str diff --git a/backend/btrixcloud/utils.py b/backend/btrixcloud/utils.py index 1b60fe010a..66774ba8c0 100644 --- a/backend/btrixcloud/utils.py +++ b/backend/btrixcloud/utils.py @@ -120,14 +120,3 @@ def stream_dict_list_as_csv(data: List[Dict[str, Union[str, int]]], filename: st media_type="text/csv", headers={"Content-Disposition": f"attachment;filename={filename}"}, ) - - -async def gather_tasks_with_concurrency(*tasks, n=5): - """Limit concurrency to n tasks at a time""" - semaphore = asyncio.Semaphore(n) - - async def semaphore_task(task): - async with semaphore: - return await task - - return await asyncio.gather(*(semaphore_task(task) for task in tasks)) diff --git a/backend/btrixcloud/zip.py b/backend/btrixcloud/zip.py deleted file mode 100644 index bead89e3b0..0000000000 --- a/backend/btrixcloud/zip.py +++ /dev/null @@ -1,200 +0,0 @@ -""" -Methods for interacting with zip/WACZ files -""" - -import io -import struct -import zipfile -import zlib - - -# ============================================================================ -EOCD_RECORD_SIZE = 22 -ZIP64_EOCD_RECORD_SIZE = 56 -ZIP64_EOCD_LOCATOR_SIZE = 20 - -MAX_STANDARD_ZIP_SIZE = 4_294_967_295 - -CHUNK_SIZE = 1024 * 256 - - -# ============================================================================ -def sync_get_filestream(client, bucket, key, file_zipinfo, cd_start): - """Return uncompressed byte 
stream of file in WACZ""" - # pylint: disable=too-many-locals - file_head = sync_fetch( - client, bucket, key, cd_start + file_zipinfo.header_offset + 26, 4 - ) - name_len = parse_little_endian_to_int(file_head[0:2]) - extra_len = parse_little_endian_to_int(file_head[2:4]) - - content = sync_fetch_stream( - client, - bucket, - key, - cd_start + file_zipinfo.header_offset + 30 + name_len + extra_len, - file_zipinfo.compress_size, - ) - - decompress = False - if file_zipinfo.compress_type == zipfile.ZIP_DEFLATED: - decompress = True - - return sync_iter_lines(content, decompress=decompress) - - -def sync_iter_lines(chunk_iter, decompress=False, keepends=True): - """ - Iter by lines, adapted from botocore - """ - pending = b"" - for chunk in chunk_iter: - if decompress: - chunk = zlib.decompressobj(-zlib.MAX_WBITS).decompress(chunk) - lines = (pending + chunk).splitlines(True) - for line in lines[:-1]: - yield line.splitlines(keepends)[0] - pending = lines[-1] - if pending: - yield pending.splitlines(keepends)[0] - - -async def get_zip_file(client, bucket, key): - """Fetch enough of the WACZ file be able to read the zip filelist""" - file_size = await get_file_size(client, bucket, key) - eocd_record = await fetch( - client, bucket, key, file_size - EOCD_RECORD_SIZE, EOCD_RECORD_SIZE - ) - - if file_size <= MAX_STANDARD_ZIP_SIZE: - cd_start, cd_size = get_central_directory_metadata_from_eocd(eocd_record) - central_directory = await fetch(client, bucket, key, cd_start, cd_size) - return ( - cd_start, - zipfile.ZipFile(io.BytesIO(central_directory + eocd_record)), - ) - - zip64_eocd_record = await fetch( - client, - bucket, - key, - file_size - - (EOCD_RECORD_SIZE + ZIP64_EOCD_LOCATOR_SIZE + ZIP64_EOCD_RECORD_SIZE), - ZIP64_EOCD_RECORD_SIZE, - ) - zip64_eocd_locator = await fetch( - client, - bucket, - key, - file_size - (EOCD_RECORD_SIZE + ZIP64_EOCD_LOCATOR_SIZE), - ZIP64_EOCD_LOCATOR_SIZE, - ) - cd_start, cd_size = get_central_directory_metadata_from_eocd64(zip64_eocd_record) - central_directory = await fetch(client, bucket, key, cd_start, cd_size) - return ( - cd_start, - zipfile.ZipFile( - io.BytesIO( - central_directory + zip64_eocd_record + zip64_eocd_locator + eocd_record - ) - ), - ) - - -def sync_get_zip_file(client, bucket, key): - """Fetch enough of the WACZ file be able to read the zip filelist""" - file_size = sync_get_file_size(client, bucket, key) - eocd_record = sync_fetch( - client, bucket, key, file_size - EOCD_RECORD_SIZE, EOCD_RECORD_SIZE - ) - - if file_size <= MAX_STANDARD_ZIP_SIZE: - cd_start, cd_size = get_central_directory_metadata_from_eocd(eocd_record) - central_directory = sync_fetch(client, bucket, key, cd_start, cd_size) - with zipfile.ZipFile(io.BytesIO(central_directory + eocd_record)) as zip_file: - return (cd_start, zip_file) - - zip64_eocd_record = sync_fetch( - client, - bucket, - key, - file_size - - (EOCD_RECORD_SIZE + ZIP64_EOCD_LOCATOR_SIZE + ZIP64_EOCD_RECORD_SIZE), - ZIP64_EOCD_RECORD_SIZE, - ) - zip64_eocd_locator = sync_fetch( - client, - bucket, - key, - file_size - (EOCD_RECORD_SIZE + ZIP64_EOCD_LOCATOR_SIZE), - ZIP64_EOCD_LOCATOR_SIZE, - ) - cd_start, cd_size = get_central_directory_metadata_from_eocd64(zip64_eocd_record) - central_directory = sync_fetch(client, bucket, key, cd_start, cd_size) - with zipfile.ZipFile( - io.BytesIO( - central_directory + zip64_eocd_record + zip64_eocd_locator + eocd_record - ) - ) as zip_file: - return (cd_start, zip_file) - - -async def get_file_size(client, bucket, key): - """Get WACZ file size from HEAD request""" 
- head_response = await client.head_object(Bucket=bucket, Key=key) - return head_response["ContentLength"] - - -def sync_get_file_size(client, bucket, key): - """Get WACZ file size from HEAD request""" - head_response = client.head_object(Bucket=bucket, Key=key) - return head_response["ContentLength"] - - -async def fetch(client, bucket, key, start, length): - """Fetch a byte range from a file in object storage""" - end = start + length - 1 - response = await client.get_object( - Bucket=bucket, Key=key, Range=f"bytes={start}-{end}" - ) - return await response["Body"].read() - - -def sync_fetch(client, bucket, key, start, length): - """Fetch a byte range from a file in object storage""" - end = start + length - 1 - response = client.get_object(Bucket=bucket, Key=key, Range=f"bytes={start}-{end}") - return response["Body"].read() - - -def sync_fetch_stream(client, bucket, key, start, length): - """Fetch a byte range from a file in object storage as a stream""" - end = start + length - 1 - response = client.get_object(Bucket=bucket, Key=key, Range=f"bytes={start}-{end}") - return response["Body"].iter_chunks(chunk_size=CHUNK_SIZE) - - -def get_central_directory_metadata_from_eocd(eocd): - """Get central directory start and size""" - cd_size = parse_little_endian_to_int(eocd[12:16]) - cd_start = parse_little_endian_to_int(eocd[16:20]) - return cd_start, cd_size - - -def get_central_directory_metadata_from_eocd64(eocd64): - """Get central directory start and size for zip64""" - cd_size = parse_little_endian_to_int(eocd64[40:48]) - cd_start = parse_little_endian_to_int(eocd64[48:56]) - return cd_start, cd_size - - -def parse_little_endian_to_int(little_endian_bytes): - """Convert little endian used in zip spec to int""" - byte_length = len(little_endian_bytes) - format_character = "q" - if byte_length == 4: - format_character = "i" - elif byte_length == 2: - format_character = "h" - - return struct.unpack("<" + format_character, little_endian_bytes)[0] diff --git a/backend/requirements.txt b/backend/requirements.txt index f8204b2116..078472546e 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -28,3 +28,4 @@ types_aiobotocore_s3 types-redis types-python-slugify types-pyYAML +remotezip diff --git a/backend/test/test_run_crawl.py b/backend/test/test_run_crawl.py index 08eb770dd2..26e58638a9 100644 --- a/backend/test/test_run_crawl.py +++ b/backend/test/test_run_crawl.py @@ -434,6 +434,7 @@ def test_crawl_pages(crawler_auth_headers, default_org_id, crawler_crawl_id): assert page["timestamp"] assert page.get("title") or page.get("title") is None assert page["load_state"] + assert page["status"] # Test GET page endpoint global page_id @@ -494,6 +495,46 @@ def test_crawl_pages(crawler_auth_headers, default_org_id, crawler_crawl_id): assert page["approved"] +def test_re_add_crawl_pages(crawler_auth_headers, default_org_id, crawler_crawl_id): + # Re-add pages and verify they were correctly added + r = requests.post( + f"{API_PREFIX}/orgs/{default_org_id}/crawls/{crawler_crawl_id}/pages/reAdd", + headers=crawler_auth_headers, + ) + assert r.status_code == 200 + assert r.json()["started"] + + time.sleep(10) + + r = requests.get( + f"{API_PREFIX}/orgs/{default_org_id}/crawls/{crawler_crawl_id}/pages", + headers=crawler_auth_headers, + ) + assert r.status_code == 200 + data = r.json() + assert data["total"] >= 0 + + pages = data["items"] + assert pages + + for page in pages: + assert page["id"] + assert page["oid"] + assert page["crawl_id"] + assert page["url"] + assert 
page["timestamp"] + assert page.get("title") or page.get("title") is None + assert page["load_state"] + assert page["status"] + + # Ensure only superuser can re-add pages for all crawls in an org + r = requests.post( + f"{API_PREFIX}/orgs/{default_org_id}/crawls/all/pages/reAdd", + headers=crawler_auth_headers, + ) + assert r.status_code == 403 + + def test_crawl_page_notes(crawler_auth_headers, default_org_id, crawler_crawl_id): note_text = "testing" updated_note_text = "updated" diff --git a/chart/templates/configmap.yaml b/chart/templates/configmap.yaml index 570b1ccd8d..6efb48a86b 100644 --- a/chart/templates/configmap.yaml +++ b/chart/templates/configmap.yaml @@ -12,6 +12,8 @@ data: DEFAULT_NAMESPACE: {{ .Release.Namespace }} + FRONTEND_ALIAS: {{ .Values.frontend_alias | default "http://browsertrix-cloud-frontend" }} + CRAWLER_FQDN_SUFFIX: ".{{ .Values.crawler_namespace }}.svc.cluster.local" DEFAULT_ORG: "{{ .Values.default_org }}" diff --git a/chart/values.yaml b/chart/values.yaml index 69fb2d7265..46432470e4 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -130,6 +130,8 @@ frontend_memory: "64Mi" # if using ingress, this value is ignored local_service_port: 30870 +frontend_alias: "http://browsertrix-cloud-frontend" + # MongoDB Image # =========================================