Skip to content

Commit 13e92ce

Browse files
committed
Complete pre-test draft
1 parent 8229199 commit 13e92ce

File tree

93 files changed

+1434
-1066
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

93 files changed

+1434
-1066
lines changed

ENV.md

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -57,28 +57,28 @@ Note that some tasks/subtasks are themselves enabled by other tasks.
5757

5858
### Scheduled Task Flags
5959

60-
| Flag | Description |
61-
|-------------------------------------|-------------------------------------------------------------------------------|
62-
| `SCHEDULED_TASKS_FLAG` | All scheduled tasks. Disabling disables all other scheduled tasks. |
63-
| `PUSH_TO_HUGGING_FACE_TASK_FLAG` | Pushes data to HuggingFace. |
64-
| `POPULATE_BACKLOG_SNAPSHOT_TASK_FLAG` | Populates the backlog snapshot. |
65-
| `DELETE_OLD_LOGS_TASK_FLAG` | Deletes old logs. |
66-
| `RUN_URL_TASKS_TASK_FLAG` | Runs URL tasks. |
67-
| `IA_PROBE_TASK_FLAG` | Extracts and links Internet Archives metadata to URLs. |
68-
| `IA_SAVE_TASK_FLAG` | Saves URLs to Internet Archives. |
69-
| `MARK_TASK_NEVER_COMPLETED_TASK_FLAG` | Marks tasks that were started but never completed (usually due to a restart). |
70-
| `DELETE_STALE_SCREENSHOTS_TASK_FLAG` | Deletes stale screenshots for URLs already validated. |
71-
| `TASK_CLEANUP_TASK_FLAG` | Cleans up tasks that are no longer needed. |
60+
| Flag | Description |
61+
|----------------------------------------|-------------------------------------------------------------------------------|
62+
| `SCHEDULED_TASKS_FLAG` | All scheduled tasks. Disabling disables all other scheduled tasks. |
63+
| `PUSH_TO_HUGGING_FACE_TASK_FLAG` | Pushes data to HuggingFace. |
64+
| `POPULATE_BACKLOG_SNAPSHOT_TASK_FLAG` | Populates the backlog snapshot. |
65+
| `DELETE_OLD_LOGS_TASK_FLAG` | Deletes old logs. |
66+
| `RUN_URL_TASKS_TASK_FLAG` | Runs URL tasks. |
67+
| `IA_PROBE_TASK_FLAG` | Extracts and links Internet Archives metadata to URLs. |
68+
| `IA_SAVE_TASK_FLAG` | Saves URLs to Internet Archives. |
69+
| `MARK_TASK_NEVER_COMPLETED_TASK_FLAG` | Marks tasks that were started but never completed (usually due to a restart). |
70+
| `DELETE_STALE_SCREENSHOTS_TASK_FLAG` | Deletes stale screenshots for URLs already validated. |
71+
| `TASK_CLEANUP_TASK_FLAG` | Cleans up tasks that are no longer needed. |
7272
| `REFRESH_MATERIALIZED_VIEWS_TASK_FLAG` | Refreshes materialized views. |
73-
| `DS_APP_SYNC_AGENCY_ADD_FLAG` | Adds new agencies to the Data Sources App|
74-
| `DS_APP_SYNC_AGENCY_UPDATE_FLAG` | Updates existing agencies in the Data Sources App|
75-
| `DS_APP_SYNC_AGENCY_DELETE_FLAG` | Deletes agencies in the Data Sources App|
76-
| `DS_APP_SYNC_DATA_SOURCE_ADD_FLAG` | Adds new data sources to the Data Sources App|
77-
| `DS_APP_SYNC_DATA_SOURCE_UPDATE_FLAG` | Updates existing data sources in the Data Sources App|
78-
| `DS_APP_SYNC_DATA_SOURCE_DELETE_FLAG` | Deletes data sources in the Data Sources App|
79-
| `DS_APP_SYNC_META_URL_ADD_FLAG` | Adds new meta URLs to the Data Sources App|
80-
| `DS_APP_SYNC_META_URL_UPDATE_FLAG` | Updates existing meta URLs in the Data Sources App|
81-
| `DS_APP_SYNC_META_URL_DELETE_FLAG` | Deletes meta URLs in the Data Sources App|
73+
| `DS_APP_SYNC_AGENCY_ADD_TASK_FLAG` | Adds new agencies to the Data Sources App|
74+
| `DS_APP_SYNC_AGENCY_UPDATE_TASK_FLAG` | Updates existing agencies in the Data Sources App|
75+
| `DS_APP_SYNC_AGENCY_DELETE_TASK_FLAG` | Deletes agencies in the Data Sources App|
76+
| `DS_APP_SYNC_DATA_SOURCE_ADD_TASK_FLAG` | Adds new data sources to the Data Sources App|
77+
| `DS_APP_SYNC_DATA_SOURCE_UPDATE_TASK_FLAG` | Updates existing data sources in the Data Sources App|
78+
| `DS_APP_SYNC_DATA_SOURCE_DELETE_TASK_FLAG` | Deletes data sources in the Data Sources App|
79+
| `DS_APP_SYNC_META_URL_ADD_TASK_FLAG` | Adds new meta URLs to the Data Sources App|
80+
| `DS_APP_SYNC_META_URL_UPDATE_TASK_FLAG` | Updates existing meta URLs in the Data Sources App|
81+
| `DS_APP_SYNC_META_URL_DELETE_TASK_FLAG` | Deletes meta URLs in the Data Sources App|
8282

