@@ -30,7 +30,7 @@ use crate::{Data, ExchangeData, Collection, AsCollection, Hashable};
3030use crate :: difference:: Semigroup ;
3131use crate :: lattice:: Lattice ;
3232use crate :: trace:: { self , Trace , TraceReader , BatchReader , Batcher , Builder , Cursor } ;
33- use crate :: trace:: implementations:: { KeyBatcher , KeyBuilder , KeySpine , ValBatcher , ValBuilder , ValSpine } ;
33+ use crate :: trace:: implementations:: { KeyBatcher , KeyBuilder , KeySpine , ValBatcher , ValBuilder , ValSpine , VecChunker } ;
3434
3535use trace:: wrappers:: enter:: { TraceEnter , BatchEnter , } ;
3636use trace:: wrappers:: enter_at:: TraceEnter as TraceEnterAt ;
7676use :: timely:: dataflow:: scopes:: Child ;
7777use :: timely:: progress:: timestamp:: Refines ;
7878use timely:: Container ;
79- use timely:: container:: PushInto ;
79+ use timely:: container:: { ContainerBuilder , PushInto } ;
8080
8181impl < G , Tr > Arranged < G , Tr >
8282where
@@ -348,20 +348,22 @@ where
348348 G : Scope < Timestamp : Lattice > ,
349349{
350350 /// Arranges updates into a shared trace.
// NOTE(review): this span is a diff view. Lines marked `-` are the removed
// version and lines marked `+` are the replacement; unmarked lines are context.
// The change adds a leading `Chu` type parameter, bounded as a
// `ContainerBuilder` whose `Container` is `Ba::Container` and which accepts
// `&mut C` through `PushInto`. The `Input = C` constraint on `Ba` is dropped,
// and `Bu` now takes `Ba::Container` as its input instead of `Ba::Output`.
// The body only forwards to `arrange_named` with the fixed name `Arrange`,
// passing the type parameters through unchanged.
351- fn arrange < Ba , Bu , Tr > ( & self ) -> Arranged < G , TraceAgent < Tr > >
351+ fn arrange < Chu , Ba , Bu , Tr > ( & self ) -> Arranged < G , TraceAgent < Tr > >
352352 where
353- Ba : Batcher < Input =C , Time =G :: Timestamp > + ' static ,
354- Bu : Builder < Time =G :: Timestamp , Input =Ba :: Output , Output = Tr :: Batch > ,
353+ Chu : ContainerBuilder < Container =Ba :: Container > + for < ' a > PushInto < & ' a mut C > ,
354+ Ba : Batcher < Time =G :: Timestamp > + ' static ,
355+ Bu : Builder < Time =G :: Timestamp , Input =Ba :: Container , Output = Tr :: Batch > ,
355356 Tr : Trace < Time =G :: Timestamp > + ' static ,
356357 {
357- self . arrange_named :: < Ba , Bu , Tr > ( "Arrange" )
358+ self . arrange_named :: < Chu , Ba , Bu , Tr > ( "Arrange" )
358359 }
359360
360361 /// Arranges updates into a shared trace, with a supplied name.
// NOTE(review): diff view; `-` lines are removed, `+` lines are added.
// This is a declaration only (it ends with `;`), so implementors supply the body.
// The new signature mirrors `arrange`: a leading `Chu` type parameter is added,
// `Ba` no longer constrains `Input`, and `Bu` takes `Ba::Container` as input.
// Callers that name the type parameters explicitly must now pass four
// parameters instead of three.
361- fn arrange_named < Ba , Bu , Tr > ( & self , name : & str ) -> Arranged < G , TraceAgent < Tr > >
362+ fn arrange_named < Chu , Ba , Bu , Tr > ( & self , name : & str ) -> Arranged < G , TraceAgent < Tr > >
362363 where
363- Ba : Batcher < Input =C , Time =G :: Timestamp > + ' static ,
364- Bu : Builder < Time =G :: Timestamp , Input =Ba :: Output , Output = Tr :: Batch > ,
364+ Chu : ContainerBuilder < Container =Ba :: Container > + for < ' a > PushInto < & ' a mut C > ,
365+ Ba : Batcher < Time =G :: Timestamp > + ' static ,
366+ Bu : Builder < Time =G :: Timestamp , Input =Ba :: Container , Output = Tr :: Batch > ,
365367 Tr : Trace < Time =G :: Timestamp > + ' static ,
366368 ;
367369}
@@ -373,14 +375,15 @@ where
373375 V : ExchangeData ,
374376 R : ExchangeData + Semigroup ,
375377{
// NOTE(review): diff view; `-` lines are removed, `+` lines are added.
// Implementation of `arrange_named` for updates shaped `((K, V), G::Timestamp, R)`.
// It builds an `Exchange` contract keyed on the hash of the key component,
// `(update.0).0.hashed()`, and forwards `self.inner`, that contract and `name`
// to `arrange_core`.
// The new `arrange_core` call passes seven type arguments instead of five: one
// extra inferred `_` (matching the `C` parameter added to `arrange_core` in
// this diff) and the explicit `Chu`.
376- fn arrange_named < Ba , Bu , Tr > ( & self , name : & str ) -> Arranged < G , TraceAgent < Tr > >
378+ fn arrange_named < Chu , Ba , Bu , Tr > ( & self , name : & str ) -> Arranged < G , TraceAgent < Tr > >
377379 where
378- Ba : Batcher < Input =Vec < ( ( K , V ) , G :: Timestamp , R ) > , Time =G :: Timestamp > + ' static ,
379- Bu : Builder < Time =G :: Timestamp , Input =Ba :: Output , Output = Tr :: Batch > ,
380+ Chu : ContainerBuilder < Container =Ba :: Container > + for < ' a > PushInto < & ' a mut Vec < ( ( K , V ) , G :: Timestamp , R ) > > ,
381+ Ba : Batcher < Time =G :: Timestamp > + ' static ,
382+ Bu : Builder < Time =G :: Timestamp , Input =Ba :: Container , Output = Tr :: Batch > ,
380383 Tr : Trace < Time =G :: Timestamp > + ' static ,
381384 {
382385 let exchange = Exchange :: new ( move |update : & ( ( K , V ) , G :: Timestamp , R ) | ( update. 0 ) . 0 . hashed ( ) . into ( ) ) ;
383- arrange_core :: < _ , _ , Ba , Bu , _ > ( & self . inner , exchange, name)
386+ arrange_core :: < _ , _ , _ , Chu , Ba , Bu , _ > ( & self . inner , exchange, name)
384387 }
385388}
386389
@@ -389,12 +392,14 @@ where
389392/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
390393/// It uses the supplied parallelization contract to distribute the data, which need not be
391394/// partitioned consistently by key (though that is the most common choice).
392- pub fn arrange_core < G , P , Ba , Bu , Tr > ( stream : & StreamCore < G , Ba :: Input > , pact : P , name : & str ) -> Arranged < G , TraceAgent < Tr > >
395+ pub fn arrange_core < G , P , C , Chu , Ba , Bu , Tr > ( stream : & StreamCore < G , C > , pact : P , name : & str ) -> Arranged < G , TraceAgent < Tr > >
393396where
394397 G : Scope < Timestamp : Lattice > ,
395- P : ParallelizationContract < G :: Timestamp , Ba :: Input > ,
396- Ba : Batcher < Time =G :: Timestamp , Input : Container + Clone + ' static > + ' static ,
397- Bu : Builder < Time =G :: Timestamp , Input =Ba :: Output , Output = Tr :: Batch > ,
398+ P : ParallelizationContract < G :: Timestamp , C > ,
399+ C : Container + Clone + ' static ,
400+ Chu : ContainerBuilder < Container =Ba :: Container > + for < ' a > PushInto < & ' a mut C > ,
401+ Ba : Batcher < Time =G :: Timestamp > + ' static ,
402+ Bu : Builder < Time =G :: Timestamp , Input =Ba :: Container , Output = Tr :: Batch > ,
398403 Tr : Trace < Time =G :: Timestamp > +' static ,
399404{
400405 // The `Arrange` operator is tasked with reacting to an advancing input
@@ -443,6 +448,8 @@ where
443448 // Initialize to the minimal input frontier.
444449 let mut prev_frontier = Antichain :: from_elem ( <G :: Timestamp as Timestamp >:: minimum ( ) ) ;
445450
451+ let mut chunker = Chu :: default ( ) ;
452+
446453 move |input, output| {
447454
448455 // As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities.
@@ -451,7 +458,11 @@ where
451458
452459 input. for_each ( |cap, data| {
453460 capabilities. insert ( cap. retain ( ) ) ;
454- batcher. push_container ( data) ;
461+ chunker. push_into ( data) ;
462+ while let Some ( chunk) = chunker. extract ( ) {
463+ let chunk = std:: mem:: take ( chunk) ;
464+ batcher. push_into ( chunk) ;
465+ }
455466 } ) ;
456467
457468 // The frontier may have advanced by multiple elements, which is an issue because
@@ -481,6 +492,11 @@ where
481492 // If there is at least one capability not in advance of the input frontier ...
482493 if capabilities. elements ( ) . iter ( ) . any ( |c| !input. frontier ( ) . less_equal ( c. time ( ) ) ) {
483494
495+ while let Some ( chunk) = chunker. finish ( ) {
496+ let chunk = std:: mem:: take ( chunk) ;
497+ batcher. push_into ( chunk) ;
498+ }
499+
484500 let mut upper = Antichain :: new ( ) ; // re-used allocation for sealing batches.
485501
486502 // For each capability not in advance of the input frontier ...
@@ -547,14 +563,15 @@ impl<G, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Arrange<G, Vec<((K,
547563where
548564 G : Scope < Timestamp : Lattice +Ord > ,
549565{
// NOTE(review): diff view; `-` lines are removed, `+` lines are added.
// Implementation of `arrange_named` for updates shaped `((K, ()), G::Timestamp, R)`.
// The `Exchange` contract routes each update by the hash of its key,
// `(update.0).0.hashed()`.
// Before calling `arrange_core`, the collection is mapped with `|k| (k, ())`,
// which pairs each element with a unit value; the `inner` stream of that
// mapped collection is what gets arranged.
// The new `arrange_core` call adds one inferred `_` and the explicit `Chu` to
// the type arguments, matching the widened `arrange_core` signature in this diff.
550- fn arrange_named < Ba , Bu , Tr > ( & self , name : & str ) -> Arranged < G , TraceAgent < Tr > >
566+ fn arrange_named < Chu , Ba , Bu , Tr > ( & self , name : & str ) -> Arranged < G , TraceAgent < Tr > >
551567 where
552- Ba : Batcher < Input =Vec < ( ( K , ( ) ) , G :: Timestamp , R ) > , Time =G :: Timestamp > + ' static ,
553- Bu : Builder < Time =G :: Timestamp , Input =Ba :: Output , Output = Tr :: Batch > ,
568+ Chu : ContainerBuilder < Container =Ba :: Container > + for < ' a > PushInto < & ' a mut Vec < ( ( K , ( ) ) , G :: Timestamp , R ) > > ,
569+ Ba : Batcher < Time =G :: Timestamp > + ' static ,
570+ Bu : Builder < Time =G :: Timestamp , Input =Ba :: Container , Output = Tr :: Batch > ,
554571 Tr : Trace < Time =G :: Timestamp > + ' static ,
555572 {
556573 let exchange = Exchange :: new ( move |update : & ( ( K , ( ) ) , G :: Timestamp , R ) | ( update. 0 ) . 0 . hashed ( ) . into ( ) ) ;
557- arrange_core :: < _ , _ , Ba , Bu , _ > ( & self . map ( |k| ( k, ( ) ) ) . inner , exchange, name)
574+ arrange_core :: < _ , _ , _ , Chu , Ba , Bu , _ > ( & self . map ( |k| ( k, ( ) ) ) . inner , exchange, name)
558575 }
559576}
560577
@@ -587,7 +604,7 @@ where
587604 }
588605
// NOTE(review): diff view; `-` lines are removed, `+` lines are added.
// Arranges by key under the given operator `name`, returning a trace agent
// over `ValSpine<K, V, G::Timestamp, R>`.
// It forwards to `arrange_named` with `ValBatcher` and `ValBuilder`; the change
// adds `VecChunker<_>` as the new leading (chunker) type argument.
589606 fn arrange_by_key_named ( & self , name : & str ) -> Arranged < G , TraceAgent < ValSpine < K , V , G :: Timestamp , R > > > {
590- self . arrange_named :: < ValBatcher < _ , _ , _ , _ > , ValBuilder < _ , _ , _ , _ > , _ > ( name)
607+ self . arrange_named :: < VecChunker < _ > , ValBatcher < _ , _ , _ , _ > , ValBuilder < _ , _ , _ , _ > , _ > ( name)
591608 }
592609}
593610
@@ -622,6 +639,6 @@ where
622639
// NOTE(review): diff view; `-` lines are removed, `+` lines are added.
// Arranges the collection by its own elements under the given operator `name`,
// returning a trace agent over `KeySpine<K, G::Timestamp, R>`.
// Each element is first paired with a unit value via `|k| (k, ())`, then the
// result is arranged with `KeyBatcher` and `KeyBuilder`; the change adds
// `VecChunker<_>` as the new leading (chunker) type argument.
623640 fn arrange_by_self_named ( & self , name : & str ) -> Arranged < G , TraceAgent < KeySpine < K , G :: Timestamp , R > > > {
624641 self . map ( |k| ( k, ( ) ) )
625- . arrange_named :: < KeyBatcher < _ , _ , _ > , KeyBuilder < _ , _ , _ > , _ > ( name)
642+ . arrange_named :: < VecChunker < _ > , KeyBatcher < _ , _ , _ > , KeyBuilder < _ , _ , _ > , _ > ( name)
626643 }
627644}
0 commit comments