Add endpoints to read pages from older crawl WACZs into database #1562

Merged
Merged 15 commits on Mar 19, 2024
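
The two new endpoints let an operator rebuild the pages collection for crawls that finished before pages were tracked, by streaming page records back out of the stored WACZ files. A minimal sketch of how they might be called from a script; the base URL, token, and IDs below are placeholders, not values from this PR:

import requests  # hypothetical client-side example, not part of this PR

API_BASE = "https://btrix.example.com/api"  # assumed deployment URL
HEADERS = {"Authorization": "Bearer <access-token>"}  # assumed auth token
ORG_ID = "<org-uuid>"
CRAWL_ID = "<crawl-id>"

# Re-add pages for a single finished crawl
resp = requests.post(
    f"{API_BASE}/orgs/{ORG_ID}/crawls/{CRAWL_ID}/pages/reAdd", headers=HEADERS
)
print(resp.json())  # expected: {'started': True}

# Re-add pages for every finished crawl in the org (superuser only)
resp = requests.post(
    f"{API_BASE}/orgs/{ORG_ID}/crawls/all/pages/reAdd", headers=HEADERS
)
print(resp.json())  # expected: {'started': True}

Both endpoints hand the work off to asyncio.create_task and return immediately, so the response only confirms that the rebuild has started, not that it has finished.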
15 changes: 2 additions & 13 deletions backend/btrixcloud/crawls.py
@@ -222,16 +222,6 @@ async def delete_crawls(

return {"deleted": True, "storageQuotaReached": quota_reached}

async def get_wacz_files(self, crawl_id: str, org: Organization):
"""Return list of WACZ files associated with crawl."""
wacz_files = []
crawl_raw = await self.get_crawl_raw(crawl_id, org)
crawl = Crawl.from_dict(crawl_raw)
for file_ in crawl.files:
if file_.filename.endswith(".wacz"):
wacz_files.append(file_)
return wacz_files

# pylint: disable=too-many-arguments
async def add_new_crawl(
self,
@@ -889,11 +879,10 @@ async def stream_crawl_logs(
if context:
contexts = context.split(",")

# If crawl is finished, stream logs from WACZ files
# If crawl is finished, stream logs from WACZ files using presigned urls
if crawl.finished:
wacz_files = await ops.get_wacz_files(crawl_id, org)
resp = await ops.storage_ops.sync_stream_wacz_logs(
org, wacz_files, log_levels, contexts
crawl.resources, log_levels, contexts
)
return StreamingResponse(
resp,
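For context on the crawls.py change above: when a crawl is finished, the log endpoint now streams log lines straight from the crawl's WACZ resources (presigned URLs) and returns them through FastAPI's StreamingResponse, so the deleted get_wacz_files helper is no longer needed. A simplified, self-contained sketch of that streaming pattern, with a stand-in iterator in place of the real storage_ops.sync_stream_wacz_logs:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

def iter_log_lines():
    # Stand-in for the iterator returned by storage_ops.sync_stream_wacz_logs;
    # it would normally yield JSON-lines log entries read from the WACZ files.
    yield b'{"logLevel": "info", "context": "general", "message": "Crawl started"}\n'
    yield b'{"logLevel": "error", "context": "general", "message": "Page load failed"}\n'

@app.get("/example/logs")
async def example_logs():
    # StreamingResponse consumes the iterator lazily, so large log files are
    # never fully buffered in memory before being sent to the client.
    return StreamingResponse(iter_log_lines(), media_type="text/jsonl")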
117 changes: 92 additions & 25 deletions backend/btrixcloud/pages.py
@@ -1,5 +1,7 @@
"""crawl pages"""

import asyncio
import traceback
from datetime import datetime
from typing import TYPE_CHECKING, Optional, Tuple, List, Dict, Any, Union
from uuid import UUID, uuid4
@@ -42,51 +44,83 @@ class PageOps:

def __init__(self, mdb, crawl_ops, org_ops, storage_ops):
self.pages = mdb["pages"]
self.crawls = mdb["crawls"]
self.crawl_ops = crawl_ops
self.org_ops = org_ops
self.storage_ops = storage_ops

async def add_crawl_pages_to_db_from_wacz(self, crawl_id: str):
async def add_crawl_pages_to_db_from_wacz(self, crawl_id: str, batch_size=100):
"""Add pages to database from WACZ files"""
pages_buffer: List[Page] = []
try:
crawl = await self.crawl_ops.get_crawl(crawl_id, None)
org = await self.org_ops.get_org_by_id(crawl.oid)
wacz_files = await self.crawl_ops.get_wacz_files(crawl_id, org)
stream = await self.storage_ops.sync_stream_pages_from_wacz(org, wacz_files)
stream = await self.storage_ops.sync_stream_wacz_pages(crawl.resources)
for page_dict in stream:
if not page_dict.get("url"):
continue

await self.add_page_to_db(page_dict, crawl_id, crawl.oid)
if len(pages_buffer) > batch_size:
    await self._add_pages_to_db(pages_buffer)
    pages_buffer = []

pages_buffer.append(
    self._get_page_from_dict(page_dict, crawl_id, crawl.oid)
)

# Add any remaining pages in buffer to db
if pages_buffer:
await self._add_pages_to_db(pages_buffer)

print(f"Added pages for crawl {crawl_id} to db", flush=True)
# pylint: disable=broad-exception-caught, raise-missing-from
except Exception as err:
traceback.print_exc()
print(f"Error adding pages for crawl {crawl_id} to db: {err}", flush=True)

async def add_page_to_db(self, page_dict: Dict[str, Any], crawl_id: str, oid: UUID):
"""Add page to database"""
def _get_page_from_dict(self, page_dict: Dict[str, Any], crawl_id: str, oid: UUID):
"""Return Page object from dict"""
page_id = page_dict.get("id")
if not page_id:
print(f'Page {page_dict.get("url")} has no id - assigning UUID', flush=True)
page_id = uuid4()

status = page_dict.get("status")
if not status and page_dict.get("loadState"):
status = 200

return Page(
id=page_id,
oid=oid,
crawl_id=crawl_id,
url=page_dict.get("url"),
title=page_dict.get("title"),
load_state=page_dict.get("loadState"),
status=status,
timestamp=(
from_k8s_date(page_dict.get("ts"))
if page_dict.get("ts")
else datetime.now()
),
)

async def _add_pages_to_db(self, pages: List[Page]):
"""Add batch of pages to db in one insert"""
result = await self.pages.insert_many(
[
page.to_dict(
exclude_unset=True, exclude_none=True, exclude_defaults=True
)
for page in pages
]
)
if not result.inserted_ids:
# pylint: disable=broad-exception-raised
raise Exception("No pages inserted")

async def add_page_to_db(self, page_dict: Dict[str, Any], crawl_id: str, oid: UUID):
"""Add page to database"""
page = self._get_page_from_dict(page_dict, crawl_id, oid)

try:
status = page_dict.get("status")
if not status and page_dict.get("loadState"):
status = 200
page = Page(
id=page_id,
oid=oid,
crawl_id=crawl_id,
url=page_dict.get("url"),
title=page_dict.get("title"),
load_state=page_dict.get("loadState"),
status=status,
timestamp=(
from_k8s_date(page_dict.get("ts"))
if page_dict.get("ts")
else datetime.now()
),
)
await self.pages.insert_one(
page.to_dict(
exclude_unset=True, exclude_none=True, exclude_defaults=True
@@ -97,7 +131,7 @@ async def add_page_to_db(self, page_dict: Dict[str, Any], crawl_id: str, oid: UUID):
# pylint: disable=broad-except
except Exception as err:
print(
f"Error adding page {page_id} from crawl {crawl_id} to db: {err}",
f"Error adding page {page.id} from crawl {crawl_id} to db: {err}",
flush=True,
)

@@ -360,6 +394,20 @@ async def list_pages(

return pages, total

async def re_add_crawl_pages(self, crawl_id: str, oid: UUID):
"""Delete existing pages for crawl and re-add from WACZs."""
await self.delete_crawl_pages(crawl_id, oid)
print(f"Deleted pages for crawl {crawl_id}", flush=True)
await self.add_crawl_pages_to_db_from_wacz(crawl_id)

async def re_add_all_crawl_pages(self, oid: UUID):
"""Re-add pages for all crawls in org"""
crawl_ids = await self.crawls.distinct(
    "_id", {"type": "crawl", "oid": oid, "finished": {"$ne": None}}
)
for crawl_id in crawl_ids:
await self.re_add_crawl_pages(crawl_id, oid)


# ============================================================================
# pylint: disable=too-many-arguments, too-many-locals, invalid-name, fixme
@@ -371,6 +419,25 @@ def init_pages_api(app, mdb, crawl_ops, org_ops, storage_ops, user_dep):

org_crawl_dep = org_ops.org_crawl_dep

@app.post("/orgs/{oid}/crawls/all/pages/reAdd", tags=["pages"])
async def re_add_all_crawl_pages(
org: Organization = Depends(org_crawl_dep), user: User = Depends(user_dep)
):
"""Re-add pages for all crawls in org (superuser only)"""
if not user.is_superuser:
raise HTTPException(status_code=403, detail="Not Allowed")

asyncio.create_task(ops.re_add_all_crawl_pages(org.id))
return {"started": True}

@app.post("/orgs/{oid}/crawls/{crawl_id}/pages/reAdd", tags=["pages"])
async def re_add_crawl_pages(
crawl_id: str, org: Organization = Depends(org_crawl_dep)
):
"""Re-add pages for crawl"""
asyncio.create_task(ops.re_add_crawl_pages(crawl_id, org.id))
return {"started": True}

@app.get(
"/orgs/{oid}/crawls/{crawl_id}/pages/{page_id}",
tags=["pages"],
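
As a standalone illustration of the buffered batch-insert approach that add_crawl_pages_to_db_from_wacz uses above, a simplified sketch; the collection object and field names are assumptions for the example, not code from this PR:

from typing import Any, Dict, Iterable, List

async def insert_pages_in_batches(
    pages_coll, page_dicts: Iterable[Dict[str, Any]], batch_size: int = 100
) -> int:
    """Insert page documents into a Motor/PyMongo-style async collection in batches."""
    buffer: List[Dict[str, Any]] = []
    inserted = 0

    async def flush() -> None:
        nonlocal inserted
        result = await pages_coll.insert_many(buffer)
        inserted += len(result.inserted_ids)
        buffer.clear()  # reset so the same documents are never inserted twice

    for doc in page_dicts:
        if not doc.get("url"):
            continue  # skip entries without a URL, mirroring the filter in the PR
        buffer.append(doc)
        if len(buffer) >= batch_size:
            await flush()

    if buffer:  # flush whatever is left after the loop
        await flush()
    return inserted

Flushing at batch_size and clearing the buffer after each insert_many keeps memory bounded while still amortizing the insert cost over many documents.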