File tree Expand file tree Collapse file tree 12 files changed +105
-5
lines changed
scheduled/impl/sync_to_ds/impl Expand file tree Collapse file tree 12 files changed +105
-5
lines changed Original file line number Diff line number Diff line change 1+ """Add task log
2+
3+ Revision ID: 88ac26c3b025
4+ Revises: de0305465e2c
5+ Create Date: 2025-11-16 11:30:25.742630
6+
7+ """
8+ from typing import Sequence , Union
9+
10+ from alembic import op
11+ import sqlalchemy as sa
12+
13+ from src .util .alembic_helpers import task_id_column , created_at_column
14+
15+ # revision identifiers, used by Alembic.
16+ revision : str = '88ac26c3b025'
17+ down_revision : Union [str , None ] = 'de0305465e2c'
18+ branch_labels : Union [str , Sequence [str ], None ] = None
19+ depends_on : Union [str , Sequence [str ], None ] = None
20+
21+
22+ def upgrade () -> None :
23+ op .create_table (
24+ "tasks__log" ,
25+ task_id_column (),
26+ sa .Column (
27+ "log" ,
28+ sa .Text ,
29+ nullable = False ,
30+ ),
31+ created_at_column (),
32+ sa .PrimaryKeyConstraint ("task_id" ),
33+ )
34+
35+
36+ def downgrade () -> None :
37+ pass
Original file line number Diff line number Diff line change 88from src .db .client .async_ import AsyncDatabaseClient
99from src .db .enums import TaskType
1010from src .db .models .impl .task .enums import TaskStatus
11+ from src .db .models .impl .task .log import TaskLog
1112from src .db .models .impl .url .task_error .pydantic_ .insert import URLTaskErrorPydantic
1213from src .db .models .impl .url .task_error .pydantic_ .small import URLTaskErrorSmall
1314from src .db .queries .base .builder import QueryBuilderBase
@@ -94,6 +95,16 @@ async def add_task_errors(
9495 ]
9596 await self .adb_client .bulk_insert (inserts )
9697
98+ async def add_task_log (
99+ self ,
100+ log : str
101+ ) -> None :
102+ task_log = TaskLog (
103+ task_id = self .task_id ,
104+ log = log
105+ )
106+ await self .adb_client .add (task_log )
107+
97108 # Convenience forwarder functions
98109 async def run_query_builder (self , query_builder : QueryBuilderBase ) -> Any :
99110 return await self .adb_client .run_query_builder (query_builder )
Original file line number Diff line number Diff line change @@ -25,6 +25,8 @@ async def meets_task_prerequisites(self) -> bool:
2525
2626 async def inner_task_logic (self ) -> None :
2727 request : AddAgenciesOuterRequest = await self .get_request_input ()
28+ db_ids : list [int ] = [r .request_id for r in request .agencies ]
29+ await self .add_task_log (f"Adding agencies with the following db_ids: { db_ids } " )
2830 responses : list [DSAppSyncAddResponseInnerModel ] = await self .make_request (request )
2931 await self .insert_ds_app_links (responses )
3032
Original file line number Diff line number Diff line change @@ -26,10 +26,14 @@ async def meets_task_prerequisites(self) -> bool:
2626
2727 async def inner_task_logic (self ) -> None :
2828 ds_app_ids : list [int ] = await self .get_inputs ()
29+ await self .log_ds_app_ids (ds_app_ids )
2930 await self .make_request (ds_app_ids )
3031 await self .delete_flags (ds_app_ids )
3132 await self .delete_links (ds_app_ids )
3233
34+ async def log_ds_app_ids (self , ds_app_ids : list [int ]):
35+ await self .add_task_log (f"Deleting agencies with the following ds_app_ids: { ds_app_ids } " )
36+
3337 async def get_inputs (self ) -> list [int ]:
3438 return await self .adb_client .run_query_builder (
3539 DSAppSyncAgenciesDeleteGetQueryBuilder ()
Original file line number Diff line number Diff line change @@ -25,13 +25,17 @@ async def meets_task_prerequisites(self) -> bool:
2525
2626 async def inner_task_logic (self ) -> None :
2727 request : UpdateAgenciesOuterRequest = await self .get_inputs ()
28- await self .make_request (request )
2928 ds_app_ids : list [int ] = [
3029 agency .app_id
3130 for agency in request .agencies
3231 ]
32+ await self .log_ds_app_ids (ds_app_ids )
33+ await self .make_request (request )
3334 await self .update_links (ds_app_ids )
3435
36+ async def log_ds_app_ids (self , ds_app_ids : list [int ]):
37+ await self .add_task_log (f"Updating agencies with the following ds_app_ids: { ds_app_ids } " )
38+
3539 async def get_inputs (self ) -> UpdateAgenciesOuterRequest :
3640 return await self .adb_client .run_query_builder (
3741 DSAppSyncAgenciesUpdateGetQueryBuilder ()
Original file line number Diff line number Diff line change 77from src .core .tasks .scheduled .impl .sync_to_ds .templates .operator import DSSyncTaskOperatorBase
88from src .db .enums import TaskType
99from src .external .pdap .impl .sync .data_sources .add .core import AddDataSourcesRequestBuilder
10- from src .external .pdap .impl .sync .data_sources .add .request import AddDataSourcesOuterRequest
10+ from src .external .pdap .impl .sync .data_sources .add .request import AddDataSourcesOuterRequest , AddDataSourcesInnerRequest
1111from src .external .pdap .impl .sync .shared .models .add .response import DSAppSyncAddResponseInnerModel
1212
1313
@@ -27,9 +27,13 @@ async def meets_task_prerequisites(self) -> bool:
2727
2828 async def inner_task_logic (self ) -> None :
2929 request : AddDataSourcesOuterRequest = await self .get_request_input ()
30+ await self .log_db_ids (request .data_sources )
3031 responses : list [DSAppSyncAddResponseInnerModel ] = await self .make_request (request )
3132 await self .insert_ds_app_links (responses )
3233
34+ async def log_db_ids (self , data_sources : list [AddDataSourcesInnerRequest ]):
35+ db_ids : list [int ] = [d .request_id for d in data_sources ]
36+ await self .add_task_log (f"Adding data sources with the following db_ids: { db_ids } " )
3337
3438 async def get_request_input (self ) -> AddDataSourcesOuterRequest :
3539 return await self .run_query_builder (
Original file line number Diff line number Diff line change @@ -26,10 +26,14 @@ async def meets_task_prerequisites(self) -> bool:
2626
2727 async def inner_task_logic (self ) -> None :
2828 ds_app_ids : list [int ] = await self .get_inputs ()
29+ await self .log_ds_app_ids (ds_app_ids )
2930 await self .make_request (ds_app_ids )
3031 await self .delete_flags (ds_app_ids )
3132 await self .delete_links (ds_app_ids )
3233
34+ async def log_ds_app_ids (self , ds_app_ids : list [int ]):
35+ await self .add_task_log (f"Deleting data sources with the following ds_app_ids: { ds_app_ids } " )
36+
3337 async def get_inputs (self ) -> list [int ]:
3438 return await self .run_query_builder (
3539 DSAppSyncDataSourcesDeleteGetQueryBuilder ()
Original file line number Diff line number Diff line change @@ -25,13 +25,17 @@ async def meets_task_prerequisites(self) -> bool:
2525
2626 async def inner_task_logic (self ) -> None :
2727 request : UpdateDataSourcesOuterRequest = await self .get_inputs ()
28- await self .make_request (request )
2928 ds_app_ids : list [int ] = [
3029 ds .app_id
3130 for ds in request .data_sources
3231 ]
32+ await self .log_ds_app_ids (ds_app_ids )
33+ await self .make_request (request )
3334 await self .update_links (ds_app_ids )
3435
36+ async def log_ds_app_ids (self , ds_app_ids : list [int ]):
37+ await self .add_task_log (f"Updating data sources with the following ds_app_ids: { ds_app_ids } " )
38+
3539 async def get_inputs (self ) -> UpdateDataSourcesOuterRequest :
3640 return await self .adb_client .run_query_builder (
3741 DSAppSyncDataSourcesUpdateGetQueryBuilder ()
Original file line number Diff line number Diff line change 66from src .core .tasks .scheduled .impl .sync_to_ds .templates .operator import DSSyncTaskOperatorBase
77from src .db .enums import TaskType
88from src .external .pdap .impl .sync .meta_urls .add .core import AddMetaURLsRequestBuilder
9- from src .external .pdap .impl .sync .meta_urls .add .request import AddMetaURLsOuterRequest
9+ from src .external .pdap .impl .sync .meta_urls .add .request import AddMetaURLsOuterRequest , AddMetaURLsInnerRequest
1010from src .external .pdap .impl .sync .shared .models .add .response import DSAppSyncAddResponseInnerModel
1111
1212
@@ -25,9 +25,14 @@ async def meets_task_prerequisites(self) -> bool:
2525
2626 async def inner_task_logic (self ) -> None :
2727 request : AddMetaURLsOuterRequest = await self .get_request_input ()
28+ await self .log_db_ids (request .meta_urls )
2829 responses : list [DSAppSyncAddResponseInnerModel ] = await self .make_request (request )
2930 await self .insert_ds_app_links (responses )
3031
32+ async def log_db_ids (self , meta_urls : list [AddMetaURLsInnerRequest ]):
33+ db_ids : list [int ] = [m .request_id for m in meta_urls ]
34+ await self .add_task_log (f"Adding meta urls with the following db_ids: { db_ids } " )
35+
3136 async def get_request_input (self ) -> AddMetaURLsOuterRequest :
3237 return await self .run_query_builder (
3338 DSAppSyncMetaURLsAddGetQueryBuilder ()
Original file line number Diff line number Diff line change @@ -26,10 +26,14 @@ async def meets_task_prerequisites(self) -> bool:
2626
2727 async def inner_task_logic (self ) -> None :
2828 ds_app_ids : list [int ] = await self .get_inputs ()
29+ await self .log_ds_app_ids (ds_app_ids )
2930 await self .make_request (ds_app_ids )
3031 await self .delete_flags (ds_app_ids )
3132 await self .delete_links (ds_app_ids )
3233
34+ async def log_ds_app_ids (self , ds_app_ids : list [int ]):
35+ await self .add_task_log (f"Deleting meta urls with the following ds_app_ids: { ds_app_ids } " )
36+
3337 async def get_inputs (self ) -> list [int ]:
3438 return await self .run_query_builder (
3539 DSAppSyncMetaURLsDeleteGetQueryBuilder ()
You can’t perform that action at this time.
0 commit comments