Skip to content

Commit 06c48a8

Browse files
authored
test(storage): mitigate integration test flakiness (#3571)
Reduce the number of buckets created by the integration tests, and be a little more patient during bucket creation.
1 parent f86e2dd commit 06c48a8

File tree

2 files changed

+127
-136
lines changed

2 files changed

+127
-136
lines changed

src/integration-tests/src/storage.rs

Lines changed: 119 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -307,14 +307,38 @@ pub async fn objects_large_file(builder: storage::builder::storage::ClientBuilde
307307
Ok(())
308308
}
309309

310-
pub async fn upload_buffered(builder: storage::builder::storage::ClientBuilder) -> Result<()> {
311-
// Create a temporary bucket for the test.
312-
let (control, bucket) = create_test_bucket().await?;
310+
pub async fn uploads(
311+
builder: storage::builder::storage::ClientBuilder,
312+
bucket_name: &str,
313+
) -> Result<()> {
314+
// Run all the upload tests in parallel, using the same bucket.
315+
// Creating a new bucket is rate-limited, and slow. Creating objects
316+
// is relatively cheap.
313317
let client = builder.build().await?;
318+
let pending: Vec<std::pin::Pin<Box<dyn Future<Output = Result<()>>>>> = vec![
319+
Box::pin(upload_buffered(&client, bucket_name)),
320+
Box::pin(upload_buffered_resumable_known_size(&client, bucket_name)),
321+
Box::pin(upload_buffered_resumable_unknown_size(&client, bucket_name)),
322+
Box::pin(upload_unbuffered_resumable_known_size(&client, bucket_name)),
323+
Box::pin(upload_unbuffered_resumable_unknown_size(
324+
&client,
325+
bucket_name,
326+
)),
327+
Box::pin(abort_upload_unbuffered(&client, bucket_name)),
328+
Box::pin(abort_upload_buffered(&client, bucket_name)),
329+
];
330+
let result: Result<Vec<_>> = futures::future::join_all(pending.into_iter())
331+
.await
332+
.into_iter()
333+
.collect();
334+
let _ = result?;
335+
Ok(())
336+
}
314337

338+
async fn upload_buffered(client: &storage::client::Storage, bucket_name: &str) -> Result<()> {
315339
tracing::info!("testing upload_object_buffered() [1]");
316340
let insert = client
317-
.write_object(&bucket.name, "empty.txt", "")
341+
.write_object(bucket_name, "upload_buffered/empty.txt", "")
318342
.set_if_generation_match(0)
319343
.send_buffered()
320344
.await?;
@@ -324,7 +348,7 @@ pub async fn upload_buffered(builder: storage::builder::storage::ClientBuilder)
324348
let payload = bytes::Bytes::from_owner(Vec::from_iter((0..128 * 1024).map(|_| 0_u8)));
325349
tracing::info!("testing upload_object_buffered() [2]");
326350
let insert = client
327-
.write_object(&bucket.name, "128K.txt", payload)
351+
.write_object(bucket_name, "upload_buffered/128K.txt", payload)
328352
.set_if_generation_match(0)
329353
.send_buffered()
330354
.await?;
@@ -334,29 +358,28 @@ pub async fn upload_buffered(builder: storage::builder::storage::ClientBuilder)
334358
let payload = bytes::Bytes::from_owner(Vec::from_iter((0..512 * 1024).map(|_| 0_u8)));
335359
tracing::info!("testing upload_object_buffered() [3]");
336360
let insert = client
337-
.write_object(&bucket.name, "512K.txt", payload)
361+
.write_object(bucket_name, "upload_buffered/512K.txt", payload)
338362
.set_if_generation_match(0)
339363
.send_buffered()
340364
.await?;
341365
tracing::info!("success with insert={insert:?}");
342366
assert_eq!(insert.size, 512 * 1024_i64);
343367

344-
cleanup_bucket(control, bucket.name).await?;
345-
346368
Ok(())
347369
}
348370

349-
pub async fn upload_buffered_resumable_known_size(
350-
builder: storage::builder::storage::ClientBuilder,
371+
async fn upload_buffered_resumable_known_size(
372+
client: &storage::client::Storage,
373+
bucket_name: &str,
351374
) -> Result<()> {
352-
// Create a temporary bucket for the test.
353-
let (control, bucket) = create_test_bucket().await?;
354-
let client = builder.build().await?;
355-
356375
tracing::info!("testing send_unbuffered() [1]");
357376
let payload = TestDataSource::new(0_u64);
358377
let insert = client
359-
.write_object(&bucket.name, "empty.txt", payload)
378+
.write_object(
379+
bucket_name,
380+
"upload_buffered_resumable_known_size/empty.txt",
381+
payload,
382+
)
360383
.set_if_generation_match(0)
361384
.with_resumable_upload_threshold(0_usize)
362385
.send_buffered()
@@ -367,7 +390,11 @@ pub async fn upload_buffered_resumable_known_size(
367390
let payload = TestDataSource::new(128 * 1024_u64);
368391
tracing::info!("testing upload_object_buffered() [2]");
369392
let insert = client
370-
.write_object(&bucket.name, "128K.txt", payload)
393+
.write_object(
394+
bucket_name,
395+
"upload_buffered_resumable_known_size/128K.txt",
396+
payload,
397+
)
371398
.set_if_generation_match(0)
372399
.with_resumable_upload_threshold(0_usize)
373400
.send_buffered()
@@ -378,30 +405,33 @@ pub async fn upload_buffered_resumable_known_size(
378405
let payload = TestDataSource::new(512 * 1024_u64);
379406
tracing::info!("testing upload_object_buffered() [3]");
380407
let insert = client
381-
.write_object(&bucket.name, "512K.txt", payload)
408+
.write_object(
409+
bucket_name,
410+
"upload_buffered_resumable_known_size/512K.txt",
411+
payload,
412+
)
382413
.set_if_generation_match(0)
383414
.with_resumable_upload_threshold(0_usize)
384415
.send_buffered()
385416
.await?;
386417
tracing::info!("success with insert={insert:?}");
387418
assert_eq!(insert.size, 512 * 1024_i64);
388419

389-
cleanup_bucket(control, bucket.name).await?;
390-
391420
Ok(())
392421
}
393422

394-
pub async fn upload_buffered_resumable_unknown_size(
395-
builder: storage::builder::storage::ClientBuilder,
423+
async fn upload_buffered_resumable_unknown_size(
424+
client: &storage::client::Storage,
425+
bucket_name: &str,
396426
) -> Result<()> {
397-
// Create a temporary bucket for the test.
398-
let (control, bucket) = create_test_bucket().await?;
399-
let client = builder.build().await?;
400-
401427
tracing::info!("testing send_unbuffered() [1]");
402428
let payload = TestDataSource::new(0_u64).without_size_hint();
403429
let insert = client
404-
.write_object(&bucket.name, "empty.txt", payload)
430+
.write_object(
431+
bucket_name,
432+
"upload_buffered_resumable_unknown_size/empty.txt",
433+
payload,
434+
)
405435
.set_if_generation_match(0)
406436
.with_resumable_upload_threshold(0_usize)
407437
.send_buffered()
@@ -412,7 +442,11 @@ pub async fn upload_buffered_resumable_unknown_size(
412442
let payload = TestDataSource::new(128 * 1024_u64).without_size_hint();
413443
tracing::info!("testing upload_object_buffered() [2]");
414444
let insert = client
415-
.write_object(&bucket.name, "128K.txt", payload)
445+
.write_object(
446+
bucket_name,
447+
"upload_buffered_resumable_unknown_size/128K.txt",
448+
payload,
449+
)
416450
.set_if_generation_match(0)
417451
.with_resumable_upload_threshold(0_usize)
418452
.send_buffered()
@@ -423,7 +457,11 @@ pub async fn upload_buffered_resumable_unknown_size(
423457
let payload = TestDataSource::new(512 * 1024_u64).without_size_hint();
424458
tracing::info!("testing upload_object_buffered() [3]");
425459
let insert = client
426-
.write_object(&bucket.name, "512K.txt", payload)
460+
.write_object(
461+
bucket_name,
462+
"upload_buffered_resumable_unknown_size/512K.txt",
463+
payload,
464+
)
427465
.set_if_generation_match(0)
428466
.with_resumable_upload_threshold(0_usize)
429467
.send_buffered()
@@ -434,30 +472,33 @@ pub async fn upload_buffered_resumable_unknown_size(
434472
let payload = TestDataSource::new(500 * 1024_u64).without_size_hint();
435473
tracing::info!("testing upload_object_buffered() [4]");
436474
let insert = client
437-
.write_object(&bucket.name, "500K.txt", payload)
475+
.write_object(
476+
bucket_name,
477+
"upload_buffered_resumable_unknown_size/500K.txt",
478+
payload,
479+
)
438480
.set_if_generation_match(0)
439481
.with_resumable_upload_threshold(0_usize)
440482
.send_buffered()
441483
.await?;
442484
tracing::info!("success with insert={insert:?}");
443485
assert_eq!(insert.size, 500 * 1024_i64);
444486

445-
cleanup_bucket(control, bucket.name).await?;
446-
447487
Ok(())
448488
}
449489

450-
pub async fn upload_unbuffered_resumable_known_size(
451-
builder: storage::builder::storage::ClientBuilder,
490+
async fn upload_unbuffered_resumable_known_size(
491+
client: &storage::client::Storage,
492+
bucket_name: &str,
452493
) -> Result<()> {
453-
// Create a temporary bucket for the test.
454-
let (control, bucket) = create_test_bucket().await?;
455-
let client = builder.build().await?;
456-
457494
tracing::info!("testing send_unbuffered() [1]");
458495
let payload = TestDataSource::new(0_u64);
459496
let insert = client
460-
.write_object(&bucket.name, "empty.txt", payload)
497+
.write_object(
498+
bucket_name,
499+
"upload_unbuffered_resumable_known_size/empty.txt",
500+
payload,
501+
)
461502
.set_if_generation_match(0)
462503
.with_resumable_upload_threshold(0_usize)
463504
.send_unbuffered()
@@ -468,7 +509,11 @@ pub async fn upload_unbuffered_resumable_known_size(
468509
let payload = TestDataSource::new(128 * 1024_u64);
469510
tracing::info!("testing upload_object_buffered() [2]");
470511
let insert = client
471-
.write_object(&bucket.name, "128K.txt", payload)
512+
.write_object(
513+
bucket_name,
514+
"upload_unbuffered_resumable_known_size/128K.txt",
515+
payload,
516+
)
472517
.set_if_generation_match(0)
473518
.with_resumable_upload_threshold(0_usize)
474519
.send_unbuffered()
@@ -479,30 +524,33 @@ pub async fn upload_unbuffered_resumable_known_size(
479524
let payload = TestDataSource::new(512 * 1024_u64);
480525
tracing::info!("testing upload_object_buffered() [3]");
481526
let insert = client
482-
.write_object(&bucket.name, "512K.txt", payload)
527+
.write_object(
528+
bucket_name,
529+
"upload_unbuffered_resumable_known_size/512K.txt",
530+
payload,
531+
)
483532
.set_if_generation_match(0)
484533
.with_resumable_upload_threshold(0_usize)
485534
.send_unbuffered()
486535
.await?;
487536
tracing::info!("success with insert={insert:?}");
488537
assert_eq!(insert.size, 512 * 1024_i64);
489538

490-
cleanup_bucket(control, bucket.name).await?;
491-
492539
Ok(())
493540
}
494541

495-
pub async fn upload_unbuffered_resumable_unknown_size(
496-
builder: storage::builder::storage::ClientBuilder,
542+
async fn upload_unbuffered_resumable_unknown_size(
543+
client: &storage::client::Storage,
544+
bucket_name: &str,
497545
) -> Result<()> {
498-
// Create a temporary bucket for the test.
499-
let (control, bucket) = create_test_bucket().await?;
500-
let client = builder.build().await?;
501-
502546
tracing::info!("testing send_unbuffered() [1]");
503547
let payload = TestDataSource::new(0_u64).without_size_hint();
504548
let insert = client
505-
.write_object(&bucket.name, "empty.txt", payload)
549+
.write_object(
550+
bucket_name,
551+
"upload_unbuffered_resumable_unknown_size/empty.txt",
552+
payload,
553+
)
506554
.set_if_generation_match(0)
507555
.with_resumable_upload_threshold(0_usize)
508556
.send_unbuffered()
@@ -513,7 +561,11 @@ pub async fn upload_unbuffered_resumable_unknown_size(
513561
let payload = TestDataSource::new(128 * 1024_u64).without_size_hint();
514562
tracing::info!("testing upload_object_buffered() [2]");
515563
let insert = client
516-
.write_object(&bucket.name, "128K.txt", payload)
564+
.write_object(
565+
bucket_name,
566+
"upload_unbuffered_resumable_unknown_size/128K.txt",
567+
payload,
568+
)
517569
.set_if_generation_match(0)
518570
.with_resumable_upload_threshold(0_usize)
519571
.send_unbuffered()
@@ -524,7 +576,11 @@ pub async fn upload_unbuffered_resumable_unknown_size(
524576
let payload = TestDataSource::new(512 * 1024_u64).without_size_hint();
525577
tracing::info!("testing upload_object_buffered() [3]");
526578
let insert = client
527-
.write_object(&bucket.name, "512K.txt", payload)
579+
.write_object(
580+
bucket_name,
581+
"upload_unbuffered_resumable_unknown_size/512K.txt",
582+
payload,
583+
)
528584
.set_if_generation_match(0)
529585
.with_resumable_upload_threshold(0_usize)
530586
.send_unbuffered()
@@ -535,36 +591,24 @@ pub async fn upload_unbuffered_resumable_unknown_size(
535591
let payload = TestDataSource::new(500 * 1024_u64).without_size_hint();
536592
tracing::info!("testing upload_object_buffered() [4]");
537593
let insert = client
538-
.write_object(&bucket.name, "500K.txt", payload)
594+
.write_object(
595+
bucket_name,
596+
"upload_unbuffered_resumable_unknown_size/500K.txt",
597+
payload,
598+
)
539599
.set_if_generation_match(0)
540600
.with_resumable_upload_threshold(0_usize)
541601
.send_unbuffered()
542602
.await?;
543603
tracing::info!("success with insert={insert:?}");
544604
assert_eq!(insert.size, 500 * 1024_i64);
545605

546-
cleanup_bucket(control, bucket.name).await?;
547-
548606
Ok(())
549607
}
550608

551609
const ABORT_TEST_STOP: u64 = 512 * 1024;
552610
const ABORT_TEST_SIZE: u64 = 1024 * 1024;
553611

554-
pub async fn abort_upload(
555-
builder: storage::builder::storage::ClientBuilder,
556-
bucket_name: &str,
557-
) -> Result<()> {
558-
tracing::info!("abort_upload test, using bucket {}", bucket_name);
559-
560-
// Create a temporary bucket for the test.
561-
let client = builder.build().await?;
562-
563-
abort_upload_unbuffered(client.clone(), bucket_name).await?;
564-
abort_upload_buffered(client.clone(), bucket_name).await?;
565-
Ok(())
566-
}
567-
568612
struct AbortUploadTestCase {
569613
name: String,
570614
upload: storage::builder::storage::WriteObject<TestDataSource>,
@@ -594,7 +638,7 @@ fn abort_upload_test_cases(
594638
let mut uploads = Vec::new();
595639
for s in sources.into_iter() {
596640
for t in thresholds {
597-
let name = format!("{prefix}-{}-{}.txt", s.0, t.0);
641+
let name = format!("abort-upload/{prefix}/{}-{}.txt", s.0, t.0);
598642
let upload = client
599643
.write_object(bucket_name, &name, s.1.clone())
600644
.set_if_generation_match(0)
@@ -606,10 +650,10 @@ fn abort_upload_test_cases(
606650
}
607651

608652
async fn abort_upload_unbuffered(
609-
client: storage::client::Storage,
653+
client: &storage::client::Storage,
610654
bucket_name: &str,
611655
) -> Result<()> {
612-
let test_cases = abort_upload_test_cases(&client, bucket_name, "unbuffered");
656+
let test_cases = abort_upload_test_cases(client, bucket_name, "unbuffered");
613657

614658
for (number, AbortUploadTestCase { name, upload }) in test_cases.into_iter().enumerate() {
615659
tracing::info!("[{number}] {name}");
@@ -632,8 +676,8 @@ async fn abort_upload_unbuffered(
632676
Ok(())
633677
}
634678

635-
async fn abort_upload_buffered(client: storage::client::Storage, bucket_name: &str) -> Result<()> {
636-
let test_cases = abort_upload_test_cases(&client, bucket_name, "buffered");
679+
async fn abort_upload_buffered(client: &storage::client::Storage, bucket_name: &str) -> Result<()> {
680+
let test_cases = abort_upload_test_cases(client, bucket_name, "buffered");
637681

638682
for (number, AbortUploadTestCase { name, upload }) in test_cases.into_iter().enumerate() {
639683
tracing::info!("[{number}] {name}");
@@ -896,8 +940,8 @@ pub async fn create_test_bucket() -> Result<(StorageControl, Bucket)> {
896940
)
897941
.with_retry_policy(
898942
gax::retry_policy::AlwaysRetry
899-
.with_attempt_limit(5)
900-
.with_time_limit(Duration::from_secs(16)),
943+
.with_attempt_limit(16)
944+
.with_time_limit(Duration::from_secs(32)),
901945
)
902946
.build()
903947
.await?;

0 commit comments

Comments
 (0)