8383
### URL Task Flags
8484

src/core/tasks/base/operator.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import traceback
22
from abc import ABC, abstractmethod
3+
from typing import Any
34

45
from src.core.enums import BatchStatus
56
from src.core.tasks.base.run_info import TaskOperatorRunInfo
@@ -9,6 +10,7 @@
910
from src.db.models.impl.task.enums import TaskStatus
1011
from src.db.models.impl.url.task_error.pydantic_.insert import URLTaskErrorPydantic
1112
from src.db.models.impl.url.task_error.pydantic_.small import URLTaskErrorSmall
13+
from src.db.queries.base.builder import QueryBuilderBase
1214

1315

1416
class TaskOperatorBase(ABC):
@@ -90,4 +92,8 @@ async def add_task_errors(
9092
)
9193
for error in errors
9294
]
93-
await self.adb_client.bulk_insert(inserts)
95+
await self.adb_client.bulk_insert(inserts)
96+
97+
# Convenience forwarder functions
98+
async def run_query_builder(self, query_builder: QueryBuilderBase) -> Any:
99+
return await self.adb_client.run_query_builder(query_builder)
Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,45 @@
1+
from src.core.tasks.scheduled.impl.sync_to_ds.impl.agencies.add.queries.add_links import \
2+
DSAppSyncAgenciesAddInsertLinksQueryBuilder
3+
from src.core.tasks.scheduled.impl.sync_to_ds.impl.agencies.add.queries.get import DSAppSyncAgenciesAddGetQueryBuilder
4+
from src.core.tasks.scheduled.impl.sync_to_ds.impl.agencies.add.queries.prereq import \
5+
DSAppSyncAgenciesAddPrerequisitesQueryBuilder
16
from src.core.tasks.scheduled.impl.sync_to_ds.templates.operator import DSSyncTaskOperatorBase
7+
from src.external.pdap.impl.sync.agencies.add.core import AddAgenciesRequestBuilder
8+
from src.external.pdap.impl.sync.agencies.add.request import AddAgenciesOuterRequest
9+
from src.external.pdap.impl.sync.shared.models.add.response import DSAppSyncAddResponseInnerModel
210

311

412
class DSAppSyncAgenciesAddTaskOperator(
513
DSSyncTaskOperatorBase
614
):
715

