@@ -591,6 +591,12 @@ struct AggHashMapWithSerializedKey : public AggHashMapWithKey<HashMap, AggHashMa
591
591
using Iterator = typename HashMap::iterator;
592
592
using ResultVector = Buffer<Slice>;
593
593
594
+ struct CacheEntry {
595
+ KeyType key;
596
+ size_t hashval;
597
+ };
598
+ std::vector<CacheEntry> caches;
599
+
594
600
template <class ... Args>
595
601
AggHashMapWithSerializedKey (int chunk_size, Args&&... args)
596
602
: Base(chunk_size, std::forward<Args>(args)...),
@@ -686,7 +692,23 @@ struct AggHashMapWithSerializedKey : public AggHashMapWithKey<HashMap, AggHashMa
686
692
for (const auto & key_column : key_columns) {
687
693
key_column->serialize_batch (buffer, slice_sizes, chunk_size, max_one_row_size);
688
694
}
695
+ if (this ->hash_map .bucket_count () < prefetch_threhold) {
696
+ this ->template compute_agg_states_by_cols_non_prefetch <Func, allocate_and_compute_state,
697
+ compute_not_founds>(
698
+ chunk_size, key_columns, pool, std::move (allocate_func), agg_states, not_founds,
699
+ max_serialize_each_row);
700
+ } else {
701
+ this ->template compute_agg_states_by_cols_prefetch <Func, allocate_and_compute_state, compute_not_founds>(
702
+ chunk_size, key_columns, pool, std::move (allocate_func), agg_states, not_founds,
703
+ max_serialize_each_row);
704
+ }
705
+ }
689
706
707
+ template <typename Func, bool allocate_and_compute_state, bool compute_not_founds>
708
+ ALWAYS_NOINLINE void compute_agg_states_by_cols_non_prefetch (size_t chunk_size, const Columns& key_columns,
709
+ MemPool* pool, Func&& allocate_func,
710
+ Buffer<AggDataPtr>* agg_states, Filter* not_founds,
711
+ size_t max_serialize_each_row) {
690
712
for (size_t i = 0 ; i < chunk_size; ++i) {
691
713
Slice key = {buffer + i * max_one_row_size, slice_sizes[i]};
692
714
if constexpr (allocate_and_compute_state) {
@@ -714,6 +736,50 @@ struct AggHashMapWithSerializedKey : public AggHashMapWithKey<HashMap, AggHashMa
714
736
}
715
737
}
716
738
739
+ template <typename Func, bool allocate_and_compute_state, bool compute_not_founds>
740
+ ALWAYS_NOINLINE void compute_agg_states_by_cols_prefetch (size_t chunk_size, const Columns& key_columns,
741
+ MemPool* pool, Func&& allocate_func,
742
+ Buffer<AggDataPtr>* agg_states, Filter* not_founds,
743
+ size_t max_serialize_each_row) {
744
+ caches.resize (chunk_size);
745
+ for (size_t i = 0 ; i < chunk_size; ++i) {
746
+ caches[i].key = KeyType (Slice (buffer + i * max_one_row_size, slice_sizes[i]));
747
+ }
748
+ for (size_t i = 0 ; i < chunk_size; ++i) {
749
+ caches[i].hashval = this ->hash_map .hash_function ()(caches[i].key );
750
+ }
751
+
752
+ for (size_t i = 0 ; i < chunk_size; ++i) {
753
+ if (i + AGG_HASH_MAP_DEFAULT_PREFETCH_DIST < chunk_size) {
754
+ this ->hash_map .prefetch_hash (caches[i + AGG_HASH_MAP_DEFAULT_PREFETCH_DIST].hashval );
755
+ }
756
+
757
+ const auto & key = caches[i].key ;
758
+ if constexpr (allocate_and_compute_state) {
759
+ auto iter = this ->hash_map .lazy_emplace_with_hash (key, caches[i].hashval , [&](const auto & ctor) {
760
+ if constexpr (compute_not_founds) {
761
+ DCHECK (not_founds);
762
+ (*not_founds)[i] = 1 ;
763
+ }
764
+ // we must persist the slice before insert
765
+ uint8_t * pos = pool->allocate (key.size );
766
+ strings::memcpy_inlined (pos, key.data , key.size );
767
+ Slice pk{pos, key.size };
768
+ AggDataPtr pv = allocate_func (pk);
769
+ ctor (pk, pv);
770
+ });
771
+ (*agg_states)[i] = iter->second ;
772
+ } else if constexpr (compute_not_founds) {
773
+ DCHECK (not_founds);
774
+ if (auto iter = this ->hash_map .find (key, caches[i].hashval ); iter != this ->hash_map .end ()) {
775
+ (*agg_states)[i] = iter->second ;
776
+ } else {
777
+ (*not_founds)[i] = 1 ;
778
+ }
779
+ }
780
+ }
781
+ }
782
+
717
783
uint32_t get_max_serialize_size (const Columns& key_columns) {
718
784
uint32_t max_size = 0 ;
719
785
for (const auto & key_column : key_columns) {
0 commit comments