@@ -18,7 +18,6 @@ use std::time::{Duration, Instant};
18
18
use anyhow:: { anyhow, bail} ;
19
19
use differential_dataflow:: difference:: Semigroup ;
20
20
use differential_dataflow:: lattice:: Lattice ;
21
- use futures:: pin_mut;
22
21
use futures_util:: { StreamExt , TryStreamExt , stream} ;
23
22
use mz_dyncfg:: { Config , ConfigSet } ;
24
23
use mz_ore:: metrics:: MetricsRegistry ;
@@ -37,11 +36,10 @@ use crate::cache::StateCache;
37
36
use crate :: cfg:: { COMPACTION_MEMORY_BOUND_BYTES , all_dyncfgs} ;
38
37
use crate :: cli:: args:: { StateArgs , StoreArgs , make_blob, make_consensus} ;
39
38
use crate :: cli:: inspect:: FAKE_OPAQUE_CODEC ;
40
- use crate :: internal:: compact:: { CompactConfig , CompactReq , CompactRes , Compactor } ;
39
+ use crate :: internal:: compact:: { CompactConfig , CompactReq , Compactor } ;
41
40
use crate :: internal:: encoding:: Schemas ;
42
41
use crate :: internal:: gc:: { GarbageCollector , GcReq } ;
43
42
use crate :: internal:: machine:: Machine ;
44
- use crate :: internal:: state:: HollowBatch ;
45
43
use crate :: internal:: trace:: FueledMergeRes ;
46
44
use crate :: rpc:: { NoopPubSubSender , PubSubSender } ;
47
45
use crate :: write:: { WriteHandle , WriterId } ;
@@ -491,44 +489,16 @@ where
491
489
val : Arc :: clone ( & val_schema) ,
492
490
} ;
493
491
494
- let stream = Compactor :: < K , V , T , D > :: compact_stream (
492
+ let res = Compactor :: < K , V , T , D > :: compact (
495
493
CompactConfig :: new ( & cfg, shard_id) ,
496
494
Arc :: clone ( & blob) ,
497
495
Arc :: clone ( & metrics) ,
498
496
Arc :: clone ( & machine. applier . shard_metrics ) ,
499
497
Arc :: new ( IsolatedRuntime :: default ( ) ) ,
500
- req. clone ( ) ,
498
+ req,
501
499
schemas,
502
- ) ;
503
- pin_mut ! ( stream) ;
504
-
505
- let mut all_parts = vec ! [ ] ;
506
- let mut all_run_splits = vec ! [ ] ;
507
- let mut all_run_meta = vec ! [ ] ;
508
- let mut len = 0 ;
509
-
510
- while let Some ( res) = stream. next ( ) . await {
511
- let res = res?;
512
- let ( parts, updates, run_meta, run_splits) = (
513
- res. output . parts ,
514
- res. output . len ,
515
- res. output . run_meta ,
516
- res. output . run_splits ,
517
- ) ;
518
- let run_offset = all_parts. len ( ) ;
519
- if !all_parts. is_empty ( ) {
520
- all_run_splits. push ( run_offset) ;
521
- }
522
- all_run_splits. extend ( run_splits. iter ( ) . map ( |r| r + run_offset) ) ;
523
- all_run_meta. extend ( run_meta) ;
524
- all_parts. extend ( parts) ;
525
- len += updates;
526
- }
527
-
528
- let res = CompactRes {
529
- output : HollowBatch :: new ( req. desc , all_parts, len, all_run_meta, all_run_splits) ,
530
- } ;
531
-
500
+ )
501
+ . await ?;
532
502
metrics. compaction . admin_count . inc ( ) ;
533
503
info ! (
534
504
"attempt {} req {}: compacted into {} parts {} bytes in {:?}" ,
@@ -539,10 +509,7 @@ where
539
509
start. elapsed( ) ,
540
510
) ;
541
511
let ( apply_res, maintenance) = machine
542
- . merge_res ( & FueledMergeRes {
543
- output : res. output ,
544
- new_active_compaction : None ,
545
- } )
512
+ . merge_res ( & FueledMergeRes { output : res. output } )
546
513
. await ;
547
514
if !maintenance. is_empty ( ) {
548
515
info ! ( "ignoring non-empty requested maintenance: {maintenance:?}" )
@@ -794,7 +761,7 @@ pub async fn dangerous_force_compaction_and_break_pushdown<K, V, T, D>(
794
761
write. write_schemas . clone ( ) ,
795
762
)
796
763
. await ;
797
- let apply_maintenance = match res {
764
+ let ( res , apply_maintenance) = match res {
798
765
Ok ( x) => x,
799
766
Err ( err) => {
800
767
warn ! (
@@ -808,10 +775,11 @@ pub async fn dangerous_force_compaction_and_break_pushdown<K, V, T, D>(
808
775
} ;
809
776
machine. applier . metrics . compaction . admin_count . inc ( ) ;
810
777
info ! (
811
- "force_compaction {} {} compacted in {:?}" ,
778
+ "force_compaction {} {} compacted in {:?}: {:?} " ,
812
779
machine. applier. shard_metrics. name,
813
780
machine. applier. shard_metrics. shard_id,
814
- start. elapsed( )
781
+ start. elapsed( ) ,
782
+ res
815
783
) ;
816
784
maintenance. merge ( apply_maintenance) ;
817
785
}
0 commit comments