118 changes: 105 additions & 13 deletions backend/btrixcloud/colls.py
@@ -49,6 +49,7 @@
UserFilePreparer,
MIN_UPLOAD_PART_SIZE,
PublicCollOut,
ResourcesOnly,
)
from .utils import (
dt_now,
@@ -57,6 +58,8 @@
get_origin,
)

from .crawlmanager import CrawlManager

if TYPE_CHECKING:
from .orgs import OrgOps
from .storages import StorageOps
@@ -81,8 +84,16 @@ class CollectionOps:
event_webhook_ops: EventWebhookOps
crawl_ops: CrawlOps
page_ops: PageOps
crawl_manager: CrawlManager

def __init__(self, mdb, storage_ops, orgs, event_webhook_ops):
def __init__(
self,
mdb,
orgs: OrgOps,
storage_ops: StorageOps,
crawl_manager: CrawlManager,
event_webhook_ops: EventWebhookOps,
):
self.collections = mdb["collections"]
self.crawls = mdb["crawls"]
self.crawl_configs = mdb["crawl_configs"]
@@ -91,6 +102,7 @@ def __init__(self, mdb, storage_ops, orgs, event_webhook_ops):

self.orgs = orgs
self.storage_ops = storage_ops
self.crawl_manager = crawl_manager
self.event_webhook_ops = event_webhook_ops

def set_crawl_ops(self, ops):
@@ -141,11 +153,15 @@ async def add_collection(self, oid: UUID, coll_in: CollIn):
access=coll_in.access,
defaultThumbnailName=coll_in.defaultThumbnailName,
allowPublicDownload=coll_in.allowPublicDownload,
hasDedupeIndex=coll_in.hasDedupeIndex,
)
try:
await self.collections.insert_one(coll.to_dict())
org = await self.orgs.get_org_by_id(oid)
await self.clear_org_previous_slugs_matching_slug(slug, org)
# create collection index
if coll.hasDedupeIndex:
await self.crawl_manager.create_coll_index(coll)

if crawl_ids:
await self.crawl_ops.add_to_collection(crawl_ids, coll_id, org)
@@ -194,22 +210,33 @@ async def update_collection(
db_update["$push"] = {"previousSlugs": previous_slug}

try:
result = await self.collections.find_one_and_update(
prev_result = await self.collections.find_one_and_update(
{"_id": coll_id, "oid": org.id},
db_update,
return_document=pymongo.ReturnDocument.AFTER,
return_document=pymongo.ReturnDocument.BEFORE,
)
except pymongo.errors.DuplicateKeyError as err:
# pylint: disable=raise-missing-from
field = get_duplicate_key_error_field(err)
raise HTTPException(status_code=400, detail=f"collection_{field}_taken")

if not result:
if not prev_result:
raise HTTPException(status_code=404, detail="collection_not_found")

if slug_update:
await self.clear_org_previous_slugs_matching_slug(slug_update, org)

# if dedupe index is true, but was false
if update.hasDedupeIndex and not prev_result.get("hasDedupeIndex"):
# get latest coll, create index
coll = await self.get_collection(coll_id, org.id)
await self.crawl_manager.create_coll_index(coll)

# if dedupe is false, but was true
if update.hasDedupeIndex is False and prev_result.get("hasDedupeIndex"):
# delete index -- may need extra restrictions
await self.crawl_manager.delete_coll_index(coll_id)

return {"updated": True}

async def clear_org_previous_slugs_matching_slug(
@@ -221,6 +248,16 @@ async def clear_org_previous_slugs_matching_slug(
{"$pull": {"previousSlugs": slug}},
)

async def get_coll_dedupe_index(self, coll_id: UUID) -> bool:
"""return true/false if collection has dedupe index, or raise"""
result = await self.collections.find_one(
{"_id": coll_id}, projection=["hasDedupeIndex"]
)
if not result:
raise HTTPException(status_code=404, detail="collection_not_found")

return result.get("hasDedupeIndex") is True
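
A minimal caller sketch (hypothetical, not part of this PR) showing how the helper can gate dedupe-specific work; the function name and arguments are illustrative only:

from uuid import UUID

# hypothetical helper, assuming `ops` is an initialized CollectionOps
async def log_dedupe_status(ops: CollectionOps, coll_id: UUID) -> None:
    # raises HTTPException(404) if the collection does not exist
    if await ops.get_coll_dedupe_index(coll_id):
        print(f"collection {coll_id} deduplicates new crawls")
    else:
        print(f"collection {coll_id} has no dedupe index")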

async def add_crawls_to_collection(
self,
coll_id: UUID,
@@ -229,8 +266,6 @@ async def add_crawls_to_collection(
headers: Optional[dict] = None,
) -> CollOut:
"""Add crawls to collection"""
await self.crawl_ops.add_to_collection(crawl_ids, coll_id, org)

modified = dt_now()
result = await self.collections.find_one_and_update(
{"_id": coll_id},
@@ -240,8 +275,11 @@
if not result:
raise HTTPException(status_code=404, detail="collection_not_found")

# do this after checking if collection exists
await self.crawl_ops.add_to_collection(crawl_ids, coll_id, org)

await self.update_collection_counts_and_tags(coll_id)
await self.update_collection_dates(coll_id, org.id)
await self.update_collection_dates(coll_id, org.id, update_index=True)

asyncio.create_task(
self.event_webhook_ops.create_added_to_collection_notification(
@@ -270,7 +308,7 @@ async def remove_crawls_from_collection(
raise HTTPException(status_code=404, detail="collection_not_found")

await self.update_collection_counts_and_tags(coll_id)
await self.update_collection_dates(coll_id, org.id)
await self.update_collection_dates(coll_id, org.id, update_index=True)

asyncio.create_task(
self.event_webhook_ops.create_removed_from_collection_notification(
@@ -294,6 +332,24 @@ async def get_collection_raw(

return result

async def enable_dedupe_index(self, coll_id: UUID):
"""enable dedupe index if it doesn't exist yet"""
result = await self.collections.find_one_and_update(
{"_id": coll_id, "hasDedupeIndex": {"$ne": True}},
{"$set": {"hasDedupeIndex": True}},
return_document=pymongo.ReturnDocument.AFTER,
)

# not changed, nothing to do
if not result:
return False

coll = Collection.from_dict(result)

await self.crawl_manager.create_coll_index(coll)

return True
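
The `{"$ne": True}` filter makes enabling effectively idempotent: once `hasDedupeIndex` is set, later calls match no document and `create_coll_index` is skipped. A rough usage sketch (hypothetical caller, not part of this PR):

from uuid import UUID

# assumes `ops` is an initialized CollectionOps
async def ensure_dedupe_index(ops: CollectionOps, coll_id: UUID) -> None:
    created = await ops.enable_dedupe_index(coll_id)
    if created:
        print(f"dedupe index created for collection {coll_id}")
    else:
        # collection missing or index already enabled -- nothing was changed
        print(f"no change for collection {coll_id}")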

async def get_collection_raw_by_slug(
self,
coll_slug: str,
@@ -396,6 +452,16 @@ async def get_collection_out(

return CollOut.from_dict(result)

async def get_internal_replay_list(self, coll_id: UUID, oid: UUID) -> ResourcesOnly:
"""get list of internally resolved signed WACZ files"""
org = await self.orgs.get_org_by_id(oid)
resources, _, _ = await self.get_collection_crawl_resources(coll_id, org)

for file_ in resources:
file_.path = self.storage_ops.resolve_internal_access_path(file_.path)

return ResourcesOnly(resources=resources)
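
A short sketch (hypothetical, not part of this PR) of consuming the internally resolved resource list, e.g. from another backend service that needs direct in-cluster access to the signed WACZ files:

from uuid import UUID

# assumes `ops` is an initialized CollectionOps
async def collect_wacz_paths(ops: CollectionOps, coll_id: UUID, oid: UUID) -> list[str]:
    replay_list = await ops.get_internal_replay_list(coll_id, oid)
    # each path has already been rewritten by resolve_internal_access_path
    return [resource.path for resource in replay_list.resources]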

async def get_public_collection_out(
self,
coll_id: UUID,
@@ -471,6 +537,7 @@ async def list_collections(
sort_direction: int = 1,
name: Optional[str] = None,
name_prefix: Optional[str] = None,
has_dedupe_index: Optional[bool] = None,
access: Optional[str] = None,
headers: Optional[dict] = None,
):
@@ -488,6 +555,9 @@
regex_pattern = f"^{name_prefix}"
match_query["name"] = {"$regex": regex_pattern, "$options": "i"}

if has_dedupe_index is not None:
match_query["hasDedupeIndex"] = has_dedupe_index

if public_colls_out:
match_query["access"] = CollAccessType.PUBLIC
elif access:
@@ -639,6 +709,9 @@ async def delete_collection(self, coll_id: UUID, org: Organization):
if coll.thumbnail:
await self.delete_thumbnail(coll_id, org)

if coll.hasDedupeIndex:
await self.crawl_manager.delete_coll_index(coll.id)

result = await self.collections.delete_one({"_id": coll_id, "oid": org.id})
if result.deleted_count < 1:
raise HTTPException(status_code=404, detail="collection_not_found")
@@ -740,7 +813,9 @@ async def update_collection_counts_and_tags(self, collection_id: UUID):
},
)

async def update_collection_dates(self, coll_id: UUID, oid: UUID):
async def update_collection_dates(
self, coll_id: UUID, oid: UUID, update_index=False
):
"""Update collection earliest and latest dates from page timestamps"""
# pylint: disable=too-many-locals
coll = await self.get_collection(coll_id, oid)
@@ -749,6 +824,10 @@ async def update_collection_dates(self, coll_id: UUID, oid: UUID):
earliest_ts = None
latest_ts = None

# if update_index is set, update the dedupe index if it exists
if update_index and coll.hasDedupeIndex:
await self.crawl_manager.update_coll_index(coll_id)

match_query = {
"oid": coll.oid,
"crawl_id": {"$in": crawl_ids},
@@ -783,13 +862,16 @@

async def update_crawl_collections(self, crawl_id: str, oid: UUID):
"""Update counts, dates, and modified for all collections in crawl"""
# accessing directly to handle both crawls and uploads
crawl = await self.crawls.find_one({"_id": crawl_id})
crawl_coll_ids = crawl.get("collectionIds")
crawl_coll_ids = crawl.get("collectionIds") or []
modified = dt_now()

for coll_id in crawl_coll_ids:
await self.update_collection_counts_and_tags(coll_id)
await self.update_collection_dates(coll_id, oid)
await self.update_collection_dates(
coll_id, oid, crawl.get("dedupeCollId") != coll_id
)
await self.collections.find_one_and_update(
{"_id": coll_id},
{"$set": {"modified": modified}},
@@ -1000,12 +1082,20 @@ async def calculate_thumbnail_storage(self, oid: UUID) -> int:
# ============================================================================
# pylint: disable=too-many-locals
def init_collections_api(
app, mdb, orgs, storage_ops, event_webhook_ops, user_dep
app,
mdb,
orgs: OrgOps,
storage_ops: StorageOps,
crawl_manager: CrawlManager,
event_webhook_ops: EventWebhookOps,
user_dep,
) -> CollectionOps:
"""init collections api"""
# pylint: disable=invalid-name, unused-argument, too-many-arguments

colls: CollectionOps = CollectionOps(mdb, storage_ops, orgs, event_webhook_ops)
colls: CollectionOps = CollectionOps(
mdb, orgs, storage_ops, crawl_manager, event_webhook_ops
)

org_crawl_dep = orgs.org_crawl_dep
org_viewer_dep = orgs.org_viewer_dep
@@ -1035,6 +1125,7 @@ async def list_collection_all(
sortDirection: int = 1,
name: Optional[str] = None,
namePrefix: Optional[str] = None,
hasDedupeIndex: Optional[bool] = None,
access: Optional[str] = None,
):
# pylint: disable=duplicate-code
Expand All @@ -1046,6 +1137,7 @@ async def list_collection_all(
sort_direction=sortDirection,
name=name,
name_prefix=namePrefix,
has_dedupe_index=hasDedupeIndex,
access=access,
headers=dict(request.headers),
)