Skip to content

Commit 659da48

Browse files
authored
Merge pull request ClickHouse#78350 from shankar-iyer/additional_primary_key_scan_for_final_with_skip_index
Additional primary key scan after using skip index for query with FINAL clause.
2 parents 734218b + 6b0d003 commit 659da48

17 files changed

+376
-15
lines changed

src/Core/Settings.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1410,6 +1410,16 @@ By default, this setting is disabled because skip indexes may exclude rows (gran
14101410
14111411
Possible values:
14121412
1413+
- 0 — Disabled.
1414+
- 1 — Enabled.
1415+
)", 0) \
1416+
DECLARE(Bool, use_skip_indexes_if_final_exact_mode, 0, R"(
1417+
Controls whether granules returned by a skipping index are expanded in newer parts to return correct results when executing a query with the FINAL modifier.
1418+
1419+
Using skip indexes may exclude rows (granules) containing the latest data which could lead to incorrect results. This setting can ensure that correct results are returned by scanning newer parts that have overlap with the ranges returned by the skip index.
1420+
1421+
Possible values:
1422+
14131423
- 0 — Disabled.
14141424
- 1 — Enabled.
14151425
)", 0) \

src/Core/SettingsChangesHistory.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
7171
{"geotoh3_lon_lat_input_order", true, false, "A new setting for legacy behaviour to set lon and lat order"},
7272
{"secondary_indices_enable_bulk_filtering", false, true, "A new algorithm for filtering by data skipping indices"},
7373
{"implicit_table_at_top_level", "", "", "A new setting, used in clickhouse-local"},
74+
{"use_skip_indexes_if_final_exact_mode", 0, 0, "This setting was introduced to help FINAL query return correct results with skip indexes"},
7475
});
7576
addSettingsChanges(settings_changes_history, "25.4",
7677
{

src/Processors/QueryPlan/PartsSplitter.cpp

Lines changed: 155 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,10 @@ class RangesInDataPartsBuilder
256256
return;
257257
}
258258

259-
ranges_in_data_parts[it->second].ranges.push_back(mark_range);
259+
if (ranges_in_data_parts[it->second].ranges.back().end == mark_range.begin)
260+
ranges_in_data_parts[it->second].ranges.back().end = mark_range.end;
261+
else
262+
ranges_in_data_parts[it->second].ranges.push_back(mark_range);
260263
}
261264

262265
RangesInDataParts & getCurrentRangesInDataParts()
@@ -289,6 +292,10 @@ struct PartsRangesIterator
289292

290293
if (event == other.event)
291294
{
295+
if (!selected && other.selected)
296+
return true;
297+
if (selected && !other.selected)
298+
return false;
292299
if (part_index == other.part_index)
293300
{
294301
/// Within the same part we should process events in order of mark numbers,
@@ -347,6 +354,7 @@ struct PartsRangesIterator
347354
MarkRange range;
348355
size_t part_index;
349356
EventType event;
357+
bool selected; /// Whether this range was selected or rejected in skip index filtering
350358
};
351359

352360
struct PartRangeIndex
@@ -475,7 +483,8 @@ SplitPartsRangesResult splitPartsRanges(RangesInDataParts ranges_in_data_parts,
475483
in_reverse_order,
476484
range,
477485
part_index,
478-
PartsRangesIterator::EventType::RangeStart});
486+
PartsRangesIterator::EventType::RangeStart,
487+
false});
479488

480489
const bool value_is_defined_at_end_mark = range.end < index_granularity->getMarksCount();
481490
if (!value_is_defined_at_end_mark)
@@ -486,7 +495,8 @@ SplitPartsRangesResult splitPartsRanges(RangesInDataParts ranges_in_data_parts,
486495
in_reverse_order,
487496
range,
488497
part_index,
489-
PartsRangesIterator::EventType::RangeEnd});
498+
PartsRangesIterator::EventType::RangeEnd,
499+
false});
490500
}
491501
}
492502

@@ -713,7 +723,8 @@ SplitPartsByRanges splitIntersectingPartsRangesIntoLayers(
713723
in_reverse_order,
714724
range,
715725
part_index,
716-
PartsRangesIterator::EventType::RangeStart};
726+
PartsRangesIterator::EventType::RangeStart,
727+
false};
717728
PartRangeIndex parts_range_start_index(parts_range_start);
718729
parts_ranges_queue.push({std::move(parts_range_start), std::move(parts_range_start_index)});
719730

