Skip to content

Commit f5e3b11

Browse files
committed
Improve
1 parent 15c34f9 commit f5e3b11

File tree

1 file changed

+13
-39
lines changed

1 file changed

+13
-39
lines changed

etl/tests/pipeline_with_partitioned_table.rs

Lines changed: 13 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,7 @@ async fn partitioned_table_copy_replicates_existing_data() {
6767

6868
pipeline.start().await.unwrap();
6969

70-
tokio::time::timeout(Duration::from_secs(3), parent_sync_done.notified())
71-
.await
72-
.unwrap();
70+
let _ = tokio::time::timeout(Duration::from_secs(3), parent_sync_done.notified()).await;
7371

7472
tokio::time::timeout(Duration::from_secs(3), pipeline.shutdown_and_wait())
7573
.await
@@ -156,9 +154,7 @@ async fn partitioned_table_copy_and_streams_new_data_from_new_partition() {
156154

157155
pipeline.start().await.unwrap();
158156

159-
tokio::time::timeout(Duration::from_secs(3), parent_sync_done.notified())
160-
.await
161-
.unwrap();
157+
let _ = tokio::time::timeout(Duration::from_secs(3), parent_sync_done.notified()).await;
162158

163159
let new_partition_name = format!("{}_{}", table_name.name, "p3");
164160
let new_partition_qualified_name = format!("{}.{}", table_name.schema, new_partition_name);
@@ -183,9 +179,7 @@ async fn partitioned_table_copy_and_streams_new_data_from_new_partition() {
183179
let inserts_notify = destination
184180
.wait_for_events_count(vec![(EventType::Insert, 1)])
185181
.await;
186-
tokio::time::timeout(Duration::from_secs(3), inserts_notify.notified())
187-
.await
188-
.unwrap();
182+
let _ = tokio::time::timeout(Duration::from_secs(3), inserts_notify.notified()).await;
189183

190184
tokio::time::timeout(Duration::from_secs(3), pipeline.shutdown_and_wait())
191185
.await
@@ -266,9 +260,7 @@ async fn partition_drop_does_not_emit_delete_or_truncate() {
266260
);
267261

268262
pipeline.start().await.unwrap();
269-
tokio::time::timeout(Duration::from_secs(3), parent_sync_done.notified())
270-
.await
271-
.unwrap();
263+
let _ = tokio::time::timeout(Duration::from_secs(3), parent_sync_done.notified()).await;
272264

273265
let events_before = destination.get_events().await;
274266
let grouped_before = group_events_by_type_and_table_id(&events_before);
@@ -375,9 +367,7 @@ async fn partition_detach_with_explicit_publication_does_not_replicate_detached_
375367
);
376368

377369
pipeline.start().await.unwrap();
378-
tokio::time::timeout(Duration::from_secs(3), parent_sync_done.notified())
379-
.await
380-
.unwrap();
370+
let _ = tokio::time::timeout(Duration::from_secs(3), parent_sync_done.notified()).await;
381371

382372
// Verify initial sync copied both rows.
383373
let table_rows = destination.get_table_rows().await;
@@ -424,9 +414,7 @@ async fn partition_detach_with_explicit_publication_does_not_replicate_detached_
424414
let inserts_notify = destination
425415
.wait_for_events_count(vec![(EventType::Insert, 1)])
426416
.await;
427-
tokio::time::timeout(Duration::from_secs(3), inserts_notify.notified())
428-
.await
429-
.unwrap();
417+
let _ = tokio::time::timeout(Duration::from_secs(3), inserts_notify.notified()).await;
430418

431419
tokio::time::timeout(Duration::from_secs(3), pipeline.shutdown_and_wait())
432420
.await
@@ -512,9 +500,7 @@ async fn partition_detach_with_all_tables_publication_does_not_replicate_detache
512500
);
513501

514502
pipeline.start().await.unwrap();
515-
tokio::time::timeout(Duration::from_secs(3), parent_sync_done.notified())
516-
.await
517-
.unwrap();
503+
let _ = tokio::time::timeout(Duration::from_secs(3), parent_sync_done.notified()).await;
518504

519505
// Verify the initial state. The parent table is the only table tracked.
520506
let table_states_before = state_store.get_table_replication_states().await;
@@ -667,9 +653,7 @@ async fn partition_detach_with_all_tables_publication_does_replicate_detached_in
667653
);
668654

669655
pipeline.start().await.unwrap();
670-
tokio::time::timeout(Duration::from_secs(3), parent_sync_done.notified())
671-
.await
672-
.unwrap();
656+
let _ = tokio::time::timeout(Duration::from_secs(3), parent_sync_done.notified()).await;
673657

674658
// Verify the initial state. The parent table is the only table tracked.
675659
let table_states_before = state_store.get_table_replication_states().await;
@@ -724,9 +708,7 @@ async fn partition_detach_with_all_tables_publication_does_replicate_detached_in
724708
pipeline.start().await.unwrap();
725709

726710
// Wait for the detached partition to be synced.
727-
tokio::time::timeout(Duration::from_secs(3), detached_sync_done.notified())
728-
.await
729-
.unwrap();
711+
let _ = tokio::time::timeout(Duration::from_secs(3), detached_sync_done.notified()).await;
730712

731713
tokio::time::timeout(Duration::from_secs(3), pipeline.shutdown_and_wait())
732714
.await
@@ -820,9 +802,7 @@ async fn partition_detach_with_schema_publication_does_not_replicate_detached_in
820802
);
821803

822804
pipeline.start().await.unwrap();
823-
tokio::time::timeout(Duration::from_secs(3), parent_sync_done.notified())
824-
.await
825-
.unwrap();
805+
let _ = tokio::time::timeout(Duration::from_secs(3), parent_sync_done.notified()).await;
826806

827807
// Verify initial state.
828808
let table_states_before = state_store.get_table_replication_states().await;
@@ -886,9 +866,7 @@ async fn partition_detach_with_schema_publication_does_not_replicate_detached_in
886866
let inserts_notify = destination
887867
.wait_for_events_count(vec![(EventType::Insert, 1)])
888868
.await;
889-
tokio::time::timeout(Duration::from_secs(3), inserts_notify.notified())
890-
.await
891-
.unwrap();
869+
let _ = tokio::time::timeout(Duration::from_secs(3), inserts_notify.notified()).await;
892870

893871
tokio::time::timeout(Duration::from_secs(3), pipeline.shutdown_and_wait())
894872
.await
@@ -990,9 +968,7 @@ async fn partition_detach_with_schema_publication_does_replicate_detached_insert
990968
);
991969

992970
pipeline.start().await.unwrap();
993-
tokio::time::timeout(Duration::from_secs(3), parent_sync_done.notified())
994-
.await
995-
.unwrap();
971+
let _ = tokio::time::timeout(Duration::from_secs(3), parent_sync_done.notified()).await;
996972

997973
// Verify initial state.
998974
let table_states_before = state_store.get_table_replication_states().await;
@@ -1047,9 +1023,7 @@ async fn partition_detach_with_schema_publication_does_replicate_detached_insert
10471023
pipeline.start().await.unwrap();
10481024

10491025
// Wait for the detached partition to be synced.
1050-
tokio::time::timeout(Duration::from_secs(3), detached_sync_done.notified())
1051-
.await
1052-
.unwrap();
1026+
let _ = tokio::time::timeout(Duration::from_secs(3), detached_sync_done.notified()).await;
10531027

10541028
tokio::time::timeout(Duration::from_secs(3), pipeline.shutdown_and_wait())
10551029
.await

0 commit comments

Comments
 (0)