816
async def meets_task_prerequisites(self) -> bool:
9-
raise NotImplementedError
17+
return await self.run_query_builder(
18+
DSAppSyncAgenciesAddPrerequisitesQueryBuilder()
19+
)
1020

1121
async def inner_task_logic(self) -> None:
12-
raise NotImplementedError
22+
request: AddAgenciesOuterRequest = await self.get_request_input()
23+
responses: list[DSAppSyncAddResponseInnerModel] = await self.make_request(request)
24+
await self.insert_ds_app_links(responses)
25+
26+
async def get_request_input(self) -> AddAgenciesOuterRequest:
27+
return await self.run_query_builder(
28+
DSAppSyncAgenciesAddGetQueryBuilder()
29+
)
30+
31+
async def make_request(
32+
self,
33+
request: AddAgenciesOuterRequest
34+
) -> list[DSAppSyncAddResponseInnerModel]:
35+
return await self.pdap_client.run_request_builder(
36+
AddAgenciesRequestBuilder(request)
37+
)
38+
39+
async def insert_ds_app_links(
40+
self,
41+
responses: list[DSAppSyncAddResponseInnerModel]
42+
) -> None:
43+
await self.run_query_builder(
44+
DSAppSyncAgenciesAddInsertLinksQueryBuilder(responses)
45+
)
Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,26 @@
11
from sqlalchemy.ext.asyncio import AsyncSession
22

3+
from src.db.models.impl.agency.ds_link.sqlalchemy import DSAppLinkAgency
34
from src.db.queries.base.builder import QueryBuilderBase
4-
from src.external.pdap.impl.sync.shared.models.mapping import DSSyncIDMapping
5+
from src.external.pdap.impl.sync.shared.models.add.response import DSAppSyncAddResponseInnerModel
56

67

78
class DSAppSyncAgenciesAddInsertLinksQueryBuilder(QueryBuilderBase):
89

910
def __init__(
1011
self,
11-
mappings: list[DSSyncIDMapping]
12+
mappings: list[DSAppSyncAddResponseInnerModel]
1213
):
1314
super().__init__()
1415
self._mappings = mappings
1516

1617
async def run(self, session: AsyncSession) -> None:
17-
raise NotImplementedError
18+
inserts: list[DSAppLinkAgency] = []
19+
for mapping in self._mappings:
20+
inserts.append(
21+
DSAppLinkAgency(
22+
ds_agency_id=mapping.ds_app_id,
23+
agency_id=mapping.request_id,
24+
)
25+
)
26+
session.add_all(inserts)
Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,74 @@
1+
from typing import Sequence
2+
3+
from sqlalchemy import select, RowMapping, func
14
from sqlalchemy.ext.asyncio import AsyncSession
25

6+
from src.core.tasks.scheduled.impl.sync_to_ds.impl.agencies.add.queries.cte import \
7+
DSAppLinkSyncAgencyAddPrerequisitesCTEContainer
8+
from src.db.models.impl.agency.sqlalchemy import Agency
9+
from src.db.models.impl.link.agency_location.sqlalchemy import LinkAgencyLocation
310
from src.db.queries.base.builder import QueryBuilderBase
4-
from src.external.pdap.impl.sync.agencies.add.request import AddAgenciesOuterRequest
11+
from src.external.pdap.impl.sync.agencies._shared.models.content import AgencySyncContentModel
12+
from src.external.pdap.impl.sync.agencies.add.request import AddAgenciesOuterRequest, AddAgenciesInnerRequest
513

614

715
class DSAppSyncAgenciesAddGetQueryBuilder(QueryBuilderBase):
816