@@ -726,7 +737,8 @@ SplitPartsByRanges splitIntersectingPartsRangesIntoLayers(
726737
in_reverse_order,
727738
range,
728739
part_index,
729-
PartsRangesIterator::EventType::RangeEnd};
740+
PartsRangesIterator::EventType::RangeEnd,
741+
false};
730742
PartRangeIndex parts_range_end_index(parts_range_end);
731743
parts_ranges_queue.push({std::move(parts_range_end), std::move(parts_range_end_index)});
732744
}
@@ -911,6 +923,130 @@ static ASTs buildFilters(const KeyDescription & primary_key, const std::vector<V
911923
return filters;
912924
}
913925

926+
/// Expands the mark ranges that survived earlier filtering so that a FINAL query also reads marks
/// from any part whose index values touch or overlap an already selected range.
///
/// ranges_in_data_parts    - per-part mark ranges selected so far ("selected" ranges). On the
///                           fallback path this vector is moved from, so callers must use only
///                           the returned value afterwards.
/// cannot_sort_primary_key - when true, no comparison is attempted and the full mark range of
///                           every part is returned.
/// logger                  - receives TRACE messages about the phases and the final counts.
///
/// Returns the selected ranges plus the additionally picked single-mark ranges, with parts ordered
/// by part_index_in_query and the ranges inside each part sorted. Falls back to
/// "every mark of every part" whenever an index value would have to be read at a mark position
/// that is not below getMarksCount().
RangesInDataParts findPKRangesForFinalAfterSkipIndexImpl(RangesInDataParts & ranges_in_data_parts, bool cannot_sort_primary_key, const LoggerPtr & logger)
927+
{
928+
IndexAccess index_access(ranges_in_data_parts);
929+
std::vector<PartsRangesIterator> selected_ranges;
930+
std::vector<PartsRangesIterator> rejected_ranges;
931+
932+
RangesInDataPartsBuilder result(ranges_in_data_parts);
933+
934+
/// Fallback: takes over the caller's vector (std::move) and replaces each part's ranges with the
/// single range [0, marks count without the final mark).
auto skip_and_return_all_part_ranges = [&]()
935+
{
936+
RangesInDataParts all_part_ranges(std::move(ranges_in_data_parts));
937+
for (auto & all_part_range : all_part_ranges)
938+
{
939+
const auto & index_granularity = all_part_range.data_part->index_granularity;
940+
all_part_range.ranges = MarkRanges{{MarkRange{0, index_granularity->getMarksCountWithoutFinal()}}};
941+
}
942+
return all_part_ranges;
943+
};
944+
945+
if (cannot_sort_primary_key) /// just expand to all parts + ranges
946+
{
947+
return skip_and_return_all_part_ranges();
948+
}
949+
950+
/// Phase 1: for every part, record one entry per already selected range and one single-mark
/// entry per mark that no selected range covers. The last initializer field is the `selected` flag.
for (size_t part_index = 0; part_index < ranges_in_data_parts.size(); ++part_index)
951+
{
952+
const auto & index_granularity = ranges_in_data_parts[part_index].data_part->index_granularity;
953+
std::vector<bool> is_selected_range(index_granularity->getMarksCountWithoutFinal(), false);
954+
for (const auto & range : ranges_in_data_parts[part_index].ranges)
955+
{
/// The value at range.end is read later in the merge loop (selected_range_end), so a range whose
/// end is not below getMarksCount() cannot be compared; give up and read everything.
956+
const bool value_is_defined_at_end_mark = range.end < index_granularity->getMarksCount();
957+
if (!value_is_defined_at_end_mark)
958+
{
959+
return skip_and_return_all_part_ranges();
960+
}
961+
962+
selected_ranges.push_back(
963+
{index_access.getValue(part_index, range.begin), false, range, part_index, PartsRangesIterator::EventType::RangeStart, true});
964+
for (auto i = range.begin; i < range.end;i++)
965+
is_selected_range[i] = true;
966+
}
967+
968+
for (size_t range_begin = 0; range_begin < is_selected_range.size(); range_begin++)
969+
{
/// Same guard for the one-mark range [range_begin, range_begin + 1).
/// NOTE(review): the guard runs before the is_selected_range test, so it also triggers the
/// fallback for marks that are already selected - confirm this is intended.
970+
const bool value_is_defined_at_end_mark = ((range_begin + 1) < index_granularity->getMarksCount());
971+
if (!value_is_defined_at_end_mark)
972+
{
973+
return skip_and_return_all_part_ranges();
974+
}
975+
976+
if (is_selected_range[range_begin])
977+
continue;
978+
MarkRange rejected_range(range_begin, range_begin + 1);
979+
rejected_ranges.push_back(
980+
{index_access.getValue(part_index, rejected_range.begin), false, rejected_range, part_index, PartsRangesIterator::EventType::RangeStart, false});
981+
}
982+
}
983+
/// Phase 2: order both lists with the ordering defined for PartsRangesIterator.
984+
::sort(selected_ranges.begin(), selected_ranges.end());
985+
986+
::sort(rejected_ranges.begin(), rejected_ranges.end());
987+
988+
LOG_TRACE(logger, "findPKRangesForFinalAfterSkipIndex : sorting phase complete");
989+
990+
std::vector<PartsRangesIterator>::iterator selected_ranges_iter = selected_ranges.begin();
991+
std::vector<PartsRangesIterator>::iterator rejected_ranges_iter = rejected_ranges.begin();
992+
size_t more_ranges_added = 0;
993+
/// Phase 3: walk both sorted lists together. Each step either emits the current rejected range
/// (it touches the current selected range), emits the current selected range and advances past it,
/// or discards the current rejected range.
994+
while (selected_ranges_iter != selected_ranges.end() && rejected_ranges_iter != rejected_ranges.end())
995+
{
996+
auto selected_range_start = selected_ranges_iter->value;
997+
auto selected_range_end = index_access.getValue(selected_ranges_iter->part_index, selected_ranges_iter->range.end);
998+
auto rejected_range_start = rejected_ranges_iter->value;
999+
/// result1: rejected start versus selected start; result2: rejected start versus selected end.
1000+
int result1 = compareValues(rejected_range_start, selected_range_start, false);
1001+
int result2 = compareValues(rejected_range_start, selected_range_end, false);
1002+
1003+
if (result1 == 0 || result2 == 0 || (result1 > 0 && result2 < 0)) /// rejected_range_start inside [selected_range]
1004+
{
1005+
result.addRange(rejected_ranges_iter->part_index, rejected_ranges_iter->range);
1006+
rejected_ranges_iter++;
1007+
more_ranges_added++;
1008+
}
1009+
else if (result1 > 0) /// rejected_range_start beyond [selected_range]
1010+
{
/// The selected range itself is emitted only here (or in the flush loop after this one).
1011+
result.addRange(selected_ranges_iter->part_index, selected_ranges_iter->range);
1012+
selected_ranges_iter++;
1013+
}
1014+
else
1015+
{
/// The rejected range starts before the selected range; it is kept only if its end reaches into
/// or past the selected range. It is consumed in either case.
1016+
auto rejected_range_end = index_access.getValue(rejected_ranges_iter->part_index, rejected_ranges_iter->range.end);
1017+
int result3 = compareValues(rejected_range_end, selected_range_start, false);
1018+
int result4 = compareValues(rejected_range_end, selected_range_end, false);
1019+
/// rejected_range_end inside [selected range] OR [rejected range] encompasses [selected range]
1020+
if (result3 == 0 || result4 == 0 || (result3 > 0 && result4 < 0) || (result1 < 0 && result4 > 0))
1021+
{
1022+
result.addRange(rejected_ranges_iter->part_index, rejected_ranges_iter->range);
1023+
more_ranges_added++;
1024+
}
1025+
rejected_ranges_iter++;
1026+
}
1027+
}
1028+
/// Flush the selected ranges that were not emitted above. Rejected ranges still pending at this
/// point are not added.
1029+
while (selected_ranges_iter != selected_ranges.end())
1030+
{
1031+
result.addRange(selected_ranges_iter->part_index, selected_ranges_iter->range);
1032+
selected_ranges_iter++;
1033+
}
1034+
/// Phase 4: normalise the output order - parts by part_index_in_query, then the ranges inside
/// each part.
/// NOTE(review): `auto` makes a copy here if getCurrentRangesInDataParts() returns a reference - confirm
/// whether a move was intended.
1035+
auto result_final_ranges = result.getCurrentRangesInDataParts();
1036+
std::stable_sort(
1037+
result_final_ranges.begin(),
1038+
result_final_ranges.end(),
1039+
[](const auto & lhs, const auto & rhs) { return lhs.part_index_in_query < rhs.part_index_in_query; });
1040+
for (auto & result_final_range : result_final_ranges)
1041+
{
1042+
std::sort(result_final_range.ranges.begin(), result_final_range.ranges.end());
1043+
}
1044+
1045+
LOG_TRACE(logger, "findPKRangesForFinalAfterSkipIndex : processed {} parts, initially selected {} ranges & rejected {}, more {} ranges added", ranges_in_data_parts.size(), selected_ranges.size(), rejected_ranges.size(), more_ranges_added);
1046+
1047+
return result_final_ranges;
1048+
}
1049+
9141050
static void reorderColumns(ActionsDAG & dag, const Block & header, const std::string & filter_column)
9151051
{
9161052
std::unordered_map<std::string_view, const ActionsDAG::Node *> inputs_map;
@@ -1056,4 +1192,18 @@ Pipes readByLayers(
10561192
return merging_pipes;
10571193
}
10581194

1195+
/// Public entry point of the exact-mode range expansion for FINAL queries.
///
/// Decides whether index values can be compared across parts: if isSafePrimaryKey(primary_key)
/// is false, or the sorting key carries any reverse flags, the implementation is told to skip the
/// comparison and return the full mark range of every part.
///
/// `ranges_in_data_parts` is passed on by non-const reference and may be moved from by the
/// implementation, so callers should rely only on the returned value.
RangesInDataParts findPKRangesForFinalAfterSkipIndex(
1196+
const KeyDescription & primary_key,
1197+
const KeyDescription & sorting_key,
1198+
RangesInDataParts & ranges_in_data_parts,
1199+
const LoggerPtr & logger)
1200+
{
1201+
bool cannot_sort_primary_key = false;
1202+
if (!isSafePrimaryKey(primary_key) || !sorting_key.reverse_flags.empty())
1203+
{
1204+
LOG_TRACE(logger, "Primary key is not sortable, expanding PK range to entire due to exact_mode.");
1205+
cannot_sort_primary_key = true;
1206+
}
1207+
return findPKRangesForFinalAfterSkipIndexImpl(ranges_in_data_parts, cannot_sort_primary_key, logger);
1208+
}
10591209
}

