27
27
#include " runtime/runtime_state.h"
28
28
#include " types/logical_type_infra.h"
29
29
#include " util/orlp/pdqsort.h"
30
+ #include " util/runtime_profile.h"
30
31
#include " util/stopwatch.hpp"
31
32
32
33
namespace starrocks {
@@ -68,6 +69,8 @@ ChunksSorterTopn::ChunksSorterTopn(RuntimeState* state, const std::vector<ExprCo
68
69
_topn_type(topn_type) {
69
70
DCHECK_GT (_get_number_of_rows_to_sort (), 0 ) << " output rows can't be empty" ;
70
71
DCHECK (_topn_type == TTopNType::ROW_NUMBER || _offset == 0 );
72
+ _init_buffered_chunks = tunning_buffered_chunks (_get_number_of_rows_to_sort ());
73
+ _buffered_chunks_capacity = _init_buffered_chunks;
71
74
auto & raw_chunks = _raw_chunks.chunks ;
72
75
// avoid too large buffer chunks
73
76
raw_chunks.reserve (std::min<size_t >(max_buffered_chunks, 256 ));
@@ -88,43 +91,35 @@ Status ChunksSorterTopn::update(RuntimeState* state, const ChunkPtr& chunk) {
88
91
}
89
92
auto & raw_chunks = _raw_chunks.chunks ;
90
93
size_t chunk_number = raw_chunks.size ();
94
+ size_t prev_chunk_memusage = 0 ;
91
95
if (chunk_number <= 0 ) {
92
96
raw_chunks.push_back (chunk);
93
97
chunk_number++;
94
98
} else if (raw_chunks[chunk_number - 1 ]->num_rows () + chunk->num_rows () > _state->chunk_size ()) {
95
99
raw_chunks.push_back (chunk);
96
100
chunk_number++;
97
101
} else {
102
+ prev_chunk_memusage = raw_chunks[chunk_number - 1 ]->memory_usage ();
98
103
// Old planner will not remove duplicated sort column.
99
104
// columns in chunk may have same column ptr
100
105
// append_safe will check size of all columns in dest chunk
101
106
// to ensure same column will not apppend repeatedly.
102
107
raw_chunks[chunk_number - 1 ]->append_safe (*chunk);
103
108
}
109
+ _raw_chunks.update_mem_usage (raw_chunks[chunk_number - 1 ]->memory_usage () - prev_chunk_memusage);
104
110
_raw_chunks.size_of_rows += chunk->num_rows ();
105
111
106
112
// Avoid TOPN from using too much memory.
107
- bool exceed_mem_limit = _raw_chunks.mem_usage () > _max_buffered_bytes;
113
+ bool exceed_mem_limit = _raw_chunks.mem_usage > _max_buffered_bytes;
108
114
if (exceed_mem_limit) {
109
115
return _sort_chunks (state);
110
116
}
111
117
112
- // Try to accumulate more chunks.
113
- size_t rows_to_sort = _get_number_of_rows_to_sort ();
114
- if (_merged_runs.num_rows () + _raw_chunks.size_of_rows < rows_to_sort) {
115
- return Status::OK ();
116
- }
117
-
118
- // We have accumulated rows_to_sort rows to build merged runs.
119
- if (_merged_runs.num_rows () <= rows_to_sort) {
120
- return _sort_chunks (state);
121
- }
122
-
123
- // When number of Chunks exceeds _limit or _max_buffered_chunks, run sort and then part of
118
+ // When number of Chunks exceeds _buffered_chunks_capacity or rows greater than _max_buffered_rows , run sort and then part of
124
119
// cached chunks can be dropped, so it can reduce the memory usage.
125
120
// TopN caches _limit or _max_buffered_chunks primitive chunks,
126
121
// performs sorting once, and discards extra rows
127
- if (chunk_number >= _max_buffered_chunks || _raw_chunks.size_of_rows > _max_buffered_rows) {
122
+ if (chunk_number >= _buffered_chunks_capacity || _raw_chunks.size_of_rows > _max_buffered_rows) {
128
123
return _sort_chunks (state);
129
124
}
130
125
@@ -228,6 +223,9 @@ size_t ChunksSorterTopn::get_output_rows() const {
228
223
}
229
224
230
225
Status ChunksSorterTopn::_sort_chunks (RuntimeState* state) {
226
+ if (_sort_cnt) {
227
+ COUNTER_UPDATE (_sort_cnt, 1 );
228
+ }
231
229
// Chunks for this batch.
232
230
DataSegments segments;
233
231
@@ -569,6 +567,7 @@ Status ChunksSorterTopn::_merge_sort_common(MergedRuns* dst, DataSegments& segme
569
567
}
570
568
571
569
if (_merged_runs.num_chunks () > 1 || _merged_runs.mem_usage () > _max_buffered_bytes) {
570
+ _adjust_chunks_capacity (true );
572
571
// merge to multi sorted chunks
573
572
RETURN_IF_ERROR (merge_sorted_chunks (_sort_desc, _sort_exprs, _merged_runs, std::move (right_unique_chunk),
574
573
rows_to_keep, dst));
@@ -583,24 +582,31 @@ Status ChunksSorterTopn::_merge_sort_common(MergedRuns* dst, DataSegments& segme
583
582
// prepare right chunk
584
583
ChunkPtr right_chunk = std::move (right_unique_chunk);
585
584
586
- Permutation merged_perm;
587
- merged_perm.reserve (left_chunk->num_rows () + right_chunk->num_rows ());
588
-
589
- RETURN_IF_ERROR (merge_sorted_chunks_two_way (_sort_desc, {left_chunk, left_columns},
590
- {right_chunk, right_columns}, &merged_perm));
591
- CHECK_GE (merged_perm.size (), rows_to_keep);
592
- merged_perm.resize (rows_to_keep);
585
+ const SortedRun left = {left_chunk, left_columns};
586
+ const SortedRun right = {right_chunk, right_columns};
587
+ bool intersected = !left.empty () && !right.empty () && !left.intersect (_sort_desc, right);
588
+ // adjust chunks capacity
589
+ _adjust_chunks_capacity (intersected);
593
590
594
- // materialize into the dst runs
595
- std::vector<ChunkPtr> chunks{left_chunk, right_chunk};
596
591
ChunkUniquePtr big_chunk;
597
592
if (dst->num_chunks () == 0 ) {
598
593
big_chunk = segments[permutation_second[0 ].chunk_index ].chunk ->clone_empty (rows_to_keep);
599
594
} else {
600
595
big_chunk = std::move (dst->front ().chunk );
601
596
dst->pop_front ();
602
597
}
598
+
599
+ Permutation merged_perm;
600
+ merged_perm.reserve (left_chunk->num_rows () + right_chunk->num_rows ());
601
+ RETURN_IF_ERROR (merge_sorted_chunks_two_way (_sort_desc, left, right, &merged_perm));
602
+ CHECK_GE (merged_perm.size (), rows_to_keep);
603
+ merged_perm.resize (rows_to_keep);
604
+
605
+ // materialize into the dst runs
606
+ std::vector<ChunkPtr> chunks{left_chunk, right_chunk};
607
+
603
608
materialize_by_permutation (big_chunk.get (), chunks, merged_perm);
609
+
604
610
RETURN_IF_ERROR (big_chunk->upgrade_if_overflow ());
605
611
ASSIGN_OR_RETURN (auto run, MergedRun::build (std::move (big_chunk), *_sort_exprs));
606
612
dst->push_back (std::move (run));
0 commit comments