
Commit c134b57

Optimize presigning for replay.json (#2516)
Fixes #2515. This PR introduces significantly optimized logic for presigning URLs for crawls and collections.

- For collections, the files needed from all crawls are looked up, and the presigned_urls table is merged in one pass, resulting in a unified iterator containing the files and the presign URLs for those files.
- For crawls, the presign URLs are likewise looked up once, and the same iterator is used for a single crawl with a passed-in list of CrawlFiles.
- URLs that are already signed are added to the return list.
- For any remaining URLs to be signed, a bulk presigning function is added, which shares an HTTP connection and signs 8 files in parallel (customizable via the Helm chart, though that may not be needed). This function is used to call the presigning API in parallel.
1 parent f1fd11c commit c134b57
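
Note: the storage-layer half of this change (the get_presigned_urls_bulk helper in storages.py and the Helm value controlling the parallelism limit) is not shown in the hunks below. The shape of the bulk-signing step described above can still be sketched roughly as follows; this is a minimal illustration, not the actual implementation, and presign_bulk / sign_one are hypothetical names standing in for the real storage_ops helper. The default of 8 mirrors the parallelism mentioned in the description.

import asyncio
from typing import Awaitable, Callable, List

# Illustrative sketch only: the real helper is StorageOps.get_presigned_urls_bulk();
# the names and signature below are assumptions for illustration.
async def presign_bulk(
    names: List[str],
    sign_one: Callable[[str], Awaitable[str]],
    max_parallel: int = 8,  # assumed Helm-configurable parallelism limit
) -> List[str]:
    """Presign many object keys over one shared client, keeping at most
    `max_parallel` signing requests in flight at a time."""
    sem = asyncio.Semaphore(max_parallel)

    async def sign_with_limit(name: str) -> str:
        async with sem:
            return await sign_one(name)

    # gather() preserves input order, so the results line up with `names`,
    # matching the zip(signed_urls, storage_group["files"]) in the diff below
    return await asyncio.gather(*(sign_with_limit(n) for n in names))

Sharing one client across all requests means the parallel signing reuses a single HTTP connection pool rather than opening a new connection per file.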


6 files changed (+282, -87 lines)


backend/btrixcloud/basecrawls.py

Lines changed: 129 additions & 16 deletions
@@ -1,7 +1,18 @@
 """base crawl type"""

 from datetime import datetime
-from typing import Optional, List, Union, Dict, Any, Type, TYPE_CHECKING, cast, Tuple
+from typing import (
+    Optional,
+    List,
+    Union,
+    Dict,
+    Any,
+    Type,
+    TYPE_CHECKING,
+    cast,
+    Tuple,
+    AsyncIterable,
+)
 from uuid import UUID
 import os
 import urllib.parse
@@ -76,6 +87,7 @@ def __init__(
         background_job_ops: BackgroundJobOps,
     ):
         self.crawls = mdb["crawls"]
+        self.presigned_urls = mdb["presigned_urls"]
         self.crawl_configs = crawl_configs
         self.user_manager = users
         self.orgs = orgs
@@ -464,29 +476,130 @@ async def resolve_signed_urls(
     ) -> List[CrawlFileOut]:
         """Regenerate presigned URLs for files as necessary"""
         if not files:
-            print("no files")
             return []

         out_files = []

-        for file_ in files:
-            presigned_url, expire_at = await self.storage_ops.get_presigned_url(
-                org, file_, force_update=force_update
+        cursor = self.presigned_urls.find(
+            {"_id": {"$in": [file.filename for file in files]}}
+        )
+
+        presigned = await cursor.to_list(10000)
+
+        files_dict = [file.dict() for file in files]
+
+        # need an async generator to call bulk_presigned_files
+        async def async_gen():
+            yield {"presigned": presigned, "files": files_dict, "_id": crawl_id}
+
+        out_files, _ = await self.bulk_presigned_files(async_gen(), org, force_update)
+
+        return out_files
+
+    async def get_presigned_files(
+        self, match: dict[str, Any], org: Organization
+    ) -> tuple[list[CrawlFileOut], bool]:
+        """return presigned crawl files queried as batch, merging presigns with files in one pass"""
+        cursor = self.crawls.aggregate(
+            [
+                {"$match": match},
+                {"$project": {"files": "$files", "version": 1}},
+                {
+                    "$lookup": {
+                        "from": "presigned_urls",
+                        "localField": "files.filename",
+                        "foreignField": "_id",
+                        "as": "presigned",
+                    }
+                },
+            ]
+        )
+
+        return await self.bulk_presigned_files(cursor, org)
+
+    async def bulk_presigned_files(
+        self,
+        cursor: AsyncIterable[dict[str, Any]],
+        org: Organization,
+        force_update=False,
+    ) -> tuple[list[CrawlFileOut], bool]:
+        """process presigned files in batches"""
+        resources = []
+        pages_optimized = False
+
+        sign_files = []
+
+        async for result in cursor:
+            pages_optimized = result.get("version") == 2
+
+            mapping = {}
+            # create mapping of filename -> file data
+            for file in result["files"]:
+                file["crawl_id"] = result["_id"]
+                mapping[file["filename"]] = file
+
+            if not force_update:
+                # add already presigned resources
+                for presigned in result["presigned"]:
+                    file = mapping.get(presigned["_id"])
+                    if file:
+                        file["signedAt"] = presigned["signedAt"]
+                        file["path"] = presigned["url"]
+                        resources.append(
+                            CrawlFileOut(
+                                name=os.path.basename(file["filename"]),
+                                path=presigned["url"],
+                                hash=file["hash"],
+                                size=file["size"],
+                                crawlId=file["crawl_id"],
+                                numReplicas=len(file.get("replicas") or []),
+                                expireAt=date_to_str(
+                                    presigned["signedAt"]
+                                    + self.storage_ops.signed_duration_delta
+                                ),
+                            )
+                        )
+
+                        del mapping[presigned["_id"]]
+
+            sign_files.extend(list(mapping.values()))
+
+        by_storage: dict[str, dict] = {}
+        for file in sign_files:
+            storage_ref = StorageRef(**file.get("storage"))
+            sid = str(storage_ref)
+
+            storage_group = by_storage.get(sid)
+            if not storage_group:
+                storage_group = {"ref": storage_ref, "names": [], "files": []}
+                by_storage[sid] = storage_group

+            storage_group["names"].append(file["filename"])
+            storage_group["files"].append(file)
+
+        for storage_group in by_storage.values():
+            s3storage = self.storage_ops.get_org_storage_by_ref(
+                org, storage_group["ref"]
             )

-            out_files.append(
-                CrawlFileOut(
-                    name=os.path.basename(file_.filename),
-                    path=presigned_url or "",
-                    hash=file_.hash,
-                    size=file_.size,
-                    crawlId=crawl_id,
-                    numReplicas=len(file_.replicas) if file_.replicas else 0,
-                    expireAt=date_to_str(expire_at),
-                )
+            signed_urls, expire_at = await self.storage_ops.get_presigned_urls_bulk(
+                org, s3storage, storage_group["names"]
             )

-        return out_files
+            for url, file in zip(signed_urls, storage_group["files"]):
+                resources.append(
+                    CrawlFileOut(
+                        name=os.path.basename(file["filename"]),
+                        path=url,
+                        hash=file["hash"],
+                        size=file["size"],
+                        crawlId=file["crawl_id"],
+                        numReplicas=len(file.get("replicas") or []),
+                        expireAt=date_to_str(expire_at),
+                    )
+                )
+
+        return resources, pages_optimized

     async def add_to_collection(
         self, crawl_ids: List[str], collection_id: UUID, org: Organization
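
Note: each item consumed by bulk_presigned_files, whether yielded by the aggregation in get_presigned_files or by the async_gen() wrapper in resolve_signed_urls, has roughly the shape below (inferred from the hunk above; all values are placeholders):

# Rough shape of one item consumed by bulk_presigned_files()
# (inferred from the aggregation above; all values are placeholders)
result = {
    "_id": "<crawl id>",
    "version": 2,  # version == 2 marks the crawl's pages as optimized
    "files": [
        {
            "filename": "<object key>.wacz",
            "hash": "<sha-256>",
            "size": 123456789,
            "replicas": [],
            "storage": {"name": "default"},  # kwargs for StorageRef(**file["storage"])
        },
    ],
    # joined from the presigned_urls collection; _id is the filename
    "presigned": [
        {"_id": "<object key>.wacz", "url": "<signed URL>", "signedAt": "<datetime>"},
    ],
}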

backend/btrixcloud/colls.py

Lines changed: 24 additions & 38 deletions
@@ -28,7 +28,6 @@
     UpdateColl,
     AddRemoveCrawlList,
     BaseCrawl,
-    CrawlOutWithResources,
     CrawlFileOut,
     Organization,
     PaginatedCollOutResponse,
@@ -40,6 +39,7 @@
     AddedResponse,
     DeletedResponse,
     CollectionSearchValuesResponse,
+    CollectionAllResponse,
     OrgPublicCollections,
     PublicOrgDetails,
     CollAccessType,
@@ -50,7 +50,12 @@
     MIN_UPLOAD_PART_SIZE,
     PublicCollOut,
 )
-from .utils import dt_now, slug_from_name, get_duplicate_key_error_field, get_origin
+from .utils import (
+    dt_now,
+    slug_from_name,
+    get_duplicate_key_error_field,
+    get_origin,
+)

 if TYPE_CHECKING:
     from .orgs import OrgOps
@@ -346,7 +351,7 @@ async def get_collection_out(
             result["resources"],
             crawl_ids,
             pages_optimized,
-        ) = await self.get_collection_crawl_resources(coll_id)
+        ) = await self.get_collection_crawl_resources(coll_id, org)

         initial_pages, _ = await self.page_ops.list_pages(
             crawl_ids=crawl_ids,
@@ -400,7 +405,9 @@ async def get_public_collection_out(
         if result.get("access") not in allowed_access:
             raise HTTPException(status_code=404, detail="collection_not_found")

-        result["resources"], _, _ = await self.get_collection_crawl_resources(coll_id)
+        result["resources"], _, _ = await self.get_collection_crawl_resources(
+            coll_id, org
+        )

         thumbnail = result.get("thumbnail")
         if thumbnail:
@@ -554,32 +561,24 @@ async def list_collections(

         return collections, total

+    # pylint: disable=too-many-locals
     async def get_collection_crawl_resources(
-        self, coll_id: UUID
+        self, coll_id: Optional[UUID], org: Organization
     ) -> tuple[List[CrawlFileOut], List[str], bool]:
         """Return pre-signed resources for all collection crawl files."""
-        # Ensure collection exists
-        _ = await self.get_collection_raw(coll_id)
+        match: dict[str, Any]

-        resources = []
-        pages_optimized = True
+        if coll_id:
+            crawl_ids = await self.get_collection_crawl_ids(coll_id)
+            match = {"_id": {"$in": crawl_ids}}
+        else:
+            crawl_ids = []
+            match = {"oid": org.id}

-        crawls, _ = await self.crawl_ops.list_all_base_crawls(
-            collection_id=coll_id,
-            states=list(SUCCESSFUL_STATES),
-            page_size=10_000,
-            cls_type=CrawlOutWithResources,
+        resources, pages_optimized = await self.crawl_ops.get_presigned_files(
+            match, org
         )

-        crawl_ids = []
-
-        for crawl in crawls:
-            crawl_ids.append(crawl.id)
-            if crawl.resources:
-                resources.extend(crawl.resources)
-            if crawl.version != 2:
-                pages_optimized = False
-
         return resources, crawl_ids, pages_optimized

     async def get_collection_names(self, uuids: List[UUID]):
@@ -1009,24 +1008,11 @@ async def list_collection_all(
     @app.get(
         "/orgs/{oid}/collections/$all",
         tags=["collections"],
-        response_model=Dict[str, List[CrawlFileOut]],
+        response_model=CollectionAllResponse,
     )
     async def get_collection_all(org: Organization = Depends(org_viewer_dep)):
         results = {}
-        try:
-            all_collections, _ = await colls.list_collections(org, page_size=10_000)
-            for collection in all_collections:
-                (
-                    results[collection.name],
-                    _,
-                    _,
-                ) = await colls.get_collection_crawl_resources(collection.id)
-        except Exception as exc:
-            # pylint: disable=raise-missing-from
-            raise HTTPException(
-                status_code=400, detail="Error Listing All Crawled Files: " + str(exc)
-            )
-
+        results["resources"] = await colls.get_collection_crawl_resources(None, org)
         return results

     @app.get(
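
The reworked helper now backs both the single-collection endpoints and the org-wide $all endpoint; a rough usage sketch, assuming an initialized CollectionOps instance named colls:

# Single collection: match on that collection's crawl ids
resources, crawl_ids, pages_optimized = await colls.get_collection_crawl_resources(
    coll_id, org
)

# Org-wide (used by the $all endpoint): match falls back to {"oid": org.id},
# and crawl_ids comes back empty
all_resources, _, _ = await colls.get_collection_crawl_resources(None, org)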

backend/btrixcloud/models.py

Lines changed: 7 additions & 0 deletions
@@ -1598,6 +1598,13 @@ class CollectionSearchValuesResponse(BaseModel):
     names: List[str]


+# ============================================================================
+class CollectionAllResponse(BaseModel):
+    """Response model for '$all' collection endpoint"""
+
+    resources: List[CrawlFileOut] = []
+
+
 # ============================================================================

 ### ORGS ###
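
With this model, a payload conforming to CollectionAllResponse for GET /orgs/{oid}/collections/$all carries a single resources list of CrawlFileOut entries instead of the previous per-collection-name mapping. An illustrative example, limited to the CrawlFileOut fields visible in the diffs above (all values are placeholders):

# Illustrative CollectionAllResponse payload (all values are placeholders)
{
    "resources": [
        {
            "name": "data.wacz",
            "path": "https://<bucket>/<key>?<signature>",
            "hash": "<sha-256>",
            "size": 123456789,
            "crawlId": "<crawl id>",
            "numReplicas": 0,
            "expireAt": "2025-01-01T00:00:00Z",
        },
    ],
}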

backend/btrixcloud/pages.py

Lines changed: 16 additions & 9 deletions
@@ -191,15 +191,22 @@ def _get_page_from_dict(

     async def _add_pages_to_db(self, crawl_id: str, pages: List[Page], ordered=True):
         """Add batch of pages to db in one insert"""
-        result = await self.pages.insert_many(
-            [
-                page.to_dict(
-                    exclude_unset=True, exclude_none=True, exclude_defaults=True
-                )
-                for page in pages
-            ],
-            ordered=ordered,
-        )
+        try:
+            result = await self.pages.insert_many(
+                [
+                    page.to_dict(
+                        exclude_unset=True, exclude_none=True, exclude_defaults=True
+                    )
+                    for page in pages
+                ],
+                ordered=ordered,
+            )
+        except pymongo.errors.BulkWriteError as bwe:
+            for err in bwe.details.get("writeErrors", []):
+                # ignorable duplicate key errors
+                if err.get("code") != 11000:
+                    raise
+
         if not result.inserted_ids:
             # pylint: disable=broad-exception-raised
             raise Exception("No pages inserted")