src/Processors/QueryPlan/PartsSplitter.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,18 @@ SplitPartsWithRangesByPrimaryKeyResult splitPartsWithRangesByPrimaryKey(
3838
bool split_parts_ranges_into_intersecting_and_non_intersecting,
3939
bool split_intersecting_parts_ranges_into_layers);
4040

41+
/**
42+
*
43+
* Used when setting use_skip_indexes_if_final_exact_mode=1 and the query has FINAL:
44+
* expands the initial set of granules returned from the skip index by
45+
* looking for that initial set of PK ranges across all parts, and adds every
* granule whose PK range touches or overlaps one of them.
*
* If the primary key is not safe to compare or the sorting key has reverse flags,
* the full mark range of every part is returned instead.
*
* ranges_in_data_parts is taken by non-const reference and may be moved from;
* use only the returned value afterwards.
46+
*/
47+
RangesInDataParts findPKRangesForFinalAfterSkipIndex(
48+
const KeyDescription & primary_key,
49+
const KeyDescription & sorting_key,
50+
RangesInDataParts & ranges_in_data_parts,
51+
const LoggerPtr & logger);
52+
4153
struct SplitPartsByRanges
4254
{
4355
using Values = std::vector<Field>;

src/Processors/QueryPlan/ReadFromMergeTree.cpp

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ namespace Setting
186186
extern const SettingsBool query_plan_merge_filters;
187187
extern const SettingsUInt64 merge_tree_min_read_task_size;
188188
extern const SettingsBool read_in_order_use_virtual_row;
189+
extern const SettingsBool use_skip_indexes_if_final_exact_mode;
189190
extern const SettingsBool use_query_condition_cache;
190191
extern const SettingsBool query_condition_cache_store_conditions_as_plaintext;
191192
extern const SettingsBool allow_experimental_analyzer;
@@ -1891,6 +1892,7 @@ ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
18911892

18921893
size_t total_marks_pk = 0;
18931894
size_t parts_before_pk = 0;
1895+
bool add_index_stat_row_for_pk_expand = false;
18941896

18951897
{
18961898
MergeTreeDataSelectExecutor::filterPartsByPartition(
@@ -1935,9 +1937,19 @@ ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
19351937
num_streams,
19361938
result.index_stats,
19371939
indexes->use_skip_indexes,
1938-
find_exact_ranges);
1940+
find_exact_ranges,
1941+
query_info_.isFinal());
19391942

19401943
MergeTreeDataSelectExecutor::filterPartsByQueryConditionCache(result.parts_with_ranges, query_info_, context_, log);
1944+
1945+
if (indexes->use_skip_indexes && !indexes->skip_indexes.useful_indices.empty() && query_info_.isFinal() && settings[Setting::use_skip_indexes_if_final_exact_mode])
1946+
{
1947+
result.parts_with_ranges = findPKRangesForFinalAfterSkipIndex(primary_key,
1948+
metadata_snapshot->getSortingKey(),
1949+
result.parts_with_ranges,
1950+
log);
1951+
add_index_stat_row_for_pk_expand = true;
1952+
}
19411953
}
19421954