917
async def run(self, session: AsyncSession) -> AddAgenciesOuterRequest:
10-
raise NotImplementedError
18+
cte = DSAppLinkSyncAgencyAddPrerequisitesCTEContainer()
19+
20+
location_id_cte = (
21+
select(
22+
LinkAgencyLocation.agency_id,
23+
func.array_agg(LinkAgencyLocation.location_id).label("location_ids"),
24+
)
25+
.join(
26+
Agency,
27+
Agency.id == cte.agency_id,
28+
)
29+
.group_by(
30+
LinkAgencyLocation.agency_id,
31+
)
32+
.cte()
33+
)
34+
35+
query = (
36+
select(
37+
cte.agency_id,
38+
Agency.name,
39+
Agency.jurisdiction_type,
40+
Agency.agency_type,
41+
location_id_cte.c.location_ids,
42+
)
43+
.join(
44+
Agency,
45+
Agency.id == cte.agency_id,
46+
)
47+
.join(
48+
location_id_cte,
49+
location_id_cte.c.agency_id == cte.agency_id,
50+
)
51+
)
52+
53+
mappings: Sequence[RowMapping] = await self.sh.mappings(
54+
session=session,
55+
query=query,
56+
)
57+
58+
inner_requests: list[AddAgenciesInnerRequest] = []
59+
for mapping in mappings:
60+
inner_requests.append(
61+
AddAgenciesInnerRequest(
62+
request_id=mapping.agency_id,
63+
content=AgencySyncContentModel(
64+
name=mapping[Agency.name],
65+
jurisdiction_type=mapping[Agency.jurisdiction_type],
66+
agency_type=mapping[Agency.agency_type],
67+
location_ids=mapping["location_ids"]
68+
)
69+
)
70+
)
71+
72+
return AddAgenciesOuterRequest(
73+
agencies=inner_requests,
74+
)
Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,17 @@
1+
from sqlalchemy import select
12
from sqlalchemy.ext.asyncio import AsyncSession
23

4+
from src.core.tasks.scheduled.impl.sync_to_ds.impl.agencies.add.queries.cte import \
5+
DSAppLinkSyncAgencyAddPrerequisitesCTEContainer
36
from src.db.queries.base.builder import QueryBuilderBase
47

58

69
class DSAppSyncAgenciesAddPrerequisitesQueryBuilder(QueryBuilderBase):
710

811
async def run(self, session: AsyncSession) -> bool:
9-
raise NotImplementedError
12+
return await self.sh.results_exist(
13+
session=session,
14+
query=select(
15+
DSAppLinkSyncAgencyAddPrerequisitesCTEContainer().agency_id
16+
)
17+
)
Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,59 @@
1+
from src.core.tasks.scheduled.impl.sync_to_ds.impl.agencies.delete.queries.delete_flags import \
2+
DSAppSyncAgenciesDeleteRemoveFlagsQueryBuilder
3+
from src.core.tasks.scheduled.impl.sync_to_ds.impl.agencies.delete.queries.delete_links import \
4+
DSAppSyncAgenciesDeleteRemoveLinksQueryBuilder
5+
from src.core.tasks.scheduled.impl.sync_to_ds.impl.agencies.delete.queries.get import \
6+
DSAppSyncAgenciesDeleteGetQueryBuilder
7+
from src.core.tasks.scheduled.impl.sync_to_ds.impl.agencies.delete.queries.prereq import \
8+
DSAppSyncAgenciesDeletePrerequisitesQueryBuilder
19
from src.core.tasks.scheduled.impl.sync_to_ds.templates.operator import DSSyncTaskOperatorBase
10+
from src.external.pdap.impl.sync.agencies.delete.core import DeleteAgenciesRequestBuilder
211

312

413
class DSAppSyncAgenciesDeleteTaskOperator(
514
DSSyncTaskOperatorBase
615
):
716

817
async def meets_task_prerequisites(self) -> bool:
9-
raise NotImplementedError
18+
return await self.adb_client.run_query_builder(
19+
DSAppSyncAgenciesDeletePrerequisitesQueryBuilder()
20+
)
1021

