From 4f676e4e82d5bce9ea909b0e8657af2e2d1e63ed Mon Sep 17 00:00:00 2001
From: Ilya Kreymer
Date: Wed, 20 Mar 2024 22:42:16 -0700
Subject: [PATCH] QA Runs Initial Backend Implementation (#1586)

Supports running QA Runs via the QA API!

Builds on top of the `issue-1498-crawl-qa-backend-support` branch, fixes #1498

Also requires the latest Browsertrix Crawler 1.1.0+ (from webrecorder/browsertrix-crawler#469 branch)

Notable changes:
- QARun objects contain info about QA runs, which are crawls performed on data loaded from existing crawls.
- Various crawl db operations can be performed on either the crawl or the `qa.` object, and core crawl fields have been moved to CoreCrawlable.
- While running, `QARun` data is stored in a single `qa` object; finished QA runs are added to the `qaFinished` dictionary on the Crawl. The QA list API returns data from the finished list, sorted by most recent first.
- Includes additional type fixes / type safety, especially around BaseCrawl / Crawl / UploadedCrawl functionality, also creating specific get_upload(), get_base_crawl(), get_crawl() getters for internal use and get_crawl_out() for API output.
- Supports filtering and sorting pages via `qaFilterBy` (screenshotMatch, textMatch) along with `gt`, `lt`, `gte`, `lte` params to return pages based on QA results.

---------

Co-authored-by: Tessa Walsh
---
 backend/btrixcloud/background_jobs.py | 10 +-
 backend/btrixcloud/basecrawls.py | 219 ++++----
 backend/btrixcloud/colls.py | 18 +-
 backend/btrixcloud/crawlconfigs.py | 30 +-
 backend/btrixcloud/crawlmanager.py | 31 ++
 backend/btrixcloud/crawls.py | 503 +++++++++++++++---
 backend/btrixcloud/db.py | 21 +-
 backend/btrixcloud/k8sapi.py | 14 +-
 backend/btrixcloud/main.py | 4 +-
 backend/btrixcloud/main_op.py | 2 +-
 .../migrations/migration_0004_config_seeds.py | 6 +-
 backend/btrixcloud/models.py | 162 ++++--
 backend/btrixcloud/operator/crawls.py | 319 ++++++-----
 backend/btrixcloud/operator/cronjobs.py | 4 +-
 backend/btrixcloud/operator/models.py | 11 +
 backend/btrixcloud/orgs.py | 29 +-
 backend/btrixcloud/pages.py | 172 ++++--
 backend/btrixcloud/storages.py | 27 +-
 backend/btrixcloud/uploads.py | 17 +-
 backend/btrixcloud/webhooks.py | 4 +-
 backend/test/test_qa.py | 187 +++++++
 backend/test/test_run_crawl.py | 20 +-
 chart/app-templates/crawl_job.yaml | 11 +-
 chart/app-templates/crawler.yaml | 14 +-
 chart/app-templates/qa_configmap.yaml | 14 +
 chart/app-templates/replica_job.yaml | 2 +-
 chart/templates/configmap.yaml | 2 +-
 chart/templates/operators.yaml | 5 +
 chart/templates/priorities.yaml | 16 +-
 chart/test/test.yaml | 2 +-
 chart/values.yaml | 4 +-
 31 files changed, 1396 insertions(+), 484 deletions(-)
 create mode 100644 backend/test/test_qa.py
 create mode 100644 chart/app-templates/qa_configmap.yaml

diff --git a/backend/btrixcloud/background_jobs.py b/backend/btrixcloud/background_jobs.py
index e77f96077d..b8cd420ea5 100644
--- a/backend/btrixcloud/background_jobs.py
+++ b/backend/btrixcloud/background_jobs.py
@@ -403,11 +403,11 @@ async def get_replica_job_file(
                 profile = await self.profile_ops.get_profile(UUID(job.object_id), org)
                 return BaseFile(**profile.resource.dict())

-            item_res = await self.base_crawl_ops.get_crawl_raw(job.object_id, org)
-            matching_file = [
-                f for f in item_res.get("files", []) if f["filename"] == job.file_path
-            ][0]
-            return BaseFile(**matching_file)
+            item_res = await self.base_crawl_ops.get_base_crawl(job.object_id, org)
+            matching_file = [f for f in item_res.files if f.filename == job.file_path][
+                0
+            ]
+            return matching_file
         # pylint: 
disable=broad-exception-caught, raise-missing-from except Exception: raise HTTPException(status_code=404, detail="file_not_found") diff --git a/backend/btrixcloud/basecrawls.py b/backend/btrixcloud/basecrawls.py index 22b71073d1..a02ab3fac4 100644 --- a/backend/btrixcloud/basecrawls.py +++ b/backend/btrixcloud/basecrawls.py @@ -5,7 +5,6 @@ from typing import Optional, List, Union, Dict, Any, Type, TYPE_CHECKING, cast from uuid import UUID import urllib.parse -import contextlib import asyncio from fastapi import HTTPException, Depends @@ -30,7 +29,6 @@ if TYPE_CHECKING: from .crawlconfigs import CrawlConfigOps - from .crawlmanager import CrawlManager from .users import UserManager from .orgs import OrgOps from .colls import CollectionOps @@ -41,7 +39,7 @@ else: CrawlConfigOps = UserManager = OrgOps = CollectionOps = PageOps = object - CrawlManager = StorageOps = EventWebhookOps = BackgroundJobOps = object + StorageOps = EventWebhookOps = BackgroundJobOps = object # Presign duration must be less than 604800 seconds (one week), # so set this one minute short of a week. @@ -57,7 +55,6 @@ class BaseCrawlOps: # pylint: disable=duplicate-code, too-many-arguments, too-many-locals crawl_configs: CrawlConfigOps - crawl_manager: CrawlManager user_manager: UserManager orgs: OrgOps colls: CollectionOps @@ -66,12 +63,13 @@ class BaseCrawlOps: background_job_ops: BackgroundJobOps page_ops: PageOps + presign_duration: int + def __init__( self, mdb, users: UserManager, orgs: OrgOps, - crawl_manager: CrawlManager, crawl_configs: CrawlConfigOps, colls: CollectionOps, storage_ops: StorageOps, @@ -79,7 +77,6 @@ def __init__( background_job_ops: BackgroundJobOps, ): self.crawls = mdb["crawls"] - self.crawl_manager = crawl_manager self.crawl_configs = crawl_configs self.user_manager = users self.orgs = orgs @@ -107,7 +104,7 @@ async def get_crawl_raw( org: Optional[Organization] = None, type_: Optional[str] = None, project: Optional[dict[str, bool]] = None, - ): + ) -> Dict[str, Any]: """Get data for single crawl""" query: dict[str, object] = {"_id": crawlid} @@ -124,40 +121,61 @@ async def get_crawl_raw( return res - async def _files_to_resources(self, files, org, crawlid): + async def _files_to_resources( + self, + files: List[Dict], + org: Organization, + crawlid: str, + qa_run_id: Optional[str] = None, + ) -> List[CrawlFileOut]: if not files: return [] crawl_files = [CrawlFile(**data) for data in files] - return await self._resolve_signed_urls(crawl_files, org, crawlid) + return await self._resolve_signed_urls(crawl_files, org, crawlid, qa_run_id) + + async def get_wacz_files(self, crawl_id: str, org: Organization): + """Return list of WACZ files associated with crawl.""" + wacz_files = [] + crawl = await self.get_base_crawl(crawl_id, org) + for file_ in crawl.files: + if file_.filename.endswith(".wacz"): + wacz_files.append(file_) + return wacz_files - async def get_crawl( + async def get_base_crawl( self, crawlid: str, org: Optional[Organization] = None, type_: Optional[str] = None, - cls_type: Type[Union[CrawlOut, CrawlOutWithResources]] = CrawlOutWithResources, - ): - """Get data for single base crawl""" - res = await self.get_crawl_raw(crawlid, org, type_) - - if cls_type == CrawlOutWithResources: - res["resources"] = await self._files_to_resources( - res.get("files"), org, crawlid - ) + project: Optional[dict[str, bool]] = None, + ) -> BaseCrawl: + """Get crawl data for internal use""" + res = await self.get_crawl_raw(crawlid, org, type_, project) + return BaseCrawl.from_dict(res) - if 
res.get("collectionIds"): - res["collections"] = await self.colls.get_collection_names( - res.get("collectionIds") - ) + async def get_crawl_out( + self, + crawlid: str, + org: Optional[Organization] = None, + type_: Optional[str] = None, + skip_resources=False, + ) -> CrawlOutWithResources: + """Get crawl data for api output""" + res = await self.get_crawl_raw(crawlid, org, type_) - res.pop("files", None) + files = res.pop("files", None) res.pop("errors", None) - crawl = cls_type.from_dict(res) + if not skip_resources: + coll_ids = res.get("collectionIds") + if coll_ids: + res["collections"] = await self.colls.get_collection_names(coll_ids) + + crawl = CrawlOutWithResources.from_dict(res) - if crawl.type == "crawl": - crawl = await self._resolve_crawl_refs(crawl, org) + if not skip_resources: + crawl = await self._resolve_crawl_refs(crawl, org, files) if crawl.config and crawl.config.seeds: crawl.config.seeds = None @@ -168,23 +186,22 @@ async def get_crawl( return crawl - async def get_resource_resolved_raw_crawl( - self, crawlid: str, org: Organization, type_=None - ): - """return single base crawl with resources resolved""" - res = await self.get_crawl_raw(crawlid=crawlid, type_=type_, org=org) - res["resources"] = await self._files_to_resources( - res.get("files"), org, res["_id"] - ) - return res + async def get_internal_crawl_out(self, crawl_id): + """add internal prefix for relative paths""" + crawl_out = await self.get_crawl_out(crawl_id) + resources = crawl_out.resources or [] + for file_ in resources: + file_.path = self.storage_ops.resolve_internal_access_path(file_.path) + + return crawl_out async def _update_crawl_collections( self, crawl_id: str, org: Organization, collection_ids: List[UUID] ): """Update crawl collections to match updated list.""" - crawl = await self.get_crawl(crawl_id, org, cls_type=CrawlOut) + crawl = await self.get_crawl_out(crawl_id, org, skip_resources=True) - prior_coll_ids = set(crawl.collectionIds) + prior_coll_ids = set(crawl.collectionIds or []) updated_coll_ids = set(collection_ids) # Add new collections @@ -264,50 +281,7 @@ async def add_crawl_file_replica( ) async def shutdown_crawl(self, crawl_id: str, org: Organization, graceful: bool): - """stop or cancel specified crawl""" - crawl = await self.get_crawl_raw(crawl_id, org) - if crawl.get("type") != "crawl": - return - - result = None - try: - result = await self.crawl_manager.shutdown_crawl( - crawl_id, graceful=graceful - ) - - if result.get("success"): - if graceful: - await self.crawls.find_one_and_update( - {"_id": crawl_id, "type": "crawl", "oid": org.id}, - {"$set": {"stopping": True}}, - ) - return result - - except Exception as exc: - # pylint: disable=raise-missing-from - # if reached here, probably crawl doesn't exist anymore - raise HTTPException( - status_code=404, detail=f"crawl_not_found, (details: {exc})" - ) - - # if job no longer running, canceling is considered success, - # but graceful stoppage is not possible, so would be a failure - if result.get("error") == "Not Found": - if not graceful: - await self.update_crawl_state(crawl_id, "canceled") - crawl = await self.get_crawl_raw(crawl_id, org) - if not await self.crawl_configs.stats_recompute_last( - crawl["cid"], 0, -1 - ): - raise HTTPException( - status_code=404, - detail=f"crawl_config_not_found: {crawl['cid']}", - ) - - return {"success": True} - - # return whatever detail may be included in the response - raise HTTPException(status_code=400, detail=result) + """placeholder, implemented in crawls, base version does 
nothing""" async def delete_crawls( self, @@ -315,24 +289,24 @@ async def delete_crawls( delete_list: DeleteCrawlList, type_: str, user: Optional[User] = None, - ): + ) -> tuple[int, dict[UUID, dict[str, int]], bool]: """Delete a list of crawls by id for given org""" - cids_to_update: dict[str, dict[str, int]] = {} + cids_to_update: dict[UUID, dict[str, int]] = {} size = 0 for crawl_id in delete_list.crawl_ids: - crawl = await self.get_crawl_raw(crawl_id, org) - if crawl.get("type") != type_: + crawl = await self.get_base_crawl(crawl_id, org) + if crawl.type != type_: continue # Ensure user has appropriate permissions for all crawls in list: # - Crawler users can delete their own crawls # - Org owners can delete any crawls in org - if user and (crawl.get("userid") != user.id) and not org.is_owner(user): + if user and (crawl.userid != user.id) and not org.is_owner(user): raise HTTPException(status_code=403, detail="not_allowed") - if type_ == "crawl" and not crawl.get("finished"): + if type_ == "crawl" and not crawl.finished: try: await self.shutdown_crawl(crawl_id, org, graceful=False) except Exception as exc: @@ -347,7 +321,7 @@ async def delete_crawls( crawl_size = await self._delete_crawl_files(crawl, org) size += crawl_size - cid = crawl.get("cid") + cid = crawl.cid if cid: if cids_to_update.get(cid): cids_to_update[cid]["inc"] += 1 @@ -377,9 +351,8 @@ async def delete_crawls( return res.deleted_count, cids_to_update, quota_reached - async def _delete_crawl_files(self, crawl_raw: Dict[str, Any], org: Organization): + async def _delete_crawl_files(self, crawl: BaseCrawl, org: Organization): """Delete files associated with crawl from storage.""" - crawl = BaseCrawl.from_dict(crawl_raw) size = 0 for file_ in crawl.files: size += file_.size @@ -393,16 +366,16 @@ async def _delete_crawl_files(self, crawl_raw: Dict[str, Any], org: Organization async def delete_crawl_files(self, crawl_id: str, oid: UUID): """Delete crawl files""" - crawl_raw = await self.get_crawl_raw(crawl_id) + crawl = await self.get_base_crawl(crawl_id) org = await self.orgs.get_org_by_id(oid) - return await self._delete_crawl_files(crawl_raw, org) + return await self._delete_crawl_files(crawl, org) async def _resolve_crawl_refs( self, crawl: Union[CrawlOut, CrawlOutWithResources], org: Optional[Organization], + files: Optional[list[dict]], add_first_seed: bool = True, - files: Optional[list[dict]] = None, ): """Resolve running crawl data""" # pylint: disable=too-many-branches @@ -411,6 +384,12 @@ async def _resolve_crawl_refs( config = await self.crawl_configs.get_crawl_config( crawl.cid, org.id if org else None, active_only=False ) + + if not org: + org = await self.orgs.get_org_by_id(crawl.oid) + if not org: + raise HTTPException(status_code=400, detail="missing_org") + if config and config.config.seeds: if add_first_seed: first_seed = config.config.seeds[0] @@ -432,11 +411,15 @@ async def _resolve_crawl_refs( return crawl async def _resolve_signed_urls( - self, files: List[CrawlFile], org: Organization, crawl_id: Optional[str] = None - ): + self, + files: List[CrawlFile], + org: Organization, + crawl_id: Optional[str] = None, + qa_run_id: Optional[str] = None, + ) -> List[CrawlFileOut]: if not files: print("no files") - return + return [] delta = timedelta(seconds=self.presign_duration_seconds) @@ -451,12 +434,17 @@ async def _resolve_signed_urls( presigned_url = await self.storage_ops.get_presigned_url( org, file_, self.presign_duration_seconds ) + + prefix = "files" + if qa_run_id: + prefix = 
f"qaFinished.{qa_run_id}.{prefix}" + await self.crawls.find_one_and_update( - {"files.filename": file_.filename}, + {f"{prefix}.filename": file_.filename}, { "$set": { - "files.$.presignedUrl": presigned_url, - "files.$.expireAt": exp, + f"{prefix}.$.presignedUrl": presigned_url, + f"{prefix}.$.expireAt": exp, } }, ) @@ -481,25 +469,13 @@ async def _resolve_signed_urls( return out_files - @contextlib.asynccontextmanager - async def get_redis(self, crawl_id): - """get redis url for crawl id""" - redis_url = self.crawl_manager.get_redis_url(crawl_id) - - redis = await self.crawl_manager.get_redis_client(redis_url) - - try: - yield redis - finally: - await redis.close() - async def add_to_collection( self, crawl_ids: List[str], collection_id: UUID, org: Organization ): """Add crawls to collection.""" for crawl_id in crawl_ids: - crawl_raw = await self.get_crawl_raw(crawl_id, org) - crawl_collections = crawl_raw.get("collectionIds") + crawl = await self.get_base_crawl(crawl_id, org) + crawl_collections = crawl.collectionIds if crawl_collections and crawl_id in crawl_collections: raise HTTPException( status_code=400, detail="crawl_already_in_collection" @@ -649,11 +625,10 @@ async def delete_crawls_all_types( uploads: list[str] = [] for crawl_id in delete_list.crawl_ids: - crawl = await self.get_crawl_raw(crawl_id, org) - type_ = crawl.get("type") - if type_ == "crawl": + crawl = await self.get_base_crawl(crawl_id, org) + if crawl.type == "crawl": crawls.append(crawl_id) - if type_ == "upload": + if crawl.type == "upload": uploads.append(crawl_id) crawls_length = len(crawls) @@ -804,7 +779,7 @@ async def get_all_crawls_search_values( response_model=CrawlOutWithResources, ) async def get_base_crawl(crawl_id: str, org: Organization = Depends(org_crawl_dep)): - return await ops.get_crawl(crawl_id, org) + return await ops.get_crawl_out(crawl_id, org) @app.get( "/orgs/all/all-crawls/{crawl_id}/replay.json", @@ -815,15 +790,15 @@ async def get_base_crawl_admin(crawl_id, user: User = Depends(user_dep)): if not user.is_superuser: raise HTTPException(status_code=403, detail="Not Allowed") - return await ops.get_crawl(crawl_id, None) + return await ops.get_crawl_out(crawl_id, None) @app.get( "/orgs/{oid}/all-crawls/{crawl_id}/replay.json", tags=["all-crawls"], response_model=CrawlOutWithResources, ) - async def get_crawl(crawl_id, org: Organization = Depends(org_viewer_dep)): - return await ops.get_crawl(crawl_id, org) + async def get_crawl_out(crawl_id, org: Organization = Depends(org_viewer_dep)): + return await ops.get_crawl_out(crawl_id, org) @app.patch("/orgs/{oid}/all-crawls/{crawl_id}", tags=["all-crawls"]) async def update_crawl( diff --git a/backend/btrixcloud/colls.py b/backend/btrixcloud/colls.py index 497a9cc09a..6a23fe50d2 100644 --- a/backend/btrixcloud/colls.py +++ b/backend/btrixcloud/colls.py @@ -20,6 +20,7 @@ CollIdName, UpdateColl, AddRemoveCrawlList, + BaseCrawl, CrawlOutWithResources, Organization, PaginatedResponse, @@ -330,17 +331,18 @@ async def update_collection_counts_and_tags(self, collection_id: UUID): total_size = 0 tags = [] - async for crawl in self.crawls.find({"collectionIds": collection_id}): - if crawl["state"] not in SUCCESSFUL_STATES: + async for crawl_raw in self.crawls.find({"collectionIds": collection_id}): + crawl = BaseCrawl.from_dict(crawl_raw) + if crawl.state not in SUCCESSFUL_STATES: continue crawl_count += 1 - files = crawl.get("files", []) + files = crawl.files or [] for file in files: - total_size += file.get("size", 0) - if crawl.get("stats"): - 
page_count += crawl.get("stats", {}).get("done", 0) - if crawl.get("tags"): - tags.extend(crawl.get("tags")) + total_size += file.size + if crawl.stats: + page_count += crawl.stats.done + if crawl.tags: + tags.extend(crawl.tags) sorted_tags = [tag for tag, count in Counter(tags).most_common()] diff --git a/backend/btrixcloud/crawlconfigs.py b/backend/btrixcloud/crawlconfigs.py index 9690ebe55f..3fb1d09700 100644 --- a/backend/btrixcloud/crawlconfigs.py +++ b/backend/btrixcloud/crawlconfigs.py @@ -24,6 +24,7 @@ CrawlConfig, CrawlConfigOut, CrawlConfigIdNameOut, + CrawlOut, EmptyStr, UpdateCrawlConfig, Organization, @@ -559,7 +560,9 @@ async def get_crawl_config_ids_for_profile( results = [CrawlConfigIdNameOut.from_dict(res) for res in results] return results - async def get_running_crawl(self, crawlconfig: CrawlConfig): + async def get_running_crawl( + self, crawlconfig: Union[CrawlConfig, CrawlConfigOut] + ) -> Optional[CrawlOut]: """Return the id of currently running crawl for this config, if any""" # crawls = await self.crawl_manager.list_running_crawls(cid=crawlconfig.id) crawls, _ = await self.crawl_ops.list_crawls( @@ -619,13 +622,15 @@ async def stats_recompute_last(self, cid: UUID, size: int, inc_crawls: int = 1): return result is not None - def _add_curr_crawl_stats(self, crawlconfig, crawl): + def _add_curr_crawl_stats( + self, crawlconfig: CrawlConfigOut, crawl: Optional[CrawlOut] + ): """Add stats from current running crawl, if any""" if not crawl: return crawlconfig.lastCrawlState = crawl.state - crawlconfig.lastCrawlSize = crawl.stats.get("size", 0) if crawl.stats else 0 + crawlconfig.lastCrawlSize = crawl.stats.size if crawl.stats else 0 crawlconfig.lastCrawlStopping = crawl.stopping async def get_crawl_config_out(self, cid: UUID, org: Organization): @@ -814,8 +819,9 @@ async def get_crawl_config_search_values(self, org): "workflowIds": workflow_ids, } - async def run_now(self, cid: UUID, org: Organization, user: User): - """run specified crawlconfig now""" + async def prepare_for_run_crawl(self, cid: UUID, org: Organization) -> CrawlConfig: + """prepare for running a crawl, returning crawlconfig and + validating that running crawls is allowed""" crawlconfig = await self.get_crawl_config(cid, org.id) if not crawlconfig: @@ -823,11 +829,6 @@ async def run_now(self, cid: UUID, org: Organization, user: User): status_code=404, detail=f"Crawl Config '{cid}' not found" ) - if await self.get_running_crawl(crawlconfig): - raise HTTPException(status_code=400, detail="crawl_already_running") - - crawl_id = None - # ensure crawlconfig exists try: await self.crawl_manager.get_configmap(crawlconfig.id) @@ -841,6 +842,15 @@ async def run_now(self, cid: UUID, org: Organization, user: User): if await self.org_ops.exec_mins_quota_reached(org.id): raise HTTPException(status_code=403, detail="exec_minutes_quota_reached") + return crawlconfig + + async def run_now(self, cid: UUID, org: Organization, user: User): + """run specified crawlconfig now""" + crawlconfig = await self.prepare_for_run_crawl(cid, org) + + if await self.get_running_crawl(crawlconfig): + raise HTTPException(status_code=400, detail="crawl_already_running") + try: crawl_id = await self.crawl_manager.create_crawl_job( crawlconfig, diff --git a/backend/btrixcloud/crawlmanager.py b/backend/btrixcloud/crawlmanager.py index a8fcfd8344..536da58274 100644 --- a/backend/btrixcloud/crawlmanager.py +++ b/backend/btrixcloud/crawlmanager.py @@ -177,6 +177,37 @@ async def create_crawl_job( warc_prefix=warc_prefix, ) + async def 
create_qa_crawl_job( + self, + crawlconfig: CrawlConfig, + storage: StorageRef, + userid: str, + qa_source: str, + ) -> str: + """create new QA Run crawl job with qa source crawl id""" + cid = str(crawlconfig.id) + + storage_secret = storage.get_storage_secret_name(str(crawlconfig.oid)) + + await self.has_storage_secret(storage_secret) + + ts_now = dt_now().strftime("%Y%m%d%H%M%S") + crawl_id = f"qa-{ts_now}-{cid[:12]}" + + return await self.new_crawl_job( + cid, + userid, + crawlconfig.oid, + storage, + crawlconfig.crawlerChannel, + 1, + 0, + 0, + warc_prefix="qa", + crawl_id=crawl_id, + qa_source=qa_source, + ) + async def update_crawl_config( self, crawlconfig: CrawlConfig, update: UpdateCrawlConfig, profile_filename=None ) -> bool: diff --git a/backend/btrixcloud/crawls.py b/backend/btrixcloud/crawls.py index 025ab70d86..4711bec709 100644 --- a/backend/btrixcloud/crawls.py +++ b/backend/btrixcloud/crawls.py @@ -4,10 +4,12 @@ import json import re +import contextlib import urllib.parse +from datetime import datetime from uuid import UUID -from typing import Optional, List, Dict, Union +from typing import Optional, List, Dict, Union, Any from fastapi import Depends, HTTPException from fastapi.responses import StreamingResponse @@ -17,19 +19,27 @@ from .pagination import DEFAULT_PAGE_SIZE, paginated_format from .utils import dt_now, parse_jsonl_error_messages, stream_dict_list_as_csv from .basecrawls import BaseCrawlOps +from .crawlmanager import CrawlManager from .models import ( UpdateCrawl, DeleteCrawlList, CrawlConfig, UpdateCrawlConfig, CrawlScale, + CrawlStats, + CrawlFile, Crawl, CrawlOut, CrawlOutWithResources, + QARun, + QARunOut, + QARunWithResources, + DeleteQARunList, Organization, User, PaginatedResponse, RUNNING_AND_STARTING_STATES, + SUCCESSFUL_STATES, ALL_CRAWL_STATES, ) @@ -39,13 +49,15 @@ # ============================================================================ +# pylint: disable=too-many-arguments, too-many-instance-attributes, too-many-public-methods class CrawlOps(BaseCrawlOps): """Crawl Ops""" - # pylint: disable=too-many-arguments, too-many-instance-attributes, too-many-public-methods + crawl_manager: CrawlManager - def __init__(self, *args): + def __init__(self, crawl_manager: CrawlManager, *args): super().__init__(*args) + self.crawl_manager = crawl_manager self.crawl_configs.set_crawl_ops(self) self.colls.set_crawl_ops(self) self.event_webhook_ops.set_crawl_ops(self) @@ -76,6 +88,28 @@ async def init_index(self): await self.crawls.create_index([("state", pymongo.HASHED)]) await self.crawls.create_index([("fileSize", pymongo.DESCENDING)]) + async def get_crawl( + self, + crawlid: str, + org: Optional[Organization] = None, + project: Optional[dict[str, bool]] = None, + ) -> Crawl: + """Get crawl data for internal use""" + res = await self.get_crawl_raw(crawlid, org, "crawl", project) + return Crawl.from_dict(res) + + @contextlib.asynccontextmanager + async def get_redis(self, crawl_id): + """get redis url for crawl id""" + redis_url = self.crawl_manager.get_redis_url(crawl_id) + + redis = await self.crawl_manager.get_redis_client(redis_url) + + try: + yield redis + finally: + await redis.close() + async def list_crawls( self, org: Optional[Organization] = None, @@ -193,7 +227,7 @@ async def list_crawls( crawl = cls.from_dict(result) files = result.get("files") if resources else None crawl = await self._resolve_crawl_refs( - crawl, org, add_first_seed=False, files=files + crawl, org, files=files, add_first_seed=False ) crawls.append(crawl) @@ -268,16 +302,15 @@ 
async def add_new_crawl( return dt_now except pymongo.errors.DuplicateKeyError: - # print(f"Crawl Already Added: {crawl.id} - {crawl.state}") return None async def update_crawl_scale( self, crawl_id: str, org: Organization, crawl_scale: CrawlScale, user: User ): """Update crawl scale in the db""" - crawl = await self.get_crawl_raw(crawl_id, org) + crawl = await self.get_crawl(crawl_id, org) update = UpdateCrawlConfig(scale=crawl_scale.scale) - await self.crawl_configs.update_crawl_config(crawl["cid"], org, user, update) + await self.crawl_configs.update_crawl_config(crawl.cid, org, user, update) result = await self.crawls.find_one_and_update( {"_id": crawl_id, "type": "crawl", "oid": org.id}, @@ -378,11 +411,11 @@ async def add_or_remove_exclusion(self, crawl_id, regex, org, user, add): """add new exclusion to config or remove exclusion from config for given crawl_id, update config on crawl""" - crawl_raw = await self.get_crawl_raw(crawl_id, org, project={"cid": True}) + crawl = await self.get_crawl(crawl_id, org, project={"cid": True}) - cid = crawl_raw.get("cid") + cid = crawl.cid - scale = crawl_raw.get("scale", 1) + scale = crawl.scale or 1 async with self.get_redis(crawl_id) as redis: query = { @@ -406,27 +439,64 @@ async def add_or_remove_exclusion(self, crawl_id, regex, org, user, add): return {"success": True} async def update_crawl_state_if_allowed( - self, crawl_id, state, allowed_from, **kwargs + self, + crawl_id: str, + is_qa: bool, + state: str, + allowed_from: List[str], + finished: Optional[datetime] = None, + stats: Optional[CrawlStats] = None, ): """update crawl state and other properties in db if state has changed""" - kwargs["state"] = state - query = {"_id": crawl_id, "type": "crawl"} + prefix = "" if not is_qa else "qa." + + update: Dict[str, Any] = {f"{prefix}state": state} + if finished: + update[f"{prefix}finished"] = finished + if stats: + update[f"{prefix}stats"] = stats.dict() + + query: Dict[str, Any] = {"_id": crawl_id, "type": "crawl"} if allowed_from: - query["state"] = {"$in": allowed_from} + query[f"{prefix}state"] = {"$in": allowed_from} - return await self.crawls.find_one_and_update(query, {"$set": kwargs}) + return await self.crawls.find_one_and_update(query, {"$set": update}) - async def update_running_crawl_stats(self, crawl_id, stats): + async def update_running_crawl_stats( + self, crawl_id: str, is_qa: bool, stats: CrawlStats + ): """update running crawl stats""" - query = {"_id": crawl_id, "type": "crawl", "state": "running"} - return await self.crawls.find_one_and_update(query, {"$set": {"stats": stats}}) + prefix = "" if not is_qa else "qa." 
+ query = {"_id": crawl_id, "type": "crawl", f"{prefix}state": "running"} + return await self.crawls.find_one_and_update( + query, {"$set": {f"{prefix}stats": stats.dict()}} + ) - async def inc_crawl_exec_time(self, crawl_id, exec_time, last_updated_time): + async def inc_crawl_exec_time( + self, + crawl_id: str, + is_qa: bool, + exec_time, + last_updated_time, + ): """increment exec time""" + # update both crawl-shared qa exec seconds and per-qa run exec seconds + if is_qa: + inc_update = { + "qaCrawlExecSeconds": exec_time, + "qa.crawlExecSeconds": exec_time, + } + else: + inc_update = {"crawlExecSeconds": exec_time} + return await self.crawls.find_one_and_update( - {"_id": crawl_id, "type": "crawl", "_lut": {"$ne": last_updated_time}}, { - "$inc": {"crawlExecSeconds": exec_time}, + "_id": crawl_id, + "type": "crawl", + "_lut": {"$ne": last_updated_time}, + }, + { + "$inc": inc_update, "$set": {"_lut": last_updated_time}, }, ) @@ -438,28 +508,42 @@ async def get_crawl_exec_last_update_time(self, crawl_id): ) return res and res.get("_lut") - async def get_crawl_state(self, crawl_id): + async def get_crawl_state(self, crawl_id: str, is_qa: bool): """return current crawl state of a crawl""" + prefix = "" if not is_qa else "qa." + res = await self.crawls.find_one( - {"_id": crawl_id}, projection=["state", "finished"] + {"_id": crawl_id}, + projection={"state": f"${prefix}state", "finished": f"${prefix}finished"}, ) if not res: return None, None return res.get("state"), res.get("finished") - async def add_crawl_error(self, crawl_id: str, error: str): + async def add_crawl_error( + self, + crawl_id: str, + is_qa: bool, + error: str, + ): """add crawl error from redis to mongodb errors field""" + prefix = "" if not is_qa else "qa." + await self.crawls.find_one_and_update( - {"_id": crawl_id}, {"$push": {"errors": error}} + {"_id": crawl_id}, {"$push": {f"{prefix}errors": error}} ) - async def add_crawl_file(self, crawl_id, crawl_file, size): + async def add_crawl_file( + self, crawl_id: str, is_qa: bool, crawl_file: CrawlFile, size: int + ): """add new crawl file to crawl""" + prefix = "" if not is_qa else "qa." 
+ await self.crawls.find_one_and_update( {"_id": crawl_id}, { - "$push": {"files": crawl_file.dict()}, - "$inc": {"fileCount": 1, "fileSize": size}, + "$push": {f"{prefix}files": crawl_file.dict()}, + "$inc": {f"{prefix}fileCount": 1, f"{prefix}fileSize": size}, }, ) @@ -474,9 +558,10 @@ async def get_crawl_seeds( skip = (page - 1) * page_size upper_bound = skip + page_size - crawl_raw = await self.get_crawl_raw(crawl_id, org) + crawl = await self.get_crawl(crawl_id, org) + if not crawl.config or not crawl.config.seeds: + return [], 0 try: - crawl = Crawl.from_dict(crawl_raw) return crawl.config.seeds[skip:upper_bound], len(crawl.config.seeds) # pylint: disable=broad-exception-caught except Exception: @@ -496,60 +581,261 @@ async def get_crawl_stats( if org: query["oid"] = org.id - async for crawl in self.crawls.find(query): + async for crawl_raw in self.crawls.find(query): + crawl = Crawl.from_dict(crawl_raw) data: Dict[str, Union[str, int]] = {} - data["id"] = str(crawl.get("_id")) - - oid = crawl.get("oid") - data["oid"] = str(oid) - data["org"] = org_slugs[oid] + data["id"] = crawl.id - data["cid"] = str(crawl.get("cid")) - crawl_name = crawl.get("name") - data["name"] = f'"{crawl_name}"' if crawl_name else "" - data["state"] = crawl.get("state") + data["oid"] = str(crawl.oid) + data["org"] = org_slugs[crawl.oid] - userid = crawl.get("userid") - data["userid"] = str(userid) - data["user"] = user_emails.get(userid) + data["cid"] = crawl.id + data["name"] = f'"{crawl.name}"' if crawl.name else "" + data["state"] = crawl.state - started = crawl.get("started") - finished = crawl.get("finished") + data["userid"] = str(crawl.userid) + data["user"] = user_emails.get(crawl.userid) - data["started"] = str(started) - data["finished"] = str(finished) + data["started"] = str(crawl.started) + data["finished"] = str(crawl.finished) data["duration"] = 0 - if started and finished: - duration = finished - started + duration_seconds = 0 + if crawl.started and crawl.finished: + duration = crawl.finished - crawl.started duration_seconds = int(duration.total_seconds()) if duration_seconds: data["duration"] = duration_seconds - done_stats = None - if crawl.get("stats") and crawl.get("stats").get("done"): - done_stats = crawl["stats"]["done"] + if crawl.stats: + data["pages"] = crawl.stats.done - data["pages"] = 0 - if done_stats: - data["pages"] = done_stats - - data["filesize"] = crawl.get("fileSize", 0) + data["filesize"] = crawl.fileSize data["avg_page_time"] = 0 - if ( - done_stats - and done_stats != 0 - and started - and finished - and duration_seconds - ): - data["avg_page_time"] = int(duration_seconds / done_stats) + if crawl.stats and crawl.stats.done != 0 and duration_seconds: + data["avg_page_time"] = int(duration_seconds / crawl.stats.done) crawls_data.append(data) return crawls_data + async def shutdown_crawl( + self, crawl_id: str, org: Organization, graceful: bool + ) -> Dict[str, bool]: + """stop or cancel specified crawl""" + crawl = await self.get_base_crawl(crawl_id, org) + if crawl and crawl.type != "crawl": + raise HTTPException(status_code=400, detail="not_a_crawl") + + result = None + try: + result = await self.crawl_manager.shutdown_crawl( + crawl_id, graceful=graceful + ) + + if result.get("success"): + if graceful: + await self.crawls.find_one_and_update( + {"_id": crawl_id, "type": "crawl", "oid": org.id}, + {"$set": {"stopping": True}}, + ) + return result + + except Exception as exc: + # pylint: disable=raise-missing-from + # if reached here, probably crawl doesn't exist anymore 
+ raise HTTPException( + status_code=404, detail=f"crawl_not_found, (details: {exc})" + ) + + # if job no longer running, canceling is considered success, + # but graceful stoppage is not possible, so would be a failure + if result.get("error") == "Not Found": + if not graceful: + await self.update_crawl_state(crawl_id, "canceled") + crawl = await self.get_crawl(crawl_id, org) + if not await self.crawl_configs.stats_recompute_last(crawl.cid, 0, -1): + raise HTTPException( + status_code=404, + detail=f"crawl_config_not_found: {crawl.cid}", + ) + + return {"success": True} + + # return whatever detail may be included in the response + raise HTTPException(status_code=400, detail=result) + + async def start_crawl_qa_run(self, crawl_id: str, org: Organization, user: User): + """Start crawl QA run""" + + crawl = await self.get_crawl(crawl_id, org) + + # can only QA finished crawls + if not crawl.finished: + raise HTTPException(status_code=400, detail="crawl_not_finished") + + # can only QA successfully finished crawls + if crawl.state not in SUCCESSFUL_STATES: + raise HTTPException(status_code=400, detail="crawl_did_not_succeed") + + # can only run one QA at a time + if crawl.qa: + raise HTTPException(status_code=400, detail="qa_already_running") + + # not a valid crawl + if not crawl.cid or crawl.type != "crawl": + raise HTTPException(status_code=400, detail="invalid_crawl_for_qa") + + crawlconfig = await self.crawl_configs.prepare_for_run_crawl(crawl.cid, org) + + try: + qa_run_id = await self.crawl_manager.create_qa_crawl_job( + crawlconfig, + org.storage, + userid=str(user.id), + qa_source=crawl_id, + ) + + image = self.crawl_configs.get_channel_crawler_image( + crawlconfig.crawlerChannel + ) + + qa_run = QARun( + id=qa_run_id, + started=datetime.now(), + userid=user.id, + userName=user.name, + state="starting", + image=image, + ) + + await self.crawls.find_one_and_update( + {"_id": crawl_id}, + { + "$set": { + "qa": qa_run.dict(), + } + }, + ) + + return qa_run_id + + except Exception as exc: + # pylint: disable=raise-missing-from + raise HTTPException(status_code=500, detail=f"Error starting crawl: {exc}") + + async def stop_crawl_qa_run(self, crawl_id: str, org: Organization): + """Stop crawl QA run, QA run removed when actually finished""" + crawl = await self.get_crawl(crawl_id, org) + + if not crawl.qa: + raise HTTPException(status_code=400, detail="qa_not_running") + + try: + result = await self.crawl_manager.shutdown_crawl(crawl.qa.id, graceful=True) + + if result.get("error") == "Not Found": + # treat as success, qa crawl no longer exists, so mark as no qa + result = {"success": True} + + return result + + except Exception as exc: + # pylint: disable=raise-missing-from + # if reached here, probably crawl doesn't exist anymore + raise HTTPException( + status_code=404, detail=f"crawl_not_found, (details: {exc})" + ) + + async def delete_crawl_qa_runs(self, crawl_id: str, delete_list: DeleteQARunList): + """delete specified finished QA run""" + + count = 0 + for qa_run_id in delete_list.qa_run_ids: + res = await self.crawls.find_one_and_update( + {"_id": crawl_id, "type": "crawl"}, + {"$unset": {f"qaFinished.{qa_run_id}": ""}}, + ) + + if res: + count += 1 + + await self.page_ops.delete_qa_run_from_pages(crawl_id, qa_run_id) + + return {"deleted": count} + + async def qa_run_finished(self, crawl_id: str): + """clear active qa, add qa run to finished list, if successful""" + crawl = await self.get_crawl(crawl_id) + + if not crawl.qa: + return False + + query: Dict[str, Any] = {"qa": 
None} + + if crawl.qa.finished and crawl.qa.state in SUCCESSFUL_STATES: + query[f"qaFinished.{crawl.qa.id}"] = crawl.qa.dict() + + if await self.crawls.find_one_and_update( + {"_id": crawl_id, "type": "crawl"}, {"$set": query} + ): + return True + + return False + + async def get_qa_runs( + self, crawl_id: str, org: Optional[Organization] = None + ) -> List[QARunOut]: + """Return list of QA runs""" + crawl_data = await self.get_crawl_raw( + crawl_id, org, "crawl", project={"qaFinished": True, "qa": True} + ) + qa_finished = crawl_data.get("qaFinished") or {} + all_qa = [QARunOut(**qa_run_data) for qa_run_data in qa_finished.values()] + all_qa.sort(key=lambda x: x.finished or dt_now(), reverse=True) + qa = crawl_data.get("qa") + if qa: + all_qa.insert(0, QARunOut(**qa)) + return all_qa + + async def get_active_qa( + self, crawl_id: str, org: Optional[Organization] = None + ) -> Optional[QARunOut]: + """return just the active QA, if any""" + crawl_data = await self.get_crawl_raw( + crawl_id, org, "crawl", project={"qa": True} + ) + qa = crawl_data.get("qa") + return QARunOut(**qa) if qa else None + + async def get_qa_run_for_replay( + self, crawl_id: str, qa_run_id: str, org: Optional[Organization] = None + ) -> QARunWithResources: + """Fetch QA runs with resources for replay.json""" + crawl = await self.get_crawl(crawl_id, org) + qa_finished = crawl.qaFinished or {} + qa_run = qa_finished.get(qa_run_id) + + if not qa_run: + raise HTTPException(status_code=404, detail="crawl_qa_not_found") + + if not org: + org = await self.orgs.get_org_by_id(crawl.oid) + if not org: + raise HTTPException(status_code=400, detail="missing_org") + + resources = await self._resolve_signed_urls( + qa_run.files, org, crawl.id, qa_run_id + ) + + qa_run.files = [] + + qa_run_dict = qa_run.dict() + qa_run_dict["resources"] = resources + + return QARunWithResources(**qa_run_dict) + # ============================================================================ async def recompute_crawl_file_count_and_size(crawls, crawl_id): @@ -571,11 +857,11 @@ async def recompute_crawl_file_count_and_size(crawls, crawl_id): # ============================================================================ # pylint: disable=too-many-arguments, too-many-locals, too-many-statements -def init_crawls_api(app, user_dep, *args): +def init_crawls_api(crawl_manager: CrawlManager, app, user_dep, *args): """API for crawl management, including crawl done callback""" # pylint: disable=invalid-name, duplicate-code - ops = CrawlOps(*args) + ops = CrawlOps(crawl_manager, *args) org_viewer_dep = ops.orgs.org_viewer_dep org_crawl_dep = ops.orgs.org_crawl_dep @@ -725,15 +1011,81 @@ async def get_crawl_admin(crawl_id, user: User = Depends(user_dep)): if not user.is_superuser: raise HTTPException(status_code=403, detail="Not Allowed") - return await ops.get_crawl(crawl_id, None, "crawl") + return await ops.get_crawl_out(crawl_id, None, "crawl") @app.get( "/orgs/{oid}/crawls/{crawl_id}/replay.json", tags=["crawls"], response_model=CrawlOutWithResources, ) - async def get_crawl(crawl_id, org: Organization = Depends(org_viewer_dep)): - return await ops.get_crawl(crawl_id, org, "crawl") + async def get_crawl_out(crawl_id, org: Organization = Depends(org_viewer_dep)): + return await ops.get_crawl_out(crawl_id, org, "crawl") + + # QA APIs + # --------------------- + @app.get( + "/orgs/all/crawls/{crawl_id}/qa/{qa_run_id}/replay.json", + tags=["qa"], + response_model=QARunWithResources, + ) + async def get_qa_run_admin(crawl_id, qa_run_id, user: User = 
Depends(user_dep)): + if not user.is_superuser: + raise HTTPException(status_code=403, detail="Not Allowed") + + return await ops.get_qa_run_for_replay(crawl_id, qa_run_id) + + @app.get( + "/orgs/{oid}/crawls/{crawl_id}/qa/{qa_run_id}/replay.json", + tags=["qa"], + response_model=QARunWithResources, + ) + async def get_qa_run( + crawl_id, qa_run_id, org: Organization = Depends(org_viewer_dep) + ): + return await ops.get_qa_run_for_replay(crawl_id, qa_run_id, org) + + @app.post("/orgs/{oid}/crawls/{crawl_id}/qa/start", tags=["qa"]) + async def start_crawl_qa_run( + crawl_id: str, + org: Organization = Depends(org_crawl_dep), + user: User = Depends(user_dep), + ): + qa_run_id = await ops.start_crawl_qa_run(crawl_id, org, user) + return {"started": qa_run_id} + + @app.post("/orgs/{oid}/crawls/{crawl_id}/qa/stop", tags=["qa"]) + async def stop_crawl_qa_run( + crawl_id: str, org: Organization = Depends(org_crawl_dep) + ): + # pylint: disable=unused-argument + return await ops.stop_crawl_qa_run(crawl_id, org) + + @app.post("/orgs/{oid}/crawls/{crawl_id}/qa/delete", tags=["qa"]) + async def delete_crawl_qa_runs( + crawl_id: str, + qa_run_ids: DeleteQARunList, + org: Organization = Depends(org_crawl_dep), + ): + # pylint: disable=unused-argument + return await ops.delete_crawl_qa_runs(crawl_id, qa_run_ids) + + @app.get( + "/orgs/{oid}/crawls/{crawl_id}/qa", + tags=["qa"], + response_model=List[QARunOut], + ) + async def get_qa_runs(crawl_id, org: Organization = Depends(org_viewer_dep)): + return await ops.get_qa_runs(crawl_id, org) + + @app.get( + "/orgs/{oid}/crawls/{crawl_id}/qa/activeQA", + tags=["qa"], + response_model=Dict[str, Optional[QARunOut]], + ) + async def get_active_qa(crawl_id, org: Organization = Depends(org_viewer_dep)): + return {"qa": await ops.get_active_qa(crawl_id, org)} + + # ---- @app.get( "/orgs/all/crawls/{crawl_id}", @@ -870,7 +1222,7 @@ async def stream_crawl_logs( logLevel: Optional[str] = None, context: Optional[str] = None, ): - crawl = await ops.get_crawl(crawl_id, org, "crawl") + crawl = await ops.get_crawl_out(crawl_id, org) log_levels = [] contexts = [] @@ -882,7 +1234,7 @@ async def stream_crawl_logs( # If crawl is finished, stream logs from WACZ files using presigned urls if crawl.finished: resp = await ops.storage_ops.sync_stream_wacz_logs( - crawl.resources, log_levels, contexts + crawl.resources or [], log_levels, contexts ) return StreamingResponse( resp, @@ -904,14 +1256,13 @@ async def get_crawl_errors( page: int = 1, org: Organization = Depends(org_viewer_dep), ): - crawl_raw = await ops.get_crawl_raw(crawl_id, org) - crawl = Crawl.from_dict(crawl_raw) + crawl = await ops.get_crawl(crawl_id, org) skip = (page - 1) * pageSize upper_bound = skip + pageSize - errors = crawl.errors[skip:upper_bound] + errors = crawl.errors[skip:upper_bound] if crawl.errors else [] parsed_errors = parse_jsonl_error_messages(errors) - return paginated_format(parsed_errors, len(crawl.errors), page, pageSize) + return paginated_format(parsed_errors, len(crawl.errors or []), page, pageSize) return ops diff --git a/backend/btrixcloud/db.py b/backend/btrixcloud/db.py index e9064a47f8..93207ea12a 100644 --- a/backend/btrixcloud/db.py +++ b/backend/btrixcloud/db.py @@ -8,7 +8,7 @@ import asyncio from uuid import UUID -from typing import Optional, Union +from typing import Optional, Union, TypeVar, Type import motor.motor_asyncio from pydantic import BaseModel @@ -96,7 +96,13 @@ async def update_and_prepare_db( if await run_db_migrations(mdb, user_manager, page_ops): await 
drop_indexes(mdb) await create_indexes( - org_ops, crawl_ops, crawl_config_ops, coll_ops, invite_ops, user_manager + org_ops, + crawl_ops, + crawl_config_ops, + coll_ops, + invite_ops, + user_manager, + page_ops, ) await user_manager.create_super_user() await org_ops.create_default_org() @@ -186,7 +192,7 @@ async def drop_indexes(mdb): # ============================================================================ # pylint: disable=too-many-arguments async def create_indexes( - org_ops, crawl_ops, crawl_config_ops, coll_ops, invite_ops, user_manager + org_ops, crawl_ops, crawl_config_ops, coll_ops, invite_ops, user_manager, page_ops ): """Create database indexes.""" print("Creating database indexes", flush=True) @@ -196,6 +202,11 @@ async def create_indexes( await coll_ops.init_index() await invite_ops.init_index() await user_manager.init_index() + await page_ops.init_index() + + +# ============================================================================ +T = TypeVar("T") # ============================================================================ @@ -210,10 +221,10 @@ def id_str(self): return str(self.id) @classmethod - def from_dict(cls, data): + def from_dict(cls: Type[T], data: dict) -> T: """convert dict from mongo to a class""" if not data: - return None + return cls() data["id"] = data.pop("_id") return cls(**data) diff --git a/backend/btrixcloud/k8sapi.py b/backend/btrixcloud/k8sapi.py index 2014961b73..578c22692e 100644 --- a/backend/btrixcloud/k8sapi.py +++ b/backend/btrixcloud/k8sapi.py @@ -30,7 +30,9 @@ def __init__(self): self.namespace = os.environ.get("CRAWLER_NAMESPACE") or "crawlers" self.custom_resources = {} - self.templates = Jinja2Templates(directory=get_templates_dir()) + self.templates = Jinja2Templates( + directory=get_templates_dir(), autoescape=False + ) config.load_incluster_config() self.client = client @@ -87,6 +89,7 @@ def new_crawl_job_yaml( manual=True, crawl_id=None, warc_prefix="", + qa_source="", ): """load job template from yaml""" if not crawl_id: @@ -106,6 +109,7 @@ def new_crawl_job_yaml( "manual": "1" if manual else "0", "crawler_channel": crawler_channel, "warc_prefix": warc_prefix, + "qa_source": qa_source, } data = self.templates.env.get_template("crawl_job.yaml").render(params) @@ -173,12 +177,14 @@ async def has_storage_secret(self, storage_secret) -> bool: async def delete_crawl_job(self, crawl_id): """delete custom crawljob object""" try: + name = f"crawljob-{crawl_id}" + await self.custom_api.delete_namespaced_custom_object( group="btrix.cloud", version="v1", namespace=self.namespace, plural="crawljobs", - name=f"crawljob-{crawl_id}", + name=name, grace_period_seconds=0, # delete as background to allow operator to do proper cleanup propagation_policy="Background", @@ -217,12 +223,14 @@ async def get_profile_browser(self, browserid): async def _patch_job(self, crawl_id, body, pluraltype="crawljobs") -> dict: try: + name = f"{pluraltype[:-1]}-{crawl_id}" + await self.custom_api.patch_namespaced_custom_object( group="btrix.cloud", version="v1", namespace=self.namespace, plural=pluraltype, - name=f"{pluraltype[:-1]}-{crawl_id}", + name=name, body={"spec": body}, _content_type="application/merge-patch+json", ) diff --git a/backend/btrixcloud/main.py b/backend/btrixcloud/main.py index e878fef612..0360777628 100644 --- a/backend/btrixcloud/main.py +++ b/backend/btrixcloud/main.py @@ -127,10 +127,10 @@ def main(): base_crawl_init = ( app, current_active_user, + # to basecrawls mdb, user_manager, org_ops, - crawl_manager, crawl_config_ops, 
coll_ops, storage_ops, @@ -140,7 +140,7 @@ def main(): base_crawl_ops = init_base_crawls_api(*base_crawl_init) - crawls = init_crawls_api(*base_crawl_init) + crawls = init_crawls_api(crawl_manager, *base_crawl_init) page_ops = init_pages_api( app, mdb, crawls, org_ops, storage_ops, current_active_user diff --git a/backend/btrixcloud/main_op.py b/backend/btrixcloud/main_op.py index ba6555dba2..e278a3644a 100644 --- a/backend/btrixcloud/main_op.py +++ b/backend/btrixcloud/main_op.py @@ -77,10 +77,10 @@ def main(): coll_ops = CollectionOps(mdb, crawl_manager, org_ops, event_webhook_ops) crawl_ops = CrawlOps( + crawl_manager, mdb, user_manager, org_ops, - crawl_manager, crawl_config_ops, coll_ops, storage_ops, diff --git a/backend/btrixcloud/migrations/migration_0004_config_seeds.py b/backend/btrixcloud/migrations/migration_0004_config_seeds.py index 61f599ff95..8f6e5ff6c2 100644 --- a/backend/btrixcloud/migrations/migration_0004_config_seeds.py +++ b/backend/btrixcloud/migrations/migration_0004_config_seeds.py @@ -102,12 +102,14 @@ async def migrate_up(self): # Test migration async for config_dict in crawl_configs.find({}): config = CrawlConfig.from_dict(config_dict) - for seed in config.config.seeds: + seeds = config.config.seeds or [] + for seed in seeds: assert isinstance(seed, Seed) assert seed.url async for crawl_dict in crawls.find({}): crawl = Crawl.from_dict(crawl_dict) - for seed in crawl.config.seeds: + seeds = crawl.config.seeds or [] + for seed in seeds: assert isinstance(seed, Seed) assert seed.url diff --git a/backend/btrixcloud/models.py b/backend/btrixcloud/models.py index 11dd412567..2b468699dc 100644 --- a/backend/btrixcloud/models.py +++ b/backend/btrixcloud/models.py @@ -387,6 +387,8 @@ def get_raw_config(self): class CrawlConfigOut(CrawlConfigCore, CrawlConfigAdditional): """Crawl Config Output""" + id: UUID + lastCrawlStopping: Optional[bool] = False profileName: Optional[str] firstSeed: Optional[str] @@ -532,37 +534,60 @@ class ReviewStatus(str, Enum): # ============================================================================ -class BaseCrawl(BaseMongoModel): - """Base Crawl object (representing crawls, uploads and manual sessions)""" +class CrawlStats(BaseModel): + """Crawl Stats for pages and size""" - id: str + found: int = 0 + done: int = 0 + size: int = 0 - type: str + +# ============================================================================ +class CoreCrawlable(BaseModel): + # pylint: disable=too-few-public-methods + """Core properties for crawlable run (crawl or qa run)""" + + id: str userid: UUID userName: Optional[str] - oid: UUID started: datetime finished: Optional[datetime] = None - name: Optional[str] = "" - state: str - stats: Optional[Dict[str, int]] = None - - files: Optional[List[CrawlFile]] = [] + crawlExecSeconds: int = 0 - description: Optional[str] = "" + image: Optional[str] - errors: Optional[List[str]] = [] + stats: Optional[CrawlStats] = CrawlStats() - collectionIds: Optional[List[UUID]] = [] + files: List[CrawlFile] = [] fileSize: int = 0 fileCount: int = 0 + errors: Optional[List[str]] = [] + + +# ============================================================================ +class BaseCrawl(CoreCrawlable, BaseMongoModel): + """Base Crawl object (representing crawls, uploads and manual sessions)""" + + type: str + + oid: UUID + cid: Optional[UUID] = None + + name: Optional[str] = "" + + description: Optional[str] = "" + + tags: Optional[List[str]] = [] + + collectionIds: Optional[List[UUID]] = [] + reviewStatus: Optional[ReviewStatus] 
= None @@ -598,7 +623,7 @@ class CrawlOut(BaseMongoModel): state: str - stats: Optional[Dict[str, int]] + stats: Optional[CrawlStats] fileSize: int = 0 fileCount: int = 0 @@ -610,6 +635,7 @@ class CrawlOut(BaseMongoModel): collectionIds: Optional[List[UUID]] = [] crawlExecSeconds: int = 0 + qaCrawlExecSeconds: int = 0 # automated crawl fields config: Optional[RawCrawlConfig] @@ -657,6 +683,13 @@ class DeleteCrawlList(BaseModel): crawl_ids: List[str] +# ============================================================================ +class DeleteQARunList(BaseModel): + """delete qa run list POST body""" + + qa_run_ids: List[str] + + # ============================================================================ ### AUTOMATED CRAWLS ### @@ -669,6 +702,36 @@ class CrawlScale(BaseModel): scale: conint(ge=1, le=MAX_CRAWL_SCALE) = 1 # type: ignore +# ============================================================================ +class QARun(CoreCrawlable, BaseModel): + """Subdocument to track QA runs for given crawl""" + + +# ============================================================================ +class QARunWithResources(QARun): + """QA crawl output model including resources""" + + resources: Optional[List[CrawlFileOut]] = [] + + +# ============================================================================ +class QARunOut(BaseModel): + """QA Run Output""" + + id: str + + userName: Optional[str] + + started: datetime + finished: Optional[datetime] = None + + state: str + + crawlExecSeconds: int = 0 + + stats: CrawlStats = CrawlStats() + + # ============================================================================ class Crawl(BaseCrawl, CrawlConfigCore): """Store State of a Crawl (Finished or Running)""" @@ -686,9 +749,10 @@ class Crawl(BaseCrawl, CrawlConfigCore): stopping: Optional[bool] = False - crawlExecSeconds: int = 0 + qaCrawlExecSeconds: int = 0 - image: Optional[str] + qa: Optional[QARun] = None + qaFinished: Optional[Dict[str, QARun]] = {} # ============================================================================ @@ -718,8 +782,6 @@ class UploadedCrawl(BaseCrawl): type: Literal["upload"] = "upload" - tags: Optional[List[str]] = [] - # ============================================================================ class UpdateUpload(UpdateCrawl): @@ -910,8 +972,15 @@ class OrgOut(BaseMongoModel): storageQuotaReached: Optional[bool] execMinutesQuotaReached: Optional[bool] + # total usage and exec time usage: Optional[Dict[str, int]] crawlExecSeconds: Dict[str, int] = {} + + # qa only usage + exec time + qaUsage: Optional[Dict[str, int]] = {} + qaCrawlExecSeconds: Dict[str, int] = {} + + # exec time limits monthlyExecSeconds: Dict[str, int] = {} extraExecSeconds: Dict[str, int] = {} giftedExecSeconds: Dict[str, int] = {} @@ -945,8 +1014,15 @@ class Organization(BaseMongoModel): bytesStoredUploads: int = 0 bytesStoredProfiles: int = 0 + # total usage + exec time usage: Dict[str, int] = {} crawlExecSeconds: Dict[str, int] = {} + + # qa only usage + exec time + qaUsage: Dict[str, int] = {} + qaCrawlExecSeconds: Dict[str, int] = {} + + # exec time limits monthlyExecSeconds: Dict[str, int] = {} extraExecSeconds: Dict[str, int] = {} giftedExecSeconds: Dict[str, int] = {} @@ -1427,23 +1503,31 @@ class PageNote(BaseModel): userName: str +# ============================================================================ +class PageQACompare(BaseModel): + """Model for updating pages from QA run""" + + screenshotMatch: Optional[float] = None + textMatch: Optional[float] = None + resourceCounts: 
Optional[Dict[str, int]] + + # ============================================================================ class Page(BaseMongoModel): - """Model for crawl pages""" + """Core page data, no QA""" + + id: UUID oid: UUID crawl_id: str + + # core page data url: AnyHttpUrl title: Optional[str] = None - timestamp: Optional[datetime] = None - load_state: Optional[int] = None + ts: Optional[datetime] = None + loadState: Optional[int] = None status: Optional[int] = None - # automated heuristics, keyed by QA run id - screenshotMatch: Optional[Dict[str, float]] = {} - textMatch: Optional[Dict[str, float]] = {} - resourceCounts: Optional[Dict[str, Dict[str, int]]] = {} - # manual review userid: Optional[UUID] = None modified: Optional[datetime] = None @@ -1451,17 +1535,31 @@ class Page(BaseMongoModel): notes: List[PageNote] = [] +# ============================================================================ +class PageWithAllQA(Page): + """Model for core page data + qa""" + + # automated heuristics, keyed by QA run id + qa: Optional[Dict[str, PageQACompare]] = {} + + # ============================================================================ class PageOut(Page): - """Model for pages output""" + """Model for pages output, no QA""" status: Optional[int] = 200 # ============================================================================ -class PageQAUpdate(BaseModel): - """Model for updating pages from QA run""" +class PageOutWithSingleQA(Page): + """Page out with single QA entry""" - screenshotMatch: Optional[int] = None - textMatch: Optional[int] = None - resourceCounts: Optional[Dict[str, Dict[str, int]]] + qa: Optional[PageQACompare] = None + + +# ============================================================================ +class PagesAndResources(BaseModel): + """moage for qa configmap data, pages + resources""" + + resources: List[CrawlFileOut] = [] + pages: List[PageOut] = [] diff --git a/backend/btrixcloud/operator/crawls.py b/backend/btrixcloud/operator/crawls.py index e050b09ce4..c33afe1d59 100644 --- a/backend/btrixcloud/operator/crawls.py +++ b/backend/btrixcloud/operator/crawls.py @@ -3,11 +3,10 @@ import traceback import os from pprint import pprint -from typing import Optional +from typing import Optional, Any +from datetime import datetime import json -from uuid import UUID -from fastapi import HTTPException import humanize @@ -21,9 +20,11 @@ RUNNING_AND_STARTING_STATES, SUCCESSFUL_STATES, FAILED_STATES, + CrawlStats, CrawlFile, CrawlCompleteIn, StorageRef, + PagesAndResources, ) from btrixcloud.utils import ( @@ -113,21 +114,33 @@ async def sync_crawls(self, data: MCSyncData): pods = data.children[POD] + crawl = CrawlSpec( + id=crawl_id, + cid=cid, + oid=oid, + storage=StorageRef(spec["storageName"]), + crawler_channel=spec.get("crawlerChannel"), + scale=spec.get("scale", 1), + started=data.parent["metadata"]["creationTimestamp"], + stopping=spec.get("stopping", False), + timeout=spec.get("timeout") or 0, + max_crawl_size=int(spec.get("maxCrawlSize") or 0), + scheduled=spec.get("manual") != "1", + qa_source_crawl_id=spec.get("qaSourceCrawlId"), + ) + # if finalizing, crawl is being deleted if data.finalizing: if not status.finished: # if can't cancel, already finished - if not await self.cancel_crawl( - crawl_id, UUID(cid), UUID(oid), status, data.children[POD] - ): - # instead of fetching the state (that was already set) - # return exception to ignore this request, keep previous - # finished state - raise HTTPException(status_code=400, detail="out_of_sync_status") + await 
self.cancel_crawl(crawl, status, data.children[POD]) + # instead of fetching the state (that was already set) + # return exception to ignore this request, keep previous + # finished state + # raise HTTPException(status_code=400, detail="out_of_sync_status") return await self.finalize_response( - crawl_id, - UUID(oid), + crawl, status, spec, data.children, @@ -140,10 +153,9 @@ async def sync_crawls(self, data: MCSyncData): print( f"warn crawl {crawl_id} finished but not deleted, post-finish taking too long?" ) - self.run_task(self.k8s.delete_crawl_job(crawl_id)) + self.run_task(self.k8s.delete_crawl_job(crawl.id)) return await self.finalize_response( - crawl_id, - UUID(oid), + crawl, status, spec, data.children, @@ -155,24 +167,10 @@ async def sync_crawls(self, data: MCSyncData): # pylint: disable=bare-except, broad-except except: # fail crawl if config somehow missing, shouldn't generally happen - await self.fail_crawl(crawl_id, UUID(cid), UUID(oid), status, pods) + await self.fail_crawl(crawl, status, pods) return self._empty_response(status) - crawl = CrawlSpec( - id=crawl_id, - cid=cid, - oid=oid, - storage=StorageRef(spec["storageName"]), - crawler_channel=spec.get("crawlerChannel"), - scale=spec.get("scale", 1), - started=data.parent["metadata"]["creationTimestamp"], - stopping=spec.get("stopping", False), - timeout=spec.get("timeout") or 0, - max_crawl_size=int(spec.get("maxCrawlSize") or 0), - scheduled=spec.get("manual") != "1", - ) - # shouldn't get here, crawl should already be finalizing when canceled # just in case, handle canceled-but-not-finalizing here if status.state == "canceled": @@ -188,9 +186,7 @@ async def sync_crawls(self, data: MCSyncData): and not data.children[PVC] and await self.org_ops.storage_quota_reached(crawl.oid) ): - await self.mark_finished( - crawl.id, crawl.cid, crawl.oid, status, "skipped_quota_reached" - ) + await self.mark_finished(crawl, status, "skipped_quota_reached") return self._empty_response(status) if status.state in ("starting", "waiting_org_limit"): @@ -198,7 +194,7 @@ async def sync_crawls(self, data: MCSyncData): return self._empty_response(status) await self.set_state( - "starting", status, crawl.id, allowed_from=["waiting_org_limit"] + "starting", status, crawl, allowed_from=["waiting_org_limit"] ) if len(pods): @@ -218,8 +214,7 @@ async def sync_crawls(self, data: MCSyncData): if status.finished: return await self.finalize_response( - crawl_id, - UUID(oid), + crawl, status, spec, data.children, @@ -227,13 +222,15 @@ async def sync_crawls(self, data: MCSyncData): ) await self.increment_pod_exec_time( - pods, status, crawl.id, crawl.oid, EXEC_TIME_UPDATE_SECS + pods, crawl, status, EXEC_TIME_UPDATE_SECS ) else: status.scale = crawl.scale now = dt_now() - await self.crawl_ops.inc_crawl_exec_time(crawl_id, 0, now) + await self.crawl_ops.inc_crawl_exec_time( + crawl.db_crawl_id, crawl.is_qa, 0, now + ) status.lastUpdatedTime = to_k8s_date(now) children = self._load_redis(params, status, data.children) @@ -241,9 +238,13 @@ async def sync_crawls(self, data: MCSyncData): storage_path = crawl.storage.get_storage_extra_path(oid) storage_secret = crawl.storage.get_storage_secret_name(oid) + if not crawl.is_qa: + params["profile_filename"] = configmap["PROFILE_FILENAME"] + else: + storage_path += "qa/" + params["storage_path"] = storage_path params["storage_secret"] = storage_secret - params["profile_filename"] = configmap["PROFILE_FILENAME"] # only resolve if not already set # not automatically updating image for existing crawls @@ -269,6 +270,10 
@@ async def sync_crawls(self, data: MCSyncData): else: params["force_restart"] = False + if crawl.qa_source_crawl_id: + params["qa_source_crawl_id"] = crawl.qa_source_crawl_id + children.extend(await self._load_qa_configmap(params, data.children)) + for i in range(0, status.scale): children.extend(self._load_crawler(params, i, status, data.children)) @@ -294,6 +299,25 @@ def _load_redis(self, params, status, children): return self.load_from_yaml("redis.yaml", params) + async def _load_qa_configmap(self, params, children): + qa_source_crawl_id = params["qa_source_crawl_id"] + name = f"qa-replay-{qa_source_crawl_id}" + + if name in children[CMAP]: + return [children[CMAP][name]] + + pages, _ = await self.page_ops.list_pages(qa_source_crawl_id, page_size=1000) + + crawl_replay = await self.crawl_ops.get_internal_crawl_out(qa_source_crawl_id) + + res_and_pages = PagesAndResources(resources=crawl_replay.resources, pages=pages) + + params["name"] = name + params["qa_source_replay_json"] = res_and_pages.json() + # params["qa_source_replay_json"] = crawl_replay.json(include={"resources"}) + + return self.load_from_yaml("qa_configmap.yaml", params) + def _load_crawler(self, params, i, status, children): name = f"crawl-{params['id']}-{i}" has_pod = name in children[POD] @@ -308,7 +332,10 @@ def _load_crawler(self, params, i, status, children): if params.get("do_restart"): print(f"Restart {name}") - params["priorityClassName"] = f"crawl-instance-{i}" + if params.get("qa_source_crawl_id"): + params["priorityClassName"] = f"qa-crawl-instance-{i}" + else: + params["priorityClassName"] = f"crawl-instance-{i}" return self.load_from_yaml("crawler.yaml", params) @@ -382,7 +409,15 @@ def sync_resources(self, status, name, pod, children): src = pvc["spec"]["resources"]["requests"] resources.storage = int(parse_quantity(src.get("storage"))) - async def set_state(self, state, status, crawl_id, allowed_from, **kwargs): + async def set_state( + self, + state: str, + status: CrawlStatus, + crawl: CrawlSpec, + allowed_from: list[str], + finished: Optional[datetime] = None, + stats: Optional[CrawlStats] = None, + ): """set status state and update db, if changed if allowed_from passed in, can only transition from allowed_from state, otherwise get current state from db and return @@ -409,15 +444,22 @@ async def set_state(self, state, status, crawl_id, allowed_from, **kwargs): """ if not allowed_from or status.state in allowed_from: res = await self.crawl_ops.update_crawl_state_if_allowed( - crawl_id, state=state, allowed_from=allowed_from, **kwargs + crawl.db_crawl_id, + crawl.is_qa, + state=state, + allowed_from=allowed_from, + finished=finished, + stats=stats, ) if res: - print(f"Setting state: {status.state} -> {state}, {crawl_id}") + print(f"Setting state: {status.state} -> {state}, {crawl.id}") status.state = state return True # get actual crawl state - actual_state, finished = await self.crawl_ops.get_crawl_state(crawl_id) + actual_state, finished = await self.crawl_ops.get_crawl_state( + crawl.db_crawl_id, crawl.is_qa + ) if actual_state: status.state = actual_state if finished: @@ -430,7 +472,7 @@ async def set_state(self, state, status, crawl_id, allowed_from, **kwargs): if status.state != state: print( - f"Not setting state: {status.state} -> {state}, {crawl_id} not allowed" + f"Not setting state: {status.state} -> {state}, {crawl.id} not allowed" ) return False @@ -497,23 +539,21 @@ async def can_start_new(self, crawl: CrawlSpec, data: MCSyncData, status): i += 1 await self.set_state( - 
"waiting_org_limit", status, crawl.id, allowed_from=["starting"] + "waiting_org_limit", status, crawl, allowed_from=["starting"] ) return False async def cancel_crawl( self, - crawl_id: str, - cid: UUID, - oid: UUID, + crawl: CrawlSpec, status: CrawlStatus, pods: dict, ) -> bool: """Mark crawl as canceled""" - if not await self.mark_finished(crawl_id, cid, oid, status, "canceled"): + if not await self.mark_finished(crawl, status, "canceled"): return False - await self.mark_for_cancelation(crawl_id) + await self.mark_for_cancelation(crawl.id) if not status.canceled: for name, pod in pods.items(): @@ -538,19 +578,15 @@ async def cancel_crawl( async def fail_crawl( self, - crawl_id: str, - cid: UUID, - oid: UUID, + crawl: CrawlSpec, status: CrawlStatus, pods: dict, - stats=None, + stats: Optional[CrawlStats] = None, ) -> bool: """Mark crawl as failed, log crawl state and print crawl logs, if possible""" prev_state = status.state - if not await self.mark_finished( - crawl_id, cid, oid, status, "failed", stats=stats - ): + if not await self.mark_finished(crawl, status, "failed", stats=stats): return False if not self.log_failed_crawl_lines or prev_state == "failed": @@ -575,8 +611,7 @@ def _empty_response(self, status): async def finalize_response( self, - crawl_id: str, - oid: UUID, + crawl: CrawlSpec, status: CrawlStatus, spec: dict, children: dict, @@ -584,7 +619,7 @@ async def finalize_response( ): """ensure crawl id ready for deletion""" - redis_pod = f"redis-{crawl_id}" + redis_pod = f"redis-{crawl.id}" new_children = [] finalized = False @@ -595,7 +630,7 @@ async def finalize_response( # if has other pods, keep redis pod until they are removed if len(pods) > 1: new_children = self._load_redis(params, status, children) - await self.increment_pod_exec_time(pods, status, crawl_id, oid) + await self.increment_pod_exec_time(pods, crawl, status) # keep pvs until pods are removed if new_children: @@ -606,12 +641,15 @@ async def finalize_response( if status.finished: ttl = spec.get("ttlSecondsAfterFinished", DEFAULT_TTL) finished = from_k8s_date(status.finished) - if (dt_now() - finished).total_seconds() > ttl > 0: - print("CrawlJob expired, deleting: " + crawl_id) + if (dt_now() - finished).total_seconds() > ttl >= 0: + print("CrawlJob expired, deleting: " + crawl.id) finalized = True else: finalized = True + if finalized and crawl.is_qa: + await self.crawl_ops.qa_run_finished(crawl.db_crawl_id) + return { "status": status.dict(exclude_none=True), "children": new_children, @@ -664,7 +702,7 @@ async def sync_crawl_state( await self.set_state( "waiting_capacity", status, - crawl.id, + crawl, allowed_from=RUNNING_AND_STARTING_ONLY, ) @@ -696,7 +734,7 @@ async def sync_crawl_state( if await self.set_state( "running", status, - crawl.id, + crawl, allowed_from=["starting", "waiting_capacity"], ): self.run_task( @@ -717,14 +755,22 @@ async def sync_crawl_state( file_done = await redis.lpop(self.done_key) page_crawled = await redis.lpop(f"{crawl.id}:{self.pages_key}") + qa_run_id = crawl.id if crawl.is_qa else None + while page_crawled: + print("PAGE DATA", flush=True) + print(page_crawled, flush=True) page_dict = json.loads(page_crawled) - await self.page_ops.add_page_to_db(page_dict, crawl.id, crawl.oid) + await self.page_ops.add_page_to_db( + page_dict, crawl.db_crawl_id, qa_run_id, crawl.oid + ) page_crawled = await redis.lpop(f"{crawl.id}:{self.pages_key}") crawl_error = await redis.lpop(f"{crawl.id}:{self.errors_key}") while crawl_error: - await self.crawl_ops.add_crawl_error(crawl.id, 
crawl_error) + await self.crawl_ops.add_crawl_error( + crawl.db_crawl_id, crawl.is_qa, crawl_error + ) crawl_error = await redis.lpop(f"{crawl.id}:{self.errors_key}") # ensure filesAdded and filesAddedSize always set @@ -822,20 +868,21 @@ def handle_terminated_pod(self, name, role, status, terminated): async def increment_pod_exec_time( self, pods: dict[str, dict], + crawl: CrawlSpec, status: CrawlStatus, - crawl_id: str, - oid: UUID, min_duration=0, ) -> None: """inc exec time tracking""" now = dt_now() update_start_time = await self.crawl_ops.get_crawl_exec_last_update_time( - crawl_id + crawl.db_crawl_id ) if not update_start_time: - await self.crawl_ops.inc_crawl_exec_time(crawl_id, 0, now) + await self.crawl_ops.inc_crawl_exec_time( + crawl.db_crawl_id, crawl.is_qa, 0, now + ) status.lastUpdatedTime = to_k8s_date(now) return @@ -912,7 +959,9 @@ async def increment_pod_exec_time( max_duration = max(duration, max_duration) if exec_time: - await self.org_ops.inc_org_time_stats(oid, exec_time, True) + await self.org_ops.inc_org_time_stats( + crawl.oid, exec_time, True, crawl.is_qa + ) status.crawlExecTime += exec_time status.elapsedCrawlTime += max_duration @@ -921,7 +970,9 @@ async def increment_pod_exec_time( flush=True, ) - await self.crawl_ops.inc_crawl_exec_time(crawl_id, exec_time, now) + await self.crawl_ops.inc_crawl_exec_time( + crawl.db_crawl_id, crawl.is_qa, exec_time, now + ) status.lastUpdatedTime = to_k8s_date(now) def should_mark_waiting(self, state, started): @@ -1022,7 +1073,13 @@ async def add_file_to_crawl(self, cc_data, crawl, redis): await redis.incr("filesAddedSize", filecomplete.size) - await self.crawl_ops.add_crawl_file(crawl.id, crawl_file, filecomplete.size) + await self.crawl_ops.add_crawl_file( + crawl.db_crawl_id, crawl.is_qa, crawl_file, filecomplete.size + ) + + # no replicas for QA for now + if crawl.is_qa: + return True try: await self.background_job_ops.create_replica_jobs( @@ -1066,7 +1123,9 @@ async def is_crawl_stopping( return None - async def get_redis_crawl_stats(self, redis: Redis, crawl_id: str): + async def get_redis_crawl_stats( + self, redis: Redis, crawl_id: str + ) -> tuple[CrawlStats, dict[str, Any]]: """get page stats""" try: # crawler >0.9.0, done key is a value @@ -1079,7 +1138,7 @@ async def get_redis_crawl_stats(self, redis: Redis, crawl_id: str): sizes = await redis.hgetall(f"{crawl_id}:size") archive_size = sum(int(x) for x in sizes.values()) - stats = {"found": pages_found, "done": pages_done, "size": archive_size} + stats = CrawlStats(found=pages_found, done=pages_done, size=archive_size) return stats, sizes async def update_crawl_state( @@ -1095,15 +1154,17 @@ async def update_crawl_state( stats, sizes = await self.get_redis_crawl_stats(redis, crawl.id) # need to add size of previously completed WACZ files as well! 
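Since `get_redis_crawl_stats` now returns a typed `CrawlStats` rather than a plain dict, the bookkeeping just below reads attributes instead of dictionary keys. A minimal, self-contained sketch of that hand-off: the field names match how the operator constructs the stats above, while `StatusSketch` and the zero defaults are assumptions standing in for the real `CrawlStatus` model.

    from pydantic import BaseModel

    class CrawlStats(BaseModel):
        # page/size counters the crawler reports via redis
        found: int = 0
        done: int = 0
        size: int = 0

    class StatusSketch(BaseModel):
        # stand-in for the CrawlStatus fields touched here (not the real class)
        filesAddedSize: int = 0
        pagesFound: int = 0
        pagesDone: int = 0
        size: int = 0

    def apply_stats(stats: CrawlStats, status: StatusSketch) -> None:
        # redis reports only the active session, so WACZ files already
        # uploaded for this crawl are added back into the running size
        stats.size += status.filesAddedSize
        status.pagesDone = stats.done
        status.pagesFound = stats.found
        status.size = stats.size
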
- stats["size"] += status.filesAddedSize + stats.size += status.filesAddedSize # update status - status.pagesDone = stats["done"] - status.pagesFound = stats["found"] - status.size = stats["size"] + status.pagesDone = stats.done + status.pagesFound = stats.found + status.size = stats.size status.sizeHuman = humanize.naturalsize(status.size) - await self.crawl_ops.update_running_crawl_stats(crawl.id, stats) + await self.crawl_ops.update_running_crawl_stats( + crawl.db_crawl_id, crawl.is_qa, stats + ) for key, value in sizes.items(): value = int(value) @@ -1139,9 +1200,7 @@ async def update_crawl_state( # check if one-page crawls actually succeeded # if only one page found, and no files, assume failed if status.pagesFound == 1 and not status.filesAdded: - await self.fail_crawl( - crawl.id, crawl.cid, crawl.oid, status, pods, stats - ) + await self.fail_crawl(crawl, status, pods, stats) return status if status.stopReason in ("stopped_by_user", "stopped_quota_reached"): @@ -1149,21 +1208,15 @@ async def update_crawl_state( else: state = "complete" - await self.mark_finished( - crawl.id, crawl.cid, crawl.oid, status, state, crawl, stats - ) + await self.mark_finished(crawl, status, state, stats) # check if all crawlers failed elif status_count.get("failed", 0) >= crawl.scale: # if stopping, and no pages finished, mark as canceled if status.stopping and not status.pagesDone: - await self.mark_finished( - crawl.id, crawl.cid, crawl.oid, status, "canceled", crawl, stats - ) + await self.mark_finished(crawl, status, "canceled", stats) else: - await self.fail_crawl( - crawl.id, crawl.cid, crawl.oid, status, pods, stats - ) + await self.fail_crawl(crawl, status, pods, stats) # check for other statuses else: @@ -1180,7 +1233,7 @@ async def update_crawl_state( new_status = "pending-wait" if new_status: await self.set_state( - new_status, status, crawl.id, allowed_from=RUNNING_STATES + new_status, status, crawl, allowed_from=RUNNING_STATES ) return status @@ -1188,22 +1241,15 @@ async def update_crawl_state( # pylint: disable=too-many-arguments async def mark_finished( self, - crawl_id: str, - cid: UUID, - oid: UUID, + crawl: CrawlSpec, status: CrawlStatus, state: str, - crawl=None, - stats=None, + stats: Optional[CrawlStats] = None, ) -> bool: """mark crawl as finished, set finished timestamp and final state""" finished = dt_now() - kwargs = {"finished": finished} - if stats: - kwargs["stats"] = stats - if state in SUCCESSFUL_STATES: allowed_from = RUNNING_STATES else: @@ -1211,7 +1257,12 @@ async def mark_finished( # if set_state returns false, already set to same status, return if not await self.set_state( - state, status, crawl_id, allowed_from=allowed_from, **kwargs + state, + status, + crawl, + allowed_from=allowed_from, + finished=finished, + stats=stats, ): print("already finished, ignoring mark_finished") if not status.finished: @@ -1221,45 +1272,63 @@ async def mark_finished( status.finished = to_k8s_date(finished) - if crawl and state in SUCCESSFUL_STATES: + if state in SUCCESSFUL_STATES: await self.inc_crawl_complete_stats(crawl, finished) - self.run_task( - self.do_crawl_finished_tasks( - crawl_id, cid, oid, status.filesAddedSize, state - ) - ) + # Regular Crawl Finished + if not crawl.is_qa: + self.run_task(self.do_crawl_finished_tasks(crawl, status, state)) + + # QA Run Finished + else: + self.run_task(self.do_qa_run_finished_tasks(crawl, state)) return True # pylint: disable=too-many-arguments async def do_crawl_finished_tasks( self, - crawl_id: str, - cid: UUID, - oid: UUID, - 
files_added_size: int, + crawl: CrawlSpec, + status: CrawlStatus, state: str, ) -> None: """Run tasks after crawl completes in asyncio.task coroutine.""" - await self.crawl_config_ops.stats_recompute_last(cid, files_added_size, 1) + await self.crawl_config_ops.stats_recompute_last( + crawl.cid, status.filesAddedSize, 1 + ) - if state in SUCCESSFUL_STATES and oid: - await self.org_ops.inc_org_bytes_stored(oid, files_added_size, "crawl") - await self.coll_ops.add_successful_crawl_to_collections(crawl_id, cid) + if state in SUCCESSFUL_STATES and crawl.oid: + await self.org_ops.inc_org_bytes_stored( + crawl.oid, status.filesAddedSize, "crawl" + ) + await self.coll_ops.add_successful_crawl_to_collections(crawl.id, crawl.cid) if state in FAILED_STATES: - await self.crawl_ops.delete_crawl_files(crawl_id, oid) - await self.page_ops.delete_crawl_pages(crawl_id, oid) + await self.crawl_ops.delete_crawl_files(crawl.id, crawl.oid) + await self.page_ops.delete_crawl_pages(crawl.id, crawl.oid) await self.event_webhook_ops.create_crawl_finished_notification( - crawl_id, oid, state + crawl.id, crawl.oid, state ) # finally, delete job - await self.k8s.delete_crawl_job(crawl_id) + await self.k8s.delete_crawl_job(crawl.id) + + # pylint: disable=too-many-arguments + async def do_qa_run_finished_tasks( + self, + crawl: CrawlSpec, + state: str, + ) -> None: + """Run tasks after qa run completes in asyncio.task coroutine.""" + + if state in FAILED_STATES: + await self.page_ops.delete_qa_run_from_pages(crawl.db_crawl_id, crawl.id) + + # finally, delete job + await self.k8s.delete_crawl_job(crawl.id) - async def inc_crawl_complete_stats(self, crawl, finished): + async def inc_crawl_complete_stats(self, crawl: CrawlSpec, finished: datetime): """Increment Crawl Stats""" started = from_k8s_date(crawl.started) @@ -1268,7 +1337,7 @@ async def inc_crawl_complete_stats(self, crawl, finished): print(f"Duration: {duration}", flush=True) - await self.org_ops.inc_org_time_stats(crawl.oid, duration) + await self.org_ops.inc_org_time_stats(crawl.oid, duration, False, crawl.is_qa) async def mark_for_cancelation(self, crawl_id): """mark crawl as canceled in redis""" diff --git a/backend/btrixcloud/operator/cronjobs.py b/backend/btrixcloud/operator/cronjobs.py index 720fe9dd71..445e86fbca 100644 --- a/backend/btrixcloud/operator/cronjobs.py +++ b/backend/btrixcloud/operator/cronjobs.py @@ -48,7 +48,9 @@ async def sync_cronjob_crawl(self, data: MCDecoratorSyncData): name = metadata.get("name") crawl_id = name - actual_state, finished = await self.crawl_ops.get_crawl_state(crawl_id) + actual_state, finished = await self.crawl_ops.get_crawl_state( + crawl_id, is_qa=False + ) if finished: status = None # mark job as completed diff --git a/backend/btrixcloud/operator/models.py b/backend/btrixcloud/operator/models.py index 2102a11779..f5a2f41473 100644 --- a/backend/btrixcloud/operator/models.py +++ b/backend/btrixcloud/operator/models.py @@ -60,6 +60,17 @@ class CrawlSpec(BaseModel): scheduled: bool = False timeout: int = 0 max_crawl_size: int = 0 + qa_source_crawl_id: Optional[str] = "" + + @property + def db_crawl_id(self) -> str: + """return actual crawl_id for db, if qa run""" + return self.qa_source_crawl_id or self.id + + @property + def is_qa(self) -> bool: + """return true if qa run""" + return bool(self.qa_source_crawl_id) # ============================================================================ diff --git a/backend/btrixcloud/orgs.py b/backend/btrixcloud/orgs.py index b5534aa5d6..804d60aa92 100644 --- 
a/backend/btrixcloud/orgs.py +++ b/backend/btrixcloud/orgs.py @@ -20,6 +20,7 @@ SUCCESSFUL_STATES, RUNNING_STATES, STARTING_STATES, + BaseCrawl, Organization, StorageRef, OrgQuotas, @@ -498,18 +499,22 @@ async def set_origin(self, org: Organization, request: Request): {"_id": org.id}, {"$set": {"origin": origin}} ) - async def inc_org_time_stats(self, oid, duration, is_exec_time=False): + async def inc_org_time_stats(self, oid, duration, is_exec_time=False, is_qa=False): """inc crawl duration stats for org Overage is applied only to crawlExecSeconds - monthlyExecSeconds, giftedExecSeconds, and extraExecSeconds are added to only up to quotas + + If is_qa is true, also update seperate qa only counter """ - # pylint: disable=too-many-return-statements + # pylint: disable=too-many-return-statements, too-many-locals key = "crawlExecSeconds" if is_exec_time else "usage" yymm = datetime.utcnow().strftime("%Y-%m") - await self.orgs.find_one_and_update( - {"_id": oid}, {"$inc": {f"{key}.{yymm}": duration}} - ) + inc_query = {f"{key}.{yymm}": duration} + if is_qa: + qa_key = "qaCrawlExecSeconds" if is_exec_time else "qaUsage" + inc_query[f"{qa_key}.{yymm}"] = duration + await self.orgs.find_one_and_update({"_id": oid}, {"$inc": inc_query}) if not is_exec_time: return @@ -608,17 +613,17 @@ async def get_org_metrics(self, org: Organization): upload_count = 0 page_count = 0 - async for item in self.crawls_db.find({"oid": org.id}): - if item["state"] not in SUCCESSFUL_STATES: + async for item_data in self.crawls_db.find({"oid": org.id}): + item = BaseCrawl.from_dict(item_data) + if item.state not in SUCCESSFUL_STATES: continue archived_item_count += 1 - type_ = item.get("type") - if type_ == "crawl": + if item.type == "crawl": crawl_count += 1 - if type_ == "upload": + if item.type == "upload": upload_count += 1 - if item.get("stats"): - page_count += item.get("stats", {}).get("done", 0) + if item.stats: + page_count += item.stats.done profile_count = await self.profiles_db.count_documents({"oid": org.id}) workflows_running_count = await self.crawls_db.count_documents( diff --git a/backend/btrixcloud/pages.py b/backend/btrixcloud/pages.py index b3af72b3d4..231ff9cb58 100644 --- a/backend/btrixcloud/pages.py +++ b/backend/btrixcloud/pages.py @@ -12,8 +12,9 @@ from .models import ( Page, PageOut, + PageOutWithSingleQA, PageReviewUpdate, - PageQAUpdate, + PageQACompare, Organization, PaginatedResponse, User, @@ -49,12 +50,18 @@ def __init__(self, mdb, crawl_ops, org_ops, storage_ops): self.org_ops = org_ops self.storage_ops = storage_ops + async def init_index(self): + """init index for pages db collection""" + await self.pages.create_index([("crawl_id", pymongo.HASHED)]) + async def add_crawl_pages_to_db_from_wacz(self, crawl_id: str, batch_size=100): """Add pages to database from WACZ files""" pages_buffer: List[Page] = [] try: - crawl = await self.crawl_ops.get_crawl(crawl_id, None) - stream = await self.storage_ops.sync_stream_wacz_pages(crawl.resources) + crawl = await self.crawl_ops.get_crawl_out(crawl_id) + stream = await self.storage_ops.sync_stream_wacz_pages( + crawl.resources or [] + ) for page_dict in stream: if not page_dict.get("url"): continue @@ -81,7 +88,6 @@ def _get_page_from_dict(self, page_dict: Dict[str, Any], crawl_id: str, oid: UUI page_id = page_dict.get("id") if not page_id: print(f'Page {page_dict.get("url")} has no id - assigning UUID', flush=True) - page_id = uuid4() status = page_dict.get("status") if not status and page_dict.get("loadState"): @@ -93,9 +99,9 @@ def 
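The reworked `inc_org_time_stats` above folds regular and QA usage into a single `$inc` document so both counters are updated in one `find_one_and_update` call. A distilled sketch of just the document it builds; the `build_inc_query` helper is hypothetical, for illustration only.

    from datetime import datetime

    def build_inc_query(duration: int, is_exec_time: bool, is_qa: bool) -> dict:
        # mirrors the update assembled in OrgOps.inc_org_time_stats
        key = "crawlExecSeconds" if is_exec_time else "usage"
        yymm = datetime.utcnow().strftime("%Y-%m")
        inc_query = {f"{key}.{yymm}": duration}
        if is_qa:
            qa_key = "qaCrawlExecSeconds" if is_exec_time else "qaUsage"
            inc_query[f"{qa_key}.{yymm}"] = duration
        return inc_query

    # e.g. 90 seconds of QA exec time recorded in March 2024 yields:
    # {"crawlExecSeconds.2024-03": 90, "qaCrawlExecSeconds.2024-03": 90}
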
_get_page_from_dict(self, page_dict: Dict[str, Any], crawl_id: str, oid: UUI crawl_id=crawl_id, url=page_dict.get("url"), title=page_dict.get("title"), - load_state=page_dict.get("loadState"), + loadState=page_dict.get("loadState"), status=status, - timestamp=( + ts=( from_k8s_date(page_dict.get("ts")) if page_dict.get("ts") else datetime.now() @@ -116,7 +122,13 @@ async def _add_pages_to_db(self, pages: List[Page]): # pylint: disable=broad-exception-raised raise Exception("No pages inserted") - async def add_page_to_db(self, page_dict: Dict[str, Any], crawl_id: str, oid: UUID): + async def add_page_to_db( + self, + page_dict: Dict[str, Any], + crawl_id: str, + qa_run_id: Optional[str], + oid: UUID, + ): """Add page to database""" page = self._get_page_from_dict(page_dict, crawl_id, oid) @@ -127,13 +139,27 @@ async def add_page_to_db(self, page_dict: Dict[str, Any], crawl_id: str, oid: UU ) ) except pymongo.errors.DuplicateKeyError: - return + pass + # pylint: disable=broad-except except Exception as err: print( f"Error adding page {page.id} from crawl {crawl_id} to db: {err}", flush=True, ) + return + + # qa data + if qa_run_id and page: + compare_dict = page_dict.get("comparison") + if compare_dict is None: + print("QA Run, but compare data missing!") + return + + compare = PageQACompare(**compare_dict) + print("Adding QA Run Data for Page", page_dict.get("url"), compare) + + await self.add_qa_run_for_page(page.id, oid, qa_run_id, compare) async def delete_crawl_pages(self, crawl_id: str, oid: Optional[UUID] = None): """Delete crawl pages from db""" @@ -175,38 +201,30 @@ async def get_page( page_raw = await self.get_page_raw(page_id, oid, crawl_id) return Page.from_dict(page_raw) - async def update_page_qa( - self, - page_id: UUID, - oid: UUID, - qa_run_id: str, - update: PageQAUpdate, - ) -> Dict[str, bool]: + async def add_qa_run_for_page( + self, page_id: UUID, oid: UUID, qa_run_id: str, compare: PageQACompare + ) -> bool: """Update page heuristics and mime/type from QA run""" - query = update.dict(exclude_unset=True) - - if len(query) == 0: - raise HTTPException(status_code=400, detail="no_update_data") - keyed_fields = ("screenshotMatch", "textMatch", "resourceCounts") - for field in keyed_fields: - score = query.get(field) - if score: - query[f"{field}.{qa_run_id}"] = score - query.pop(field, None) - - query["modified"] = datetime.utcnow().replace(microsecond=0, tzinfo=None) + # modified = datetime.utcnow().replace(microsecond=0, tzinfo=None) result = await self.pages.find_one_and_update( {"_id": page_id, "oid": oid}, - {"$set": query}, + {"$set": {f"qa.{qa_run_id}": compare.dict()}}, return_document=pymongo.ReturnDocument.AFTER, ) if not result: raise HTTPException(status_code=404, detail="page_not_found") - return {"updated": True} + return True + + async def delete_qa_run_from_pages(self, crawl_id: str, qa_run_id: str): + """delete pages""" + result = await self.pages.update_many( + {"crawl_id": crawl_id}, {"$unset": {f"qa.{qa_run_id}": ""}} + ) + return result async def update_page_approval( self, @@ -334,23 +352,50 @@ async def delete_page_notes( async def list_pages( self, - org: Organization, crawl_id: str, + org: Optional[Organization] = None, + qa_run_id: Optional[str] = None, + qa_filter_by: Optional[str] = None, + qa_gte: Optional[float] = None, + qa_gt: Optional[float] = None, + qa_lte: Optional[float] = None, + qa_lt: Optional[float] = None, page_size: int = DEFAULT_PAGE_SIZE, page: int = 1, sort_by: Optional[str] = None, sort_direction: Optional[int] = -1, - ) -> 
Tuple[List[Page], int]: + ) -> Tuple[Union[List[PageOut], List[PageOutWithSingleQA]], int]: """List all pages in crawl""" - # pylint: disable=duplicate-code, too-many-locals + # pylint: disable=duplicate-code, too-many-locals, too-many-branches # Zero-index page for query page = page - 1 skip = page_size * page query: dict[str, object] = { - "oid": org.id, "crawl_id": crawl_id, } + if org: + query["oid"] = org.id + + if qa_run_id: + query[f"qa.{qa_run_id}"] = {"$exists": True} + + range_filter = {} + + if qa_gte: + range_filter["$gte"] = qa_gte + if qa_lte: + range_filter["$lte"] = qa_lte + if qa_gt: + range_filter["$gt"] = qa_gt + if qa_lt: + range_filter["$lt"] = qa_lt + + if qa_filter_by: + if not range_filter: + raise HTTPException(status_code=400, detail="range_missing") + + query[f"qa.{qa_run_id}.{qa_filter_by}"] = range_filter aggregate = [{"$match": query}] @@ -358,13 +403,27 @@ async def list_pages( # Sorting options to add: # - automated heuristics like screenshot_comparison (dict keyed by QA run id) # - Ensure notes sorting works okay with notes in list - sort_fields = ("url", "title", "notes", "approved", "notes") - if sort_by not in sort_fields: + sort_fields = ("url", "title", "notes", "approved") + qa_sort_fields = ("screenshotMatch", "textMatch") + if sort_by not in sort_fields and sort_by not in qa_sort_fields: raise HTTPException(status_code=400, detail="invalid_sort_by") if sort_direction not in (1, -1): raise HTTPException(status_code=400, detail="invalid_sort_direction") + + if sort_by in qa_sort_fields: + if not qa_run_id: + raise HTTPException( + status_code=400, detail="qa_run_id_missing_for_qa_sort" + ) + + sort_by = f"qa.{qa_run_id}.{sort_by}" + aggregate.extend([{"$sort": {sort_by: sort_direction}}]) + if qa_run_id: + aggregate.extend([{"$set": {"qa": f"$qa.{qa_run_id}"}}]) + # aggregate.extend([{"$project": {"qa": f"$qa.{qa_run_id}"}}]) + aggregate.extend( [ { @@ -390,9 +449,10 @@ async def list_pages( except (IndexError, ValueError): total = 0 - pages = [PageOut.from_dict(data) for data in items] + if qa_run_id: + return [PageOutWithSingleQA.from_dict(data) for data in items], total - return pages, total + return [PageOut.from_dict(data) for data in items], total async def re_add_crawl_pages(self, crawl_id: str, oid: UUID): """Delete existing pages for crawl and re-add from WACZs.""" @@ -523,8 +583,44 @@ async def get_pages_list( ): """Retrieve paginated list of pages""" pages, total = await ops.list_pages( - org, crawl_id=crawl_id, + org=org, + page_size=pageSize, + page=page, + sort_by=sortBy, + sort_direction=sortDirection, + ) + return paginated_format(pages, total, page, pageSize) + + @app.get( + "/orgs/{oid}/crawls/{crawl_id}/qa/{qa_run_id}/pages", + tags=["pages", "qa"], + response_model=PaginatedResponse, + ) + async def get_pages_list_with_qa( + crawl_id: str, + qa_run_id: str, + filterQABy: Optional[str] = None, + gte: Optional[float] = None, + gt: Optional[float] = None, + lte: Optional[float] = None, + lt: Optional[float] = None, + org: Organization = Depends(org_crawl_dep), + pageSize: int = DEFAULT_PAGE_SIZE, + page: int = 1, + sortBy: Optional[str] = None, + sortDirection: Optional[int] = -1, + ): + """Retrieve paginated list of pages""" + pages, total = await ops.list_pages( + crawl_id=crawl_id, + org=org, + qa_run_id=qa_run_id, + qa_filter_by=filterQABy, + qa_gte=gte, + qa_gt=gt, + qa_lte=lte, + qa_lt=lt, page_size=pageSize, page=page, sort_by=sortBy, diff --git a/backend/btrixcloud/storages.py b/backend/btrixcloud/storages.py index 
a28878fc4b..835aadffee 100644 --- a/backend/btrixcloud/storages.py +++ b/backend/btrixcloud/storages.py @@ -72,17 +72,20 @@ class StorageOps: org_ops: OrgOps crawl_manager: CrawlManager + is_local_minio: bool + frontend_origin: str + def __init__(self, org_ops, crawl_manager) -> None: self.org_ops = org_ops self.crawl_manager = crawl_manager self.is_local_minio = is_bool(os.environ.get("IS_LOCAL_MINIO")) - self.frontend_alias = os.environ.get( - "FRONTEND_ALIAS", "http://browsertrix-cloud-frontend" + frontend_origin = os.environ.get( + "FRONTEND_ORIGIN", "http://browsertrix-cloud-frontend" ) - self.default_namespace = os.environ.get("DEFAULT_NAMESPACE", "default") - self.frontend_url = f"{self.frontend_alias}.{self.default_namespace}" + default_namespace = os.environ.get("DEFAULT_NAMESPACE", "default") + self.frontend_origin = f"{frontend_origin}.{default_namespace}" with open(os.environ["STORAGES_JSON"], encoding="utf-8") as fh: storage_list = json.loads(fh.read()) @@ -320,6 +323,12 @@ async def verify_storage_upload(self, storage: S3Storage, filename: str) -> None resp = await client.put_object(Bucket=bucket, Key=key, Body=data) assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200 + def resolve_internal_access_path(self, path): + """Resolve relative path for internal access to minio bucket""" + if path.startswith("/"): + return self.frontend_origin + path + return path + def get_org_relative_path( self, org: Organization, ref: StorageRef, file_path: str ) -> str: @@ -601,10 +610,7 @@ def organize_based_on_instance_number( wacz_log_streams: List[Iterator[dict]] = [] for wacz_file in instance_list: - wacz_url = wacz_file.path - if wacz_url.startswith("/data"): - wacz_url = f"{self.frontend_url}{wacz_url}" - + wacz_url = self.resolve_internal_access_path(wacz_file.path) with RemoteZip(wacz_url) as remote_zip: log_files: List[ZipInfo] = [ f @@ -649,10 +655,7 @@ def stream_page_lines( page_generators: List[Iterator[Dict[Any, Any]]] = [] for wacz_file in wacz_files: - wacz_url = wacz_file.path - if wacz_url.startswith("/data"): - wacz_url = f"{self.frontend_url}{wacz_url}" - + wacz_url = self.resolve_internal_access_path(wacz_file.path) with RemoteZip(wacz_url) as remote_zip: page_files: List[ZipInfo] = [ f diff --git a/backend/btrixcloud/uploads.py b/backend/btrixcloud/uploads.py index 6f8f6474a0..2b3f6e2023 100644 --- a/backend/btrixcloud/uploads.py +++ b/backend/btrixcloud/uploads.py @@ -39,6 +39,15 @@ class UploadOps(BaseCrawlOps): """upload ops""" + async def get_upload( + self, + crawlid: str, + org: Optional[Organization] = None, + ) -> UploadedCrawl: + """Get crawl data for internal use""" + res = await self.get_crawl_raw(crawlid, org, "upload") + return UploadedCrawl.from_dict(res) + # pylint: disable=too-many-arguments, too-many-instance-attributes, too-many-public-methods, too-many-function-args # pylint: disable=too-many-arguments, too-many-locals, duplicate-code, invalid-name async def upload_stream( @@ -60,7 +69,7 @@ async def upload_stream( prev_upload = None if replaceId: try: - prev_upload = await self.get_crawl_raw(replaceId, org, "upload") + prev_upload = await self.get_upload(replaceId, org) except HTTPException: # not found replaceId = None @@ -371,7 +380,7 @@ async def list_uploads( response_model=CrawlOut, ) async def get_upload(crawlid: str, org: Organization = Depends(org_crawl_dep)): - return await ops.get_crawl(crawlid, org, "upload") + return await ops.get_crawl_out(crawlid, org, "upload") @app.get( "/orgs/all/uploads/{crawl_id}/replay.json", @@ -382,7 +391,7 @@ 
async def get_upload_replay_admin(crawl_id, user: User = Depends(user_dep)): if not user.is_superuser: raise HTTPException(status_code=403, detail="Not Allowed") - return await ops.get_crawl(crawl_id, None, "upload") + return await ops.get_crawl_out(crawl_id, None, "upload") @app.get( "/orgs/{oid}/uploads/{crawl_id}/replay.json", @@ -390,7 +399,7 @@ async def get_upload_replay_admin(crawl_id, user: User = Depends(user_dep)): response_model=CrawlOutWithResources, ) async def get_upload_replay(crawl_id, org: Organization = Depends(org_viewer_dep)): - return await ops.get_crawl(crawl_id, org, "upload") + return await ops.get_crawl_out(crawl_id, org, "upload") @app.patch("/orgs/{oid}/uploads/{crawl_id}", tags=["uploads"]) async def update_uploads_api( diff --git a/backend/btrixcloud/webhooks.py b/backend/btrixcloud/webhooks.py index aafe10e80c..5b3ec99243 100644 --- a/backend/btrixcloud/webhooks.py +++ b/backend/btrixcloud/webhooks.py @@ -195,12 +195,12 @@ async def _create_item_finished_notification( body: Union[CrawlFinishedBody, UploadFinishedBody], ): """Create webhook notification for finished crawl/upload.""" - crawl = await self.crawl_ops.get_crawl(crawl_id, org) + crawl = await self.crawl_ops.get_crawl_out(crawl_id, org) if not crawl: print(f"Crawl {crawl_id} not found, skipping event webhook", flush=True) return - body.resources = crawl.resources + body.resources = crawl.resources or [] notification = WebhookNotification( id=uuid4(), diff --git a/backend/test/test_qa.py b/backend/test/test_qa.py new file mode 100644 index 0000000000..6029541843 --- /dev/null +++ b/backend/test/test_qa.py @@ -0,0 +1,187 @@ +from .conftest import API_PREFIX, HOST_PREFIX +import requests +import time +from datetime import datetime + +qa_run_id = None + + +def test_run_qa(crawler_crawl_id, crawler_auth_headers, default_org_id): + r = requests.post( + f"{API_PREFIX}/orgs/{default_org_id}/crawls/{crawler_crawl_id}/qa/start", + headers=crawler_auth_headers, + ) + + assert r.status_code == 200 + + data = r.json() + assert data["started"] + global qa_run_id + qa_run_id = data["started"] + + +def test_run_qa_already_running(crawler_crawl_id, crawler_auth_headers, default_org_id): + r = requests.post( + f"{API_PREFIX}/orgs/{default_org_id}/crawls/{crawler_crawl_id}/qa/start", + headers=crawler_auth_headers, + ) + + assert r.status_code == 400 + assert r.json()["detail"] == "qa_already_running" + + +def test_active_qa(crawler_crawl_id, crawler_auth_headers, default_org_id): + r = requests.get( + f"{API_PREFIX}/orgs/{default_org_id}/crawls/{crawler_crawl_id}/qa/activeQA", + headers=crawler_auth_headers, + ) + + data = r.json() + qa = data["qa"] + + assert qa + assert qa["state"] + assert qa["started"] + assert not qa["finished"] + + +def test_qa_list(crawler_crawl_id, crawler_auth_headers, default_org_id): + r = requests.get( + f"{API_PREFIX}/orgs/{default_org_id}/crawls/{crawler_crawl_id}/qa", + headers=crawler_auth_headers, + ) + + data = r.json() + + assert len(data) == 1 + + qa = data[0] + assert qa + assert qa["state"] + assert qa["started"] + assert not qa["finished"] + + +def test_wait_for_complete(crawler_crawl_id, crawler_auth_headers, default_org_id): + count = 0 + completed = False + while count < 24: + r = requests.get( + f"{API_PREFIX}/orgs/{default_org_id}/crawls/{crawler_crawl_id}/qa/activeQA", + headers=crawler_auth_headers, + ) + + data = r.json() + if not data["qa"]: + completed = True + break + + time.sleep(5) + count += 1 + + assert completed + + +def test_qa_completed(crawler_crawl_id, 
crawler_auth_headers, default_org_id): + r = requests.get( + f"{API_PREFIX}/orgs/{default_org_id}/crawls/{crawler_crawl_id}/qa", + headers=crawler_auth_headers, + ) + + data = r.json() + + assert len(data) == 1 + + qa = data[0] + assert qa + assert qa["state"] == "complete" + assert qa["started"] + assert qa["finished"] + assert qa["stats"]["found"] == 1 + assert qa["stats"]["done"] == 1 + assert qa["crawlExecSeconds"] > 0 + + +def test_qa_org_stats(crawler_crawl_id, crawler_auth_headers, default_org_id): + r = requests.get( + f"{API_PREFIX}/orgs/{default_org_id}/crawls/{crawler_crawl_id}", + headers=crawler_auth_headers, + ) + crawl_stats = r.json() + assert crawl_stats["qaCrawlExecSeconds"] > 0 + + r = requests.get( + f"{API_PREFIX}/orgs/{default_org_id}", + headers=crawler_auth_headers, + ) + org_stats = r.json() + + yymm = datetime.utcnow().strftime("%Y-%m") + assert org_stats["qaCrawlExecSeconds"][yymm] > 0 + assert org_stats["qaUsage"][yymm] > 0 + + +def test_qa_page_data(crawler_crawl_id, crawler_auth_headers, default_org_id): + r = requests.get( + f"{API_PREFIX}/orgs/{default_org_id}/crawls/{crawler_crawl_id}/qa/{qa_run_id}/pages", + headers=crawler_auth_headers, + ) + data = r.json() + assert len(data["items"]) == 1 + page = data["items"][0] + assert page["title"] == "Webrecorder" + assert page["url"] == "https://webrecorder.net/" + assert page["qa"]["textMatch"] == 1.0 + assert page["qa"]["screenshotMatch"] == 1.0 + assert page["qa"]["resourceCounts"] == { + "crawlGood": 15, + "crawlBad": 0, + "replayGood": 15, + "replayBad": 1, + } + + +def test_qa_replay(crawler_crawl_id, crawler_auth_headers, default_org_id): + r = requests.get( + f"{API_PREFIX}/orgs/{default_org_id}/crawls/{crawler_crawl_id}/qa/{qa_run_id}/replay.json", + headers=crawler_auth_headers, + ) + data = r.json() + assert len(data["resources"]) == 1 + assert data["resources"][0]["path"] + + +def test_run_qa_not_running(crawler_crawl_id, crawler_auth_headers, default_org_id): + r = requests.post( + f"{API_PREFIX}/orgs/{default_org_id}/crawls/{crawler_crawl_id}/qa/stop", + headers=crawler_auth_headers, + ) + + assert r.status_code == 400 + assert r.json()["detail"] == "qa_not_running" + + +def test_delete_qa_run(crawler_crawl_id, crawler_auth_headers, default_org_id): + r = requests.post( + f"{API_PREFIX}/orgs/{default_org_id}/crawls/{crawler_crawl_id}/qa/delete", + json={"qa_run_ids": [qa_run_id]}, + headers=crawler_auth_headers, + ) + + assert r.status_code == 200 + assert r.json()["deleted"] == True + + # deleted from finished qa list + r = requests.get( + f"{API_PREFIX}/orgs/{default_org_id}/crawls/{crawler_crawl_id}/qa", + headers=crawler_auth_headers, + ) + + assert len(r.json()) == 0 + + # deleted from pages + r = requests.get( + f"{API_PREFIX}/orgs/{default_org_id}/crawls/{crawler_crawl_id}/qa/{qa_run_id}/pages", + headers=crawler_auth_headers, + ) + assert len(r.json()["items"]) == 0 diff --git a/backend/test/test_run_crawl.py b/backend/test/test_run_crawl.py index 26e58638a9..59720c40dc 100644 --- a/backend/test/test_run_crawl.py +++ b/backend/test/test_run_crawl.py @@ -431,9 +431,9 @@ def test_crawl_pages(crawler_auth_headers, default_org_id, crawler_crawl_id): assert page["oid"] assert page["crawl_id"] assert page["url"] - assert page["timestamp"] + assert page["ts"] assert page.get("title") or page.get("title") is None - assert page["load_state"] + assert page["loadState"] assert page["status"] # Test GET page endpoint @@ -450,13 +450,9 @@ def test_crawl_pages(crawler_auth_headers, default_org_id, 
crawler_crawl_id): assert page["oid"] assert page["crawl_id"] assert page["url"] - assert page["timestamp"] + assert page["ts"] assert page.get("title") or page.get("title") is None - assert page["load_state"] - - assert page["screenshotMatch"] == {} - assert page["textMatch"] == {} - assert page["resourceCounts"] == {} + assert page["loadState"] assert page["notes"] == [] assert page.get("userid") is None @@ -485,9 +481,9 @@ def test_crawl_pages(crawler_auth_headers, default_org_id, crawler_crawl_id): assert page["oid"] assert page["crawl_id"] assert page["url"] - assert page["timestamp"] + assert page["ts"] assert page.get("title") or page.get("title") is None - assert page["load_state"] + assert page["loadState"] assert page["notes"] == [] assert page["userid"] @@ -522,9 +518,9 @@ def test_re_add_crawl_pages(crawler_auth_headers, default_org_id, crawler_crawl_ assert page["oid"] assert page["crawl_id"] assert page["url"] - assert page["timestamp"] + assert page["ts"] assert page.get("title") or page.get("title") is None - assert page["load_state"] + assert page["loadState"] assert page["status"] # Ensure only superuser can re-add pages for all crawls in an org diff --git a/chart/app-templates/crawl_job.yaml b/chart/app-templates/crawl_job.yaml index 96fec0fc18..3255e56f99 100644 --- a/chart/app-templates/crawl_job.yaml +++ b/chart/app-templates/crawl_job.yaml @@ -4,7 +4,7 @@ metadata: name: crawljob-{{ id }} labels: crawl: "{{ id }}" - role: "job" + role: {{ "qa-job" if qa_source else "job" }} btrix.org: "{{ oid }}" btrix.user: "{{ userid }}" btrix.storage: "{{ storage_name }}" @@ -19,11 +19,14 @@ spec: cid: "{{ cid }}" oid: "{{ oid }}" scale: {{ scale }} - maxCrawlSize: {{ max_crawl_size }} - timeout: {{ timeout }} + + maxCrawlSize: {{ max_crawl_size if not qa_source else 0 }} + timeout: {{ timeout if not qa_source else 0 }} + qaSourceCrawlId: "{{ qa_source }}" + manual: {{ manual }} crawlerChannel: "{{ crawler_channel }}" - ttlSecondsAfterFinished: 30 + ttlSecondsAfterFinished: {{ 30 if not qa_source else 0 }} warcPrefix: "{{ warc_prefix }}" storageName: "{{ storage_name }}" diff --git a/chart/app-templates/crawler.yaml b/chart/app-templates/crawler.yaml index b3e7a0c92c..e9ea1834d0 100644 --- a/chart/app-templates/crawler.yaml +++ b/chart/app-templates/crawler.yaml @@ -53,8 +53,11 @@ spec: volumes: - name: crawl-config configMap: + {% if not qa_source_crawl_id %} name: crawl-config-{{ cid }} - + {% else %} + name: qa-replay-{{ qa_source_crawl_id }} + {% endif %} - name: crawl-data persistentVolumeClaim: claimName: {{ name }} @@ -102,6 +105,7 @@ spec: image: {{ crawler_image }} imagePullPolicy: {{ crawler_image_pull_policy }} command: + {% if not qa_source_crawl_id %} - crawl - --config - /tmp/crawl-config.json @@ -112,6 +116,14 @@ spec: - "@{{ profile_filename }}" {%- endif %} + {% else %} + - qa + - --qaSource + - /tmp/crawl-config.json + - --redisStoreUrl + - {{ redis_url }} + - --writePagesToRedis + {% endif %} volumeMounts: - name: crawl-config mountPath: /tmp/crawl-config.json diff --git a/chart/app-templates/qa_configmap.yaml b/chart/app-templates/qa_configmap.yaml new file mode 100644 index 0000000000..9fd9e4051b --- /dev/null +++ b/chart/app-templates/qa_configmap.yaml @@ -0,0 +1,14 @@ +# ------- +# CONFIGMAP +# ------- +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ name }} + namespace: {{ namespace }} + labels: + crawl: {{ id }} + role: crawler + +data: + crawl-config.json: {{ qa_source_replay_json | tojson }} diff --git a/chart/app-templates/replica_job.yaml 
b/chart/app-templates/replica_job.yaml index 30870d3fb9..88a7da17b8 100644 --- a/chart/app-templates/replica_job.yaml +++ b/chart/app-templates/replica_job.yaml @@ -13,7 +13,7 @@ spec: template: spec: restartPolicy: Never - priorityClassName: bg-jobs + priorityClassName: bg-job podFailurePolicy: rules: - action: FailJob diff --git a/chart/templates/configmap.yaml b/chart/templates/configmap.yaml index 6efb48a86b..c19255d9c1 100644 --- a/chart/templates/configmap.yaml +++ b/chart/templates/configmap.yaml @@ -12,7 +12,7 @@ data: DEFAULT_NAMESPACE: {{ .Release.Namespace }} - FRONTEND_ALIAS: {{ .Values.frontend_alias | default "http://browsertrix-cloud-frontend" }} + FRONTEND_ORIGIN: {{ .Values.frontend_alias | default "http://browsertrix-cloud-frontend" }} CRAWLER_FQDN_SUFFIX: ".{{ .Values.crawler_namespace }}.svc.cluster.local" diff --git a/chart/templates/operators.yaml b/chart/templates/operators.yaml index 160e301bde..b7126edb4c 100644 --- a/chart/templates/operators.yaml +++ b/chart/templates/operators.yaml @@ -20,6 +20,11 @@ spec: updateStrategy: method: InPlace + - apiVersion: v1 + resource: configmaps + updateStrategy: + method: OnDelete + hooks: sync: webhook: diff --git a/chart/templates/priorities.yaml b/chart/templates/priorities.yaml index 4b63c15970..9acb5ae8ac 100644 --- a/chart/templates/priorities.yaml +++ b/chart/templates/priorities.yaml @@ -11,13 +11,25 @@ description: "Priority for crawl instance #{{ . }}" {{- end }} +{{- range untilStep 0 (int .Values.max_crawl_scale) 1 }} +--- +apiVersion: scheduling.k8s.io/v1 +kind: PriorityClass +metadata: + name: qa-crawl-instance-{{ . }} +value: -{{ add 100 . }} +globalDefault: false +description: "Priority for QA crawl instance #{{ . }}" + +{{- end }} + # Lower Priority for Background Jobs --- apiVersion: scheduling.k8s.io/v1 kind: PriorityClass metadata: - name: bg-jobs -value: -100 + name: bg-job +value: -1000 globalDefault: false description: "Priority for background jobs" diff --git a/chart/test/test.yaml b/chart/test/test.yaml index 91af1846e2..b867e49713 100644 --- a/chart/test/test.yaml +++ b/chart/test/test.yaml @@ -22,7 +22,7 @@ crawler_channels: image: "docker.io/webrecorder/browsertrix-crawler:latest" - id: test - image: "docker.io/webrecorder/browsertrix-crawler:1.0.0-beta.4" + image: "docker.io/webrecorder/browsertrix-crawler:1.1.0-beta.1" mongo_auth: # specify either username + password (for local mongo) diff --git a/chart/values.yaml b/chart/values.yaml index 46432470e4..f6c4235724 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -17,8 +17,8 @@ disk_utilization_threshold: 90 # crawler logging flags crawler_logging_opts: "stats,behaviors,debug" -# to enable, set to a value other than 'false' -crawler_extract_full_text: false +# to enable, set to one or more comma separate values: to-warc,to-pages,final-to-warc +crawler_extract_full_text: to-warc # max pages per crawl # set to non-zero value to enforce global max pages per crawl limit
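
For reference, a hedged example of how a client could exercise the new QA pages endpoint added in pages.py, using the `filterQABy`/`lt` and `sortBy` parameters introduced in this patch. The host, token, and ids are placeholders; the request style follows the tests above.

    import requests

    API_PREFIX = "https://btrix.example.com/api"          # placeholder
    headers = {"Authorization": "Bearer <access-token>"}  # placeholder
    org_id = "<org-uuid>"
    crawl_id = "<crawl-id>"
    qa_run_id = "<qa-run-id>"

    # list pages whose screenshot match from this QA run fell below 0.9,
    # worst text match first
    r = requests.get(
        f"{API_PREFIX}/orgs/{org_id}/crawls/{crawl_id}/qa/{qa_run_id}/pages",
        params={
            "filterQABy": "screenshotMatch",
            "lt": 0.9,
            "sortBy": "textMatch",
            "sortDirection": 1,
            "pageSize": 50,
        },
        headers=headers,
    )
    pages = r.json()["items"]

Each returned item is a `PageOutWithSingleQA`: the core page fields plus a single `qa` object, flattened from the per-run `qa.{qa_run_id}` subdocument by the `$set` stage in `list_pages`.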