19431955
size_t sum_marks_pk = total_marks_pk;
@@ -1956,6 +1968,15 @@ ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
19561968
sum_rows += part.getRowsCount();
19571969
}
19581970

1971+
if (add_index_stat_row_for_pk_expand)
1972+
{
1973+
result.index_stats.emplace_back(ReadFromMergeTree::IndexStat{
1974+
.type = ReadFromMergeTree::IndexType::PrimaryKeyExpand,
1975+
.description = "Selects all granules that intersect by PK values with the previous skip indexes selection",
1976+
.num_parts_after = result.parts_with_ranges.size(),
1977+
.num_granules_after = sum_marks});
1978+
}
1979+
19591980
result.total_parts = total_parts;
19601981
result.parts_before_pk = parts_before_pk;
19611982
result.selected_parts = result.parts_with_ranges.size();
@@ -2424,6 +2445,8 @@ static const char * indexTypeToString(ReadFromMergeTree::IndexType type)
24242445
return "PrimaryKey";
24252446
case ReadFromMergeTree::IndexType::Skip:
24262447
return "Skip";
2448+
case ReadFromMergeTree::IndexType::PrimaryKeyExpand:
2449+
return "PrimaryKeyExpand";
24272450
}
24282451
}
24292452

src/Processors/QueryPlan/ReadFromMergeTree.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ class ReadFromMergeTree final : public SourceStepWithFilter
7171
Partition,
7272
PrimaryKey,
7373
Skip,
74+
PrimaryKeyExpand,
7475
};
7576