1122
async def inner_task_logic(self) -> None:
12-
raise NotImplementedError
23+
ds_app_ids: list[int] = await self.get_inputs()
24+
await self.make_request(ds_app_ids)
25+
await self.delete_flags(ds_app_ids)
26+
await self.delete_links(ds_app_ids)
27+
28+
async def get_inputs(self) -> list[int]:
29+
return await self.adb_client.run_query_builder(
30+
DSAppSyncAgenciesDeleteGetQueryBuilder()
31+
)
32+
33+
async def make_request(
34+
self,
35+
ds_app_ids: list[int]
36+
) -> None:
37+
await self.pdap_client.run_request_builder(
38+
DeleteAgenciesRequestBuilder(ds_app_ids)
39+
)
40+
41+
async def delete_flags(
42+
self,
43+
ds_app_ids: list[int]
44+
) -> None:
45+
await self.run_query_builder(
46+
DSAppSyncAgenciesDeleteRemoveFlagsQueryBuilder(
47+
ds_agency_ids=ds_app_ids
48+
)
49+
)
50+
51+
async def delete_links(
52+
self,
53+
ds_app_ids: list[int]
54+
) -> None:
55+
await self.run_query_builder(
56+
DSAppSyncAgenciesDeleteRemoveLinksQueryBuilder(
57+
ds_agency_ids=ds_app_ids
58+
)
59+
)

src/core/tasks/scheduled/impl/sync_to_ds/impl/agencies/delete/queries/delete_flags.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
from sqlalchemy import delete
12
from sqlalchemy.ext.asyncio import AsyncSession
23

4+
from src.db.models.impl.flag.ds_delete.agency import FlagDSDeleteAgency
35
from src.db.queries.base.builder import QueryBuilderBase
46

57

@@ -13,4 +15,8 @@ def __init__(
1315
self._ds_agency_ids = ds_agency_ids
1416

1517
async def run(self, session: AsyncSession) -> None:
16-
raise NotImplementedError
18+
statement = (
19+
delete(FlagDSDeleteAgency)
20+
.where(FlagDSDeleteAgency.ds_agency_id.in_(self._ds_agency_ids))
21+
)
22+
await session.execute(statement)

src/core/tasks/scheduled/impl/sync_to_ds/impl/agencies/delete/queries/delete_links.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
from sqlalchemy import delete
12
from sqlalchemy.ext.asyncio import AsyncSession
23

4+
from src.db.models.impl.agency.ds_link.sqlalchemy import DSAppLinkAgency
35
from src.db.queries.base.builder import QueryBuilderBase
46

57

@@ -13,4 +15,8 @@ def __init__(
1315
self._ds_agency_ids = ds_agency_ids
1416

1517
async def run(self, session: AsyncSession) -> None:
16-
raise NotImplementedError
18+
statement = (
19+
delete(DSAppLinkAgency)
20+
.where(DSAppLinkAgency.ds_agency_id.in_(self._ds_agency_ids))
21+
)
22+
await session.execute(statement)
Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,28 @@
1+
from typing import Sequence
2+
3+
from sqlalchemy import select, RowMapping
14
from sqlalchemy.ext.asyncio import AsyncSession
25

6+
from src.core.tasks.scheduled.impl.sync_to_ds.impl.agencies.delete.queries.cte import \
7+
DSAppLinkSyncAgencyDeletePrerequisitesCTEContainer
38
from src.db.queries.base.builder import QueryBuilderBase
49

510

611
class DSAppSyncAgenciesDeleteGetQueryBuilder(QueryBuilderBase):
712

813
async def run(self, session: AsyncSession) -> list[int]:
914
"""Get DS App links to delete."""
10-
raise NotImplementedError
15+
cte = DSAppLinkSyncAgencyDeletePrerequisitesCTEContainer()
16+
17+
query = (
18+
select(
19+
cte.ds_agency_id,
20+
)
21+
)
22+
23+
mappings: Sequence[RowMapping] = await self.sh.mappings(
24+
session=session,
25+
query=query,
26+
)
27+
28+
return [mapping[cte.ds_agency_id] for mapping in mappings]

0 commit comments

Comments
 (0)