@@ -30,7 +30,7 @@ use crate::{Data, ExchangeData, Collection, AsCollection, Hashable, IntoOwned};
3030use crate :: difference:: Semigroup ;
3131use crate :: lattice:: Lattice ;
3232use crate :: trace:: { self , Trace , TraceReader , BatchReader , Batcher , Builder , Cursor } ;
33- use crate :: trace:: implementations:: { KeyBatcher , KeyBuilder , KeySpine , ValBatcher , ValBuilder , ValSpine } ;
33+ use crate :: trace:: implementations:: { KeyBatcher , KeyBuilder , KeySpine , ValBatcher , ValBuilder , ValSpine , VecChunker } ;
3434
3535use trace:: wrappers:: enter:: { TraceEnter , BatchEnter , } ;
3636use trace:: wrappers:: enter_at:: TraceEnter as TraceEnterAt ;
7676use :: timely:: dataflow:: scopes:: Child ;
7777use :: timely:: progress:: timestamp:: Refines ;
7878use timely:: Container ;
79- use timely:: container:: PushInto ;
79+ use timely:: container:: { ContainerBuilder , PushInto } ;
8080
8181impl < G , Tr > Arranged < G , Tr >
8282where
@@ -350,20 +350,22 @@ where
350350 G : Scope < Timestamp : Lattice > ,
351351{
352352 /// Arranges updates into a shared trace.
353- fn arrange < Ba , Bu , Tr > ( & self ) -> Arranged < G , TraceAgent < Tr > >
353+ fn arrange < Chu , Ba , Bu , Tr > ( & self ) -> Arranged < G , TraceAgent < Tr > >
354354 where
355- Ba : Batcher < Input =C , Time =G :: Timestamp > + ' static ,
356- Bu : Builder < Time =G :: Timestamp , Input =Ba :: Output , Output = Tr :: Batch > ,
355+ Chu : ContainerBuilder < Container =Ba :: Container > + for < ' a > PushInto < & ' a mut C > ,
356+ Ba : Batcher < Time =G :: Timestamp > + ' static ,
357+ Bu : Builder < Time =G :: Timestamp , Input =Ba :: Container , Output = Tr :: Batch > ,
357358 Tr : Trace < Time =G :: Timestamp > + ' static ,
358359 {
359- self . arrange_named :: < Ba , Bu , Tr > ( "Arrange" )
360+ self . arrange_named :: < Chu , Ba , Bu , Tr > ( "Arrange" )
360361 }
361362
362363 /// Arranges updates into a shared trace, with a supplied name.
363- fn arrange_named < Ba , Bu , Tr > ( & self , name : & str ) -> Arranged < G , TraceAgent < Tr > >
364+ fn arrange_named < Chu , Ba , Bu , Tr > ( & self , name : & str ) -> Arranged < G , TraceAgent < Tr > >
364365 where
365- Ba : Batcher < Input =C , Time =G :: Timestamp > + ' static ,
366- Bu : Builder < Time =G :: Timestamp , Input =Ba :: Output , Output = Tr :: Batch > ,
366+ Chu : ContainerBuilder < Container =Ba :: Container > + for < ' a > PushInto < & ' a mut C > ,
367+ Ba : Batcher < Time =G :: Timestamp > + ' static ,
368+ Bu : Builder < Time =G :: Timestamp , Input =Ba :: Container , Output = Tr :: Batch > ,
367369 Tr : Trace < Time =G :: Timestamp > + ' static ,
368370 ;
369371}
@@ -375,14 +377,15 @@ where
375377 V : ExchangeData ,
376378 R : ExchangeData + Semigroup ,
377379{
378- fn arrange_named < Ba , Bu , Tr > ( & self , name : & str ) -> Arranged < G , TraceAgent < Tr > >
380+ fn arrange_named < Chu , Ba , Bu , Tr > ( & self , name : & str ) -> Arranged < G , TraceAgent < Tr > >
379381 where
380- Ba : Batcher < Input =Vec < ( ( K , V ) , G :: Timestamp , R ) > , Time =G :: Timestamp > + ' static ,
381- Bu : Builder < Time =G :: Timestamp , Input =Ba :: Output , Output = Tr :: Batch > ,
382+ Chu : ContainerBuilder < Container =Ba :: Container > + for < ' a > PushInto < & ' a mut Vec < ( ( K , V ) , G :: Timestamp , R ) > > ,
383+ Ba : Batcher < Time =G :: Timestamp > + ' static ,
384+ Bu : Builder < Time =G :: Timestamp , Input =Ba :: Container , Output = Tr :: Batch > ,
382385 Tr : Trace < Time =G :: Timestamp > + ' static ,
383386 {
384387 let exchange = Exchange :: new ( move |update : & ( ( K , V ) , G :: Timestamp , R ) | ( update. 0 ) . 0 . hashed ( ) . into ( ) ) ;
385- arrange_core :: < _ , _ , Ba , Bu , _ > ( & self . inner , exchange, name)
388+ arrange_core :: < _ , _ , _ , Chu , Ba , Bu , _ > ( & self . inner , exchange, name)
386389 }
387390}
388391
@@ -391,12 +394,14 @@ where
391394/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
392395/// It uses the supplied parallelization contract to distribute the data, which does not need to
393396/// be consistently by key (though this is the most common).
394- pub fn arrange_core < G , P , Ba , Bu , Tr > ( stream : & StreamCore < G , Ba :: Input > , pact : P , name : & str ) -> Arranged < G , TraceAgent < Tr > >
397+ pub fn arrange_core < G , P , C , Chu , Ba , Bu , Tr > ( stream : & StreamCore < G , C > , pact : P , name : & str ) -> Arranged < G , TraceAgent < Tr > >
395398where
396399 G : Scope < Timestamp : Lattice > ,
397- P : ParallelizationContract < G :: Timestamp , Ba :: Input > ,
398- Ba : Batcher < Time =G :: Timestamp , Input : Container + Clone + ' static > + ' static ,
399- Bu : Builder < Time =G :: Timestamp , Input =Ba :: Output , Output = Tr :: Batch > ,
400+ P : ParallelizationContract < G :: Timestamp , C > ,
401+ C : Container + Clone + ' static ,
402+ Chu : ContainerBuilder < Container =Ba :: Container > + for < ' a > PushInto < & ' a mut C > ,
403+ Ba : Batcher < Time =G :: Timestamp > + ' static ,
404+ Bu : Builder < Time =G :: Timestamp , Input =Ba :: Container , Output = Tr :: Batch > ,
400405 Tr : Trace < Time =G :: Timestamp > +' static ,
401406{
402407 // The `Arrange` operator is tasked with reacting to an advancing input
@@ -445,6 +450,8 @@ where
445450 // Initialize to the minimal input frontier.
446451 let mut prev_frontier = Antichain :: from_elem ( <G :: Timestamp as Timestamp >:: minimum ( ) ) ;
447452
453+ let mut chunker = Chu :: default ( ) ;
454+
448455 move |input, output| {
449456
450457 // As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities.
@@ -453,7 +460,11 @@ where
453460
454461 input. for_each ( |cap, data| {
455462 capabilities. insert ( cap. retain ( ) ) ;
456- batcher. push_container ( data) ;
463+ chunker. push_into ( data) ;
464+ while let Some ( chunk) = chunker. extract ( ) {
465+ let chunk = std:: mem:: take ( chunk) ;
466+ batcher. push_into ( chunk) ;
467+ }
457468 } ) ;
458469
459470 // The frontier may have advanced by multiple elements, which is an issue because
@@ -483,6 +494,11 @@ where
483494 // If there is at least one capability not in advance of the input frontier ...
484495 if capabilities. elements ( ) . iter ( ) . any ( |c| !input. frontier ( ) . less_equal ( c. time ( ) ) ) {
485496
497+ while let Some ( chunk) = chunker. finish ( ) {
498+ let chunk = std:: mem:: take ( chunk) ;
499+ batcher. push_into ( chunk) ;
500+ }
501+
486502 let mut upper = Antichain :: new ( ) ; // re-used allocation for sealing batches.
487503
488504 // For each capability not in advance of the input frontier ...
@@ -549,14 +565,15 @@ impl<G, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Arrange<G, Vec<((K,
549565where
550566 G : Scope < Timestamp : Lattice +Ord > ,
551567{
552- fn arrange_named < Ba , Bu , Tr > ( & self , name : & str ) -> Arranged < G , TraceAgent < Tr > >
568+ fn arrange_named < Chu , Ba , Bu , Tr > ( & self , name : & str ) -> Arranged < G , TraceAgent < Tr > >
553569 where
554- Ba : Batcher < Input =Vec < ( ( K , ( ) ) , G :: Timestamp , R ) > , Time =G :: Timestamp > + ' static ,
555- Bu : Builder < Time =G :: Timestamp , Input =Ba :: Output , Output = Tr :: Batch > ,
570+ Chu : ContainerBuilder < Container =Ba :: Container > + for < ' a > PushInto < & ' a mut Vec < ( ( K , ( ) ) , G :: Timestamp , R ) > > ,
571+ Ba : Batcher < Time =G :: Timestamp > + ' static ,
572+ Bu : Builder < Time =G :: Timestamp , Input =Ba :: Container , Output = Tr :: Batch > ,
556573 Tr : Trace < Time =G :: Timestamp > + ' static ,
557574 {
558575 let exchange = Exchange :: new ( move |update : & ( ( K , ( ) ) , G :: Timestamp , R ) | ( update. 0 ) . 0 . hashed ( ) . into ( ) ) ;
559- arrange_core :: < _ , _ , Ba , Bu , _ > ( & self . map ( |k| ( k, ( ) ) ) . inner , exchange, name)
576+ arrange_core :: < _ , _ , _ , Chu , Ba , Bu , _ > ( & self . map ( |k| ( k, ( ) ) ) . inner , exchange, name)
560577 }
561578}
562579
@@ -589,7 +606,7 @@ where
589606 }
590607
591608 fn arrange_by_key_named ( & self , name : & str ) -> Arranged < G , TraceAgent < ValSpine < K , V , G :: Timestamp , R > > > {
592- self . arrange_named :: < ValBatcher < _ , _ , _ , _ > , ValBuilder < _ , _ , _ , _ > , _ > ( name)
609+ self . arrange_named :: < VecChunker < _ > , ValBatcher < _ , _ , _ , _ > , ValBuilder < _ , _ , _ , _ > , _ > ( name)
593610 }
594611}
595612
@@ -624,6 +641,6 @@ where
624641
625642 fn arrange_by_self_named ( & self , name : & str ) -> Arranged < G , TraceAgent < KeySpine < K , G :: Timestamp , R > > > {
626643 self . map ( |k| ( k, ( ) ) )
627- . arrange_named :: < KeyBatcher < _ , _ , _ > , KeyBuilder < _ , _ , _ > , _ > ( name)
644+ . arrange_named :: < VecChunker < _ > , KeyBatcher < _ , _ , _ > , KeyBuilder < _ , _ , _ > , _ > ( name)
628645 }
629646}
0 commit comments