7677
/// This is a struct with information about applied indexes.

src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ namespace Setting
8383
extern const SettingsUInt64 parallel_replica_offset;
8484
extern const SettingsUInt64 parallel_replicas_count;
8585
extern const SettingsParallelReplicasMode parallel_replicas_mode;
86+
extern const SettingsBool use_skip_indexes_if_final_exact_mode;
8687
extern const SettingsBool use_query_condition_cache;
8788
extern const SettingsBool allow_experimental_analyzer;
8889
extern const SettingsBool parallel_replicas_local_plan;
@@ -619,7 +620,8 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
619620
size_t num_streams,
620621
ReadFromMergeTree::IndexStats & index_stats,
621622
bool use_skip_indexes,
622-
bool find_exact_ranges)
623+
bool find_exact_ranges,
624+
bool is_final_query)
623625
{
624626
const Settings & settings = context->getSettingsRef();
625627

@@ -677,6 +679,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
677679
std::atomic<size_t> sum_parts_pk = 0;
678680

679681
RangesInDataParts parts_with_ranges(parts.size());
682+
std::vector<size_t> skip_index_used_in_part(parts.size(), 0);
680683

681684
/// Let's find what range to read from each part.
682685
{
@@ -746,6 +749,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
746749
stat.granules_dropped.fetch_add(total_granules - ranges.ranges.getNumberOfMarks(), std::memory_order_relaxed);
747750
if (ranges.ranges.empty())
748751
stat.parts_dropped.fetch_add(1, std::memory_order_relaxed);
752+
skip_index_used_in_part[part_index] = 1; /// thread-safe
749753
}
750754

751755
for (size_t idx = 0; idx < skip_indexes.merged_indices.size(); ++idx)
@@ -771,8 +775,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
771775
stat.parts_dropped.fetch_add(1, std::memory_order_relaxed);
772776
}
773777

774-
if (!ranges.ranges.empty())
775-
parts_with_ranges[part_index] = std::move(ranges);
778+
parts_with_ranges[part_index] = std::move(ranges);
776779
};
777780

778781
size_t num_threads = std::min<size_t>(num_streams, parts.size());
@@ -825,7 +828,12 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
825828
for (size_t part_index = 0; part_index < parts.size(); ++part_index)
826829
{
827830
auto & part = parts_with_ranges[part_index];
828-
if (!part.data_part)
831+
if (is_final_query && settings[Setting::use_skip_indexes_if_final_exact_mode] && skip_index_used_in_part[part_index])
832+
{
833+
++next_part; /// retain this part even if empty due to FINAL
834+
continue;
835+
}
836+
if (!part.data_part || part.ranges.empty())
829837
continue;
830838

831839
if (next_part != part_index)

0 commit comments

Comments
 (0)