Skip to content

Commit

Permalink
Add endpoints to read pages from older crawl WACZs into database (#1562)
Browse files Browse the repository at this point in the history
Fixes #1597

New endpoints (replacing old migration) to re-add crawl pages to db from
WACZs.

After a few implementation attempts, we settled on using
[remotezip](https://github.com/gtsystem/python-remotezip) to handle
parsing of the zip files and streaming their contents line-by-line for
pages. I've also modified the sync log streaming to use remotezip as
well, which allows us to remove our own zip module and let remotezip
handle the complexity of parsing zip files.

Database inserts for pages from WACZs are batched 100 at a time to help
speed up the endpoint, and the task is kicked off using
asyncio.create_task so as not to block before giving a response.

StorageOps now contains a method for streaming the bytes of any file in
a remote WACZ, requiring only the presigned URL for the WACZ and the
name of the file to stream.
  • Loading branch information
tw4l committed Mar 19, 2024
1 parent 2c44011 commit 21ae383
Show file tree
Hide file tree
Showing 9 changed files with 221 additions and 345 deletions.
15 changes: 2 additions & 13 deletions backend/btrixcloud/crawls.py
Expand Up @@ -222,16 +222,6 @@ async def delete_crawls(

return {"deleted": True, "storageQuotaReached": quota_reached}

async def get_wacz_files(self, crawl_id: str, org: Organization):
"""Return list of WACZ files associated with crawl."""
wacz_files = []
crawl_raw = await self.get_crawl_raw(crawl_id, org)
crawl = Crawl.from_dict(crawl_raw)
for file_ in crawl.files:
if file_.filename.endswith(".wacz"):
wacz_files.append(file_)
return wacz_files

# pylint: disable=too-many-arguments
async def add_new_crawl(
self,
Expand Down Expand Up @@ -889,11 +879,10 @@ async def stream_crawl_logs(
if context:
contexts = context.split(",")

# If crawl is finished, stream logs from WACZ files
# If crawl is finished, stream logs from WACZ files using presigned urls
if crawl.finished:
wacz_files = await ops.get_wacz_files(crawl_id, org)
resp = await ops.storage_ops.sync_stream_wacz_logs(
org, wacz_files, log_levels, contexts
crawl.resources, log_levels, contexts
)
return StreamingResponse(
resp,
Expand Down
117 changes: 92 additions & 25 deletions backend/btrixcloud/pages.py
@@ -1,5 +1,7 @@
"""crawl pages"""

import asyncio
import traceback
from datetime import datetime
from typing import TYPE_CHECKING, Optional, Tuple, List, Dict, Any, Union
from uuid import UUID, uuid4
Expand Down Expand Up @@ -42,51 +44,83 @@ class PageOps:

def __init__(self, mdb, crawl_ops, org_ops, storage_ops):
self.pages = mdb["pages"]
self.crawls = mdb["crawls"]
self.crawl_ops = crawl_ops
self.org_ops = org_ops
self.storage_ops = storage_ops

async def add_crawl_pages_to_db_from_wacz(self, crawl_id: str):
async def add_crawl_pages_to_db_from_wacz(self, crawl_id: str, batch_size=100):
"""Add pages to database from WACZ files"""
pages_buffer: List[Page] = []
try:
crawl = await self.crawl_ops.get_crawl(crawl_id, None)
org = await self.org_ops.get_org_by_id(crawl.oid)
wacz_files = await self.crawl_ops.get_wacz_files(crawl_id, org)
stream = await self.storage_ops.sync_stream_pages_from_wacz(org, wacz_files)
stream = await self.storage_ops.sync_stream_wacz_pages(crawl.resources)
for page_dict in stream:
if not page_dict.get("url"):
continue

await self.add_page_to_db(page_dict, crawl_id, crawl.oid)
if len(pages_buffer) > batch_size:
await self._add_pages_to_db(pages_buffer)

pages_buffer.append(
self._get_page_from_dict(page_dict, crawl_id, crawl.oid)
)

# Add any remaining pages in buffer to db
if pages_buffer:
await self._add_pages_to_db(pages_buffer)

print(f"Added pages for crawl {crawl_id} to db", flush=True)
# pylint: disable=broad-exception-caught, raise-missing-from
except Exception as err:
traceback.print_exc()
print(f"Error adding pages for crawl {crawl_id} to db: {err}", flush=True)

async def add_page_to_db(self, page_dict: Dict[str, Any], crawl_id: str, oid: UUID):
"""Add page to database"""
def _get_page_from_dict(self, page_dict: Dict[str, Any], crawl_id: str, oid: UUID):
"""Return Page object from dict"""
page_id = page_dict.get("id")
if not page_id:
print(f'Page {page_dict.get("url")} has no id - assigning UUID', flush=True)
page_id = uuid4()

status = page_dict.get("status")
if not status and page_dict.get("loadState"):
status = 200

return Page(
id=page_id,
oid=oid,
crawl_id=crawl_id,
url=page_dict.get("url"),
title=page_dict.get("title"),
load_state=page_dict.get("loadState"),
status=status,
timestamp=(
from_k8s_date(page_dict.get("ts"))
if page_dict.get("ts")
else datetime.now()
),
)

async def _add_pages_to_db(self, pages: List[Page]):
"""Add batch of pages to db in one insert"""
result = await self.pages.insert_many(
[
page.to_dict(
exclude_unset=True, exclude_none=True, exclude_defaults=True
)
for page in pages
]
)
if not result.inserted_ids:
# pylint: disable=broad-exception-raised
raise Exception("No pages inserted")

async def add_page_to_db(self, page_dict: Dict[str, Any], crawl_id: str, oid: UUID):
"""Add page to database"""
page = self._get_page_from_dict(page_dict, crawl_id, oid)

try:
status = page_dict.get("status")
if not status and page_dict.get("loadState"):
status = 200
page = Page(
id=page_id,
oid=oid,
crawl_id=crawl_id,
url=page_dict.get("url"),
title=page_dict.get("title"),
load_state=page_dict.get("loadState"),
status=status,
timestamp=(
from_k8s_date(page_dict.get("ts"))
if page_dict.get("ts")
else datetime.now()
),
)
await self.pages.insert_one(
page.to_dict(
exclude_unset=True, exclude_none=True, exclude_defaults=True
Expand All @@ -97,7 +131,7 @@ async def add_page_to_db(self, page_dict: Dict[str, Any], crawl_id: str, oid: UU
# pylint: disable=broad-except
except Exception as err:
print(
f"Error adding page {page_id} from crawl {crawl_id} to db: {err}",
f"Error adding page {page.id} from crawl {crawl_id} to db: {err}",
flush=True,
)

Expand Down Expand Up @@ -360,6 +394,20 @@ async def list_pages(

return pages, total

async def re_add_crawl_pages(self, crawl_id: str, oid: UUID):
"""Delete existing pages for crawl and re-add from WACZs."""
await self.delete_crawl_pages(crawl_id, oid)
print(f"Deleted pages for crawl {crawl_id}", flush=True)
await self.add_crawl_pages_to_db_from_wacz(crawl_id)

async def re_add_all_crawl_pages(self, oid: UUID):
"""Re-add pages for all crawls in org"""
crawl_ids = await self.crawls.distinct(
"_id", {"type": "crawl", "finished": {"$ne": None}}
)
for crawl_id in crawl_ids:
await self.re_add_crawl_pages(crawl_id, oid)


# ============================================================================
# pylint: disable=too-many-arguments, too-many-locals, invalid-name, fixme
Expand All @@ -371,6 +419,25 @@ def init_pages_api(app, mdb, crawl_ops, org_ops, storage_ops, user_dep):

org_crawl_dep = org_ops.org_crawl_dep

@app.post("/orgs/{oid}/crawls/all/pages/reAdd", tags=["pages"])
async def re_add_all_crawl_pages(
org: Organization = Depends(org_crawl_dep), user: User = Depends(user_dep)
):
"""Re-add pages for all crawls in org (superuser only)"""
if not user.is_superuser:
raise HTTPException(status_code=403, detail="Not Allowed")

asyncio.create_task(ops.re_add_all_crawl_pages(org.id))
return {"started": True}

@app.post("/orgs/{oid}/crawls/{crawl_id}/pages/reAdd", tags=["pages"])
async def re_add_crawl_pages(
crawl_id: str, org: Organization = Depends(org_crawl_dep)
):
"""Re-add pages for crawl"""
asyncio.create_task(ops.re_add_crawl_pages(crawl_id, org.id))
return {"started": True}

@app.get(
"/orgs/{oid}/crawls/{crawl_id}/pages/{page_id}",
tags=["pages"],
Expand Down

0 comments on commit 21ae383

Please sign in to comment.