@@ -12,6 +12,7 @@ use etl::types::EventType;
1212use etl:: types:: PipelineId ;
1313use etl_telemetry:: tracing:: init_test_tracing;
1414use rand:: random;
15+ use std:: time:: Duration ;
1516
1617/// Tests that initial COPY replicates all rows from a partitioned table.
1718/// Only the parent table is tracked, not individual child partitions.
@@ -66,9 +67,14 @@ async fn partitioned_table_copy_replicates_existing_data() {
6667
6768 pipeline. start ( ) . await . unwrap ( ) ;
6869
69- parent_sync_done. notified ( ) . await ;
70+ tokio:: time:: timeout ( Duration :: from_secs ( 3 ) , parent_sync_done. notified ( ) )
71+ . await
72+ . unwrap ( ) ;
7073
71- let _ = pipeline. shutdown_and_wait ( ) . await ;
74+ tokio:: time:: timeout ( Duration :: from_secs ( 3 ) , pipeline. shutdown_and_wait ( ) )
75+ . await
76+ . unwrap ( )
77+ . unwrap ( ) ;
7278
7379 let table_rows = destination. get_table_rows ( ) . await ;
7480 let total_rows: usize = table_rows. values ( ) . map ( |rows| rows. len ( ) ) . sum ( ) ;
@@ -150,7 +156,9 @@ async fn partitioned_table_copy_and_streams_new_data_from_new_partition() {
150156
151157 pipeline. start ( ) . await . unwrap ( ) ;
152158
153- parent_sync_done. notified ( ) . await ;
159+ tokio:: time:: timeout ( Duration :: from_secs ( 3 ) , parent_sync_done. notified ( ) )
160+ . await
161+ . unwrap ( ) ;
154162
155163 let new_partition_name = format ! ( "{}_{}" , table_name. name, "p3" ) ;
156164 let new_partition_qualified_name = format ! ( "{}.{}" , table_name. schema, new_partition_name) ;
@@ -175,9 +183,14 @@ async fn partitioned_table_copy_and_streams_new_data_from_new_partition() {
175183 let inserts_notify = destination
176184 . wait_for_events_count ( vec ! [ ( EventType :: Insert , 1 ) ] )
177185 . await ;
178- inserts_notify. notified ( ) . await ;
186+ tokio:: time:: timeout ( Duration :: from_secs ( 3 ) , inserts_notify. notified ( ) )
187+ . await
188+ . unwrap ( ) ;
179189
180- let _ = pipeline. shutdown_and_wait ( ) . await ;
190+ tokio:: time:: timeout ( Duration :: from_secs ( 3 ) , pipeline. shutdown_and_wait ( ) )
191+ . await
192+ . unwrap ( )
193+ . unwrap ( ) ;
181194
182195 let table_rows = destination. get_table_rows ( ) . await ;
183196 let total_rows: usize = table_rows. values ( ) . map ( |rows| rows. len ( ) ) . sum ( ) ;
@@ -253,7 +266,9 @@ async fn partition_drop_does_not_emit_delete_or_truncate() {
253266 ) ;
254267
255268 pipeline. start ( ) . await . unwrap ( ) ;
256- parent_sync_done. notified ( ) . await ;
269+ tokio:: time:: timeout ( Duration :: from_secs ( 3 ) , parent_sync_done. notified ( ) )
270+ . await
271+ . unwrap ( ) ;
257272
258273 let events_before = destination. get_events ( ) . await ;
259274 let grouped_before = group_events_by_type_and_table_id ( & events_before) ;
@@ -282,7 +297,10 @@ async fn partition_drop_does_not_emit_delete_or_truncate() {
282297 . await
283298 . unwrap ( ) ;
284299
285- let _ = pipeline. shutdown_and_wait ( ) . await ;
300+ tokio:: time:: timeout ( Duration :: from_secs ( 3 ) , pipeline. shutdown_and_wait ( ) )
301+ . await
302+ . unwrap ( )
303+ . unwrap ( ) ;
286304
287305 let events_after = destination. get_events ( ) . await ;
288306 let grouped_after = group_events_by_type_and_table_id ( & events_after) ;
@@ -357,7 +375,9 @@ async fn partition_detach_with_explicit_publication_does_not_replicate_detached_
357375 ) ;
358376
359377 pipeline. start ( ) . await . unwrap ( ) ;
360- parent_sync_done. notified ( ) . await ;
378+ tokio:: time:: timeout ( Duration :: from_secs ( 3 ) , parent_sync_done. notified ( ) )
379+ . await
380+ . unwrap ( ) ;
361381
362382 // Verify initial sync copied both rows.
363383 let table_rows = destination. get_table_rows ( ) . await ;
@@ -404,9 +424,14 @@ async fn partition_detach_with_explicit_publication_does_not_replicate_detached_
404424 let inserts_notify = destination
405425 . wait_for_events_count ( vec ! [ ( EventType :: Insert , 1 ) ] )
406426 . await ;
407- inserts_notify. notified ( ) . await ;
427+ tokio:: time:: timeout ( Duration :: from_secs ( 3 ) , inserts_notify. notified ( ) )
428+ . await
429+ . unwrap ( ) ;
408430
409- let _ = pipeline. shutdown_and_wait ( ) . await ;
431+ tokio:: time:: timeout ( Duration :: from_secs ( 3 ) , pipeline. shutdown_and_wait ( ) )
432+ . await
433+ . unwrap ( )
434+ . unwrap ( ) ;
410435
411436 // Verify events
412437 let events = destination. get_events ( ) . await ;
@@ -487,7 +512,9 @@ async fn partition_detach_with_all_tables_publication_does_not_replicate_detache
487512 ) ;
488513
489514 pipeline. start ( ) . await . unwrap ( ) ;
490- parent_sync_done. notified ( ) . await ;
515+ tokio:: time:: timeout ( Duration :: from_secs ( 3 ) , parent_sync_done. notified ( ) )
516+ . await
517+ . unwrap ( ) ;
491518
492519 // Verify the initial state. The parent table is the only table tracked.
493520 let table_states_before = state_store. get_table_replication_states ( ) . await ;
@@ -559,7 +586,10 @@ async fn partition_detach_with_all_tables_publication_does_not_replicate_detache
559586 // without re-scanning for new tables. This is expected behavior, the table discovery
560587 // happens at pipeline start or explicit refresh.
561588
562- let _ = pipeline. shutdown_and_wait ( ) . await ;
589+ tokio:: time:: timeout ( Duration :: from_secs ( 3 ) , pipeline. shutdown_and_wait ( ) )
590+ . await
591+ . unwrap ( )
592+ . unwrap ( ) ;
563593
564594 // The pipeline state should still only track the parent table (not the detached partition)
565595 // because it hasn't re-scanned for new tables.
@@ -637,7 +667,9 @@ async fn partition_detach_with_all_tables_publication_does_replicate_detached_in
637667 ) ;
638668
639669 pipeline. start ( ) . await . unwrap ( ) ;
640- parent_sync_done. notified ( ) . await ;
670+ tokio:: time:: timeout ( Duration :: from_secs ( 3 ) , parent_sync_done. notified ( ) )
671+ . await
672+ . unwrap ( ) ;
641673
642674 // Verify the initial state. The parent table is the only table tracked.
643675 let table_states_before = state_store. get_table_replication_states ( ) . await ;
@@ -671,7 +703,10 @@ async fn partition_detach_with_all_tables_publication_does_replicate_detached_in
671703 . unwrap ( ) ;
672704
673705 // Shutdown the pipeline.
674- let _ = pipeline. shutdown_and_wait ( ) . await ;
706+ tokio:: time:: timeout ( Duration :: from_secs ( 3 ) , pipeline. shutdown_and_wait ( ) )
707+ . await
708+ . unwrap ( )
709+ . unwrap ( ) ;
675710
676711 // Restart the pipeline. It should now discover the detached partition as a new table.
677712 let detached_sync_done = state_store
@@ -689,9 +724,14 @@ async fn partition_detach_with_all_tables_publication_does_replicate_detached_in
689724 pipeline. start ( ) . await . unwrap ( ) ;
690725
691726 // Wait for the detached partition to be synced.
692- detached_sync_done. notified ( ) . await ;
727+ tokio:: time:: timeout ( Duration :: from_secs ( 3 ) , detached_sync_done. notified ( ) )
728+ . await
729+ . unwrap ( ) ;
693730
694- let _ = pipeline. shutdown_and_wait ( ) . await ;
731+ tokio:: time:: timeout ( Duration :: from_secs ( 3 ) , pipeline. shutdown_and_wait ( ) )
732+ . await
733+ . unwrap ( )
734+ . unwrap ( ) ;
695735
696736 // Verify the detached partition was discovered and synced.
697737 let table_states_after = state_store. get_table_replication_states ( ) . await ;
@@ -780,7 +820,9 @@ async fn partition_detach_with_schema_publication_does_not_replicate_detached_in
780820 ) ;
781821
782822 pipeline. start ( ) . await . unwrap ( ) ;
783- parent_sync_done. notified ( ) . await ;
823+ tokio:: time:: timeout ( Duration :: from_secs ( 3 ) , parent_sync_done. notified ( ) )
824+ . await
825+ . unwrap ( ) ;
784826
785827 // Verify initial state.
786828 let table_states_before = state_store. get_table_replication_states ( ) . await ;
@@ -844,9 +886,14 @@ async fn partition_detach_with_schema_publication_does_not_replicate_detached_in
844886 let inserts_notify = destination
845887 . wait_for_events_count ( vec ! [ ( EventType :: Insert , 1 ) ] )
846888 . await ;
847- inserts_notify. notified ( ) . await ;
889+ tokio:: time:: timeout ( Duration :: from_secs ( 3 ) , inserts_notify. notified ( ) )
890+ . await
891+ . unwrap ( ) ;
848892
849- let _ = pipeline. shutdown_and_wait ( ) . await ;
893+ tokio:: time:: timeout ( Duration :: from_secs ( 3 ) , pipeline. shutdown_and_wait ( ) )
894+ . await
895+ . unwrap ( )
896+ . unwrap ( ) ;
850897
851898 // The pipeline state should still only track the parent table.
852899 let table_states_after = state_store. get_table_replication_states ( ) . await ;
@@ -943,7 +990,9 @@ async fn partition_detach_with_schema_publication_does_replicate_detached_insert
943990 ) ;
944991
945992 pipeline. start ( ) . await . unwrap ( ) ;
946- parent_sync_done. notified ( ) . await ;
993+ tokio:: time:: timeout ( Duration :: from_secs ( 3 ) , parent_sync_done. notified ( ) )
994+ . await
995+ . unwrap ( ) ;
947996
948997 // Verify initial state.
949998 let table_states_before = state_store. get_table_replication_states ( ) . await ;
@@ -977,7 +1026,10 @@ async fn partition_detach_with_schema_publication_does_replicate_detached_insert
9771026 . unwrap ( ) ;
9781027
9791028 // Shutdown the pipeline.
980- let _ = pipeline. shutdown_and_wait ( ) . await ;
1029+ tokio:: time:: timeout ( Duration :: from_secs ( 3 ) , pipeline. shutdown_and_wait ( ) )
1030+ . await
1031+ . unwrap ( )
1032+ . unwrap ( ) ;
9811033
9821034 // Restart the pipeline. It should now discover the detached partition as a new table.
9831035 let detached_sync_done = state_store
@@ -995,9 +1047,14 @@ async fn partition_detach_with_schema_publication_does_replicate_detached_insert
9951047 pipeline. start ( ) . await . unwrap ( ) ;
9961048
9971049 // Wait for the detached partition to be synced.
998- detached_sync_done. notified ( ) . await ;
1050+ tokio:: time:: timeout ( Duration :: from_secs ( 3 ) , detached_sync_done. notified ( ) )
1051+ . await
1052+ . unwrap ( ) ;
9991053
1000- let _ = pipeline. shutdown_and_wait ( ) . await ;
1054+ tokio:: time:: timeout ( Duration :: from_secs ( 3 ) , pipeline. shutdown_and_wait ( ) )
1055+ . await
1056+ . unwrap ( )
1057+ . unwrap ( ) ;
10011058
10021059 // Verify the detached partition was discovered and synced.
10031060 let table_states_after = state_store. get_table_replication_states ( ) . await ;
0 commit comments