Skip to content

Commit 9c0e538

Browse files
authored
Enable YQL core opt flags from kqp config (#17704)
1 parent fdc10e3 commit 9c0e538

File tree

15 files changed

+930
-646
lines changed

15 files changed

+930
-646
lines changed

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

+8
Original file line number | Diff line number | Diff line change
@@ -658,6 +658,14 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
658658
if (const auto limit = serviceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit()) {
659659
kqpConfig._KqpYqlCombinerMemoryLimit = std::max(1_GB, limit - (limit >> 2U));
660660
}
661+
662+
kqpConfig.FilterPushdownOverJoinOptionalSide = serviceConfig.GetFilterPushdownOverJoinOptionalSide();
663+
if (serviceConfig.GetFuseEquiJoinsInputMultiLabels())
664+
kqpConfig.YqlCoreOptimizerFlags.insert("fuseequijoinsinputmultilabels");
665+
if (serviceConfig.GetPullUpFlatMapOverJoinMultipleLabels())
666+
kqpConfig.YqlCoreOptimizerFlags.insert("pullupflatmapoverjoinmultiplelabels");
667+
if (serviceConfig.GetEqualityFilterOverJoin())
668+
kqpConfig.YqlCoreOptimizerFlags.insert("equalityfilteroverjoin");
661669
}
662670

663671
IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings,

ydb/core/kqp/host/kqp_host.cpp

+3
Original file line number | Diff line number | Diff line change
@@ -1929,6 +1929,9 @@ class TKqpHost : public IKqpHost {
19291929

19301930
TypesCtx->AddDataSource(providerNames, kikimrDataSource);
19311931
TypesCtx->AddDataSink(providerNames, kikimrDataSink);
1932+
TypesCtx->FilterPushdownOverJoinOptionalSide = SessionCtx->ConfigPtr()->FilterPushdownOverJoinOptionalSide;
1933+
const auto &yqlCoreOptFlags = SessionCtx->ConfigPtr()->YqlCoreOptimizerFlags;
1934+
TypesCtx->OptimizerFlags.insert(yqlCoreOptFlags.begin(), yqlCoreOptFlags.end());
19321935

19331936
bool addExternalDataSources = queryType == EKikimrQueryType::Script || queryType == EKikimrQueryType::Query
19341937
|| (queryType == EKikimrQueryType::YqlScript || queryType == EKikimrQueryType::YqlScriptStreaming) && AppData()->FeatureFlags.GetEnableExternalDataSources();

ydb/core/kqp/provider/yql_kikimr_settings.h

+2
Original file line number | Diff line number | Diff line change
@@ -183,6 +183,8 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi
183183
bool EnableSnapshotIsolationRW = false;
184184
bool AllowMultiBroadcasts = false;
185185
bool DefaultEnableShuffleElimination = false;
186+
bool FilterPushdownOverJoinOptionalSide = false;
187+
THashSet<TString> YqlCoreOptimizerFlags;
186188

187189
void SetDefaultEnabledSpillingNodes(const TString& node);
188190
ui64 GetEnabledSpillingNodes() const;

ydb/core/kqp/ut/join/data/join_order/lookupbug.json

+9-9
Original file line number | Diff line number | Diff line change
@@ -3,32 +3,32 @@
33
"args":
44
[
55
{
6-
"op_name":"LeftJoin (MapJoin)",
6+
"op_name":"InnerJoin (MapJoin)",
77
"args":
88
[
99
{
10-
"op_name":"LeftJoin (MapJoin)",
10+
"op_name":"InnerJoin (MapJoin)",
1111
"args":
1212
[
1313
{
1414
"op_name":"TableFullScan",
15-
"table":"quotas_browsers_relation"
15+
"table":"browsers"
1616
},
1717
{
18-
"op_name":"TableLookup",
19-
"table":"browsers"
18+
"op_name":"TableFullScan",
19+
"table":"quotas_browsers_relation"
2020
}
2121
]
2222
},
2323
{
24-
"op_name":"TableLookup",
25-
"table":"browser_groups"
24+
"op_name":"TablePointLookup",
25+
"table":"quota"
2626
}
2727
]
2828
},
2929
{
30-
"op_name":"TableFullScan",
31-
"table":"quota"
30+
"op_name":"TableLookup",
31+
"table":"browser_groups"
3232
}
3333
]
3434
}

ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp

+4-4
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,6 @@
11
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
22

33
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/proto/accessor.h>
4-
54
#include <fmt/format.h>
65

76
namespace NKikimr {
@@ -101,7 +100,7 @@ void PrepareTables(TSession session) {
101100

102101
Y_UNIT_TEST_SUITE(KqpIndexLookupJoin) {
103102

104-
void Test(const TString& query, const TString& answer, size_t rightTableReads, bool useStreamLookup = false) {
103+
void Test(const TString& query, const TString& answer, size_t rightTableReads, bool useStreamLookup = false, size_t leftTableReads = 7) {
105104
NKikimrConfig::TAppConfig appConfig;
106105
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(useStreamLookup);
107106

@@ -125,9 +124,10 @@ void Test(const TString& query, const TString& answer, size_t rightTableReads, b
125124
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1);
126125

127126
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 2);
127+
128128
for (const auto& tableStat : stats.query_phases(0).table_access()) {
129129
if (tableStat.name() == "/Root/Left") {
130-
UNIT_ASSERT_VALUES_EQUAL(tableStat.reads().rows(), 7);
130+
UNIT_ASSERT_VALUES_EQUAL(tableStat.reads().rows(), leftTableReads);
131131
} else {
132132
UNIT_ASSERT_VALUES_EQUAL(tableStat.name(), "/Root/Right");
133133
UNIT_ASSERT_VALUES_EQUAL(tableStat.reads().rows(), rightTableReads);
@@ -519,7 +519,7 @@ Y_UNIT_TEST_TWIN(LeftJoinRightNullFilter, StreamLookup) {
519519
[["Value3"];#];
520520
[["Value6"];#];
521521
[["Value7"];#]
522-
])", 4, StreamLookup);
522+
])", 8, StreamLookup, 14);
523523
}
524524

525525
Y_UNIT_TEST_TWIN(LeftJoinSkipNullFilter, StreamLookup) {

ydb/core/protos/table_service_config.proto

+5
Original file line number | Diff line number | Diff line change
@@ -384,4 +384,9 @@ message TTableServiceConfig {
384384

385385
optional bool EnableFoldUdfs = 82 [ default = true ];
386386

387+
// YQL core optimizer flags.
388+
optional bool FilterPushdownOverJoinOptionalSide = 83 [ default = true ];
389+
optional bool FuseEquiJoinsInputMultiLabels = 84 [ default = true ];
390+
optional bool PullUpFlatMapOverJoinMultipleLabels = 85 [ default = true ];
391+
optional bool EqualityFilterOverJoin = 86 [ default = false ];
387392
};

ydb/library/yql/dq/opt/dq_opt_join.cpp

+50-20
Original file line number | Diff line number | Diff line change
@@ -15,17 +15,17 @@ using namespace NYql::NNodes;
1515
namespace {
1616

1717
struct TJoinInputDesc {
18-
TJoinInputDesc(TMaybe<TStringBuf> label, const TExprBase& input,
18+
TJoinInputDesc(TMaybe<THashSet<TStringBuf>> labels, const TExprBase& input,
1919
TSet<std::pair<TStringBuf, TStringBuf>>&& keys)
20-
: Label(label)
20+
: Labels(labels)
2121
, Input(input)
2222
, Keys(std::move(keys)) {}
2323

2424
bool IsRealTable() const {
25-
return Label.Defined();
25+
return Labels.Defined();
2626
}
2727

28-
TMaybe<TStringBuf> Label; // defined for real table input only, empty otherwise
28+
TMaybe<THashSet<TStringBuf>> Labels; // defined for real table input only, empty otherwise
2929
TExprBase Input;
3030
TSet<std::pair<TStringBuf, TStringBuf>> Keys; // set of (label, column_name) pairs in this input
3131
};
@@ -116,6 +116,14 @@ TExprBase BuildDqJoinInput(TExprContext& ctx, TPositionHandle pos, const TExprBa
116116
return partition;
117117
}
118118

119+
TExprNode::TPtr CreateLabelList(const THashSet<TStringBuf>& labels, const TPositionHandle& position, TExprContext& ctx) {
120+
TExprNode::TListType newKeys;
121+
for (const auto& label : labels) {
122+
newKeys.push_back(ctx.NewAtom(position, label));
123+
}
124+
return ctx.NewList(position, std::move(newKeys));
125+
}
126+
119127
TMaybe<TJoinInputDesc> BuildDqJoin(
120128
const TCoEquiJoinTuple& joinTuple,
121129
const THashMap<TStringBuf, TJoinInputDesc>& inputs,
@@ -129,9 +137,12 @@ TMaybe<TJoinInputDesc> BuildDqJoin(
129137
{
130138
TMaybe<TJoinInputDesc> left;
131139
TVector<TString> lhsLabels;
140+
TStringBuf leftLabel;
141+
TStringBuf rightLabel;
132142
if (joinTuple.LeftScope().Maybe<TCoAtom>()) {
133143
lhsLabels.push_back(joinTuple.LeftScope().Cast<TCoAtom>().StringValue());
134144
left = inputs.at(joinTuple.LeftScope().Cast<TCoAtom>().Value());
145+
leftLabel = joinTuple.LeftScope().Cast<TCoAtom>().Value();
135146
YQL_ENSURE(left, "unknown scope " << joinTuple.LeftScope().Cast<TCoAtom>().Value());
136147
} else {
137148
left = BuildDqJoin(joinTuple.LeftScope().Cast<TCoEquiJoinTuple>(), inputs, mode, ctx, typeCtx, lhsLabels, hints, useCBO);
@@ -145,6 +156,7 @@ TMaybe<TJoinInputDesc> BuildDqJoin(
145156
if (joinTuple.RightScope().Maybe<TCoAtom>()) {
146157
rhsLabels.push_back(joinTuple.RightScope().Cast<TCoAtom>().StringValue());
147158
right = inputs.at(joinTuple.RightScope().Cast<TCoAtom>().Value());
159+
rightLabel = joinTuple.RightScope().Cast<TCoAtom>().Value();
148160
YQL_ENSURE(right, "unknown scope " << joinTuple.RightScope().Cast<TCoAtom>().Value());
149161
} else {
150162
right = BuildDqJoin(joinTuple.RightScope().Cast<TCoEquiJoinTuple>(), inputs, mode, ctx, typeCtx, rhsLabels, hints, useCBO);
@@ -187,12 +199,13 @@ TMaybe<TJoinInputDesc> BuildDqJoin(
187199
resultKeys.insert(right->Keys.begin(), right->Keys.end());
188200
}
189201

190-
auto leftTableLabel = left->IsRealTable()
191-
? BuildAtom(*left->Label, left->Input.Pos(), ctx).Ptr()
192-
: Build<TCoVoid>(ctx, left->Input.Pos()).Done().Ptr();
193-
auto rightTableLabel = right->IsRealTable()
194-
? BuildAtom(*right->Label, right->Input.Pos(), ctx).Ptr()
195-
: Build<TCoVoid>(ctx, right->Input.Pos()).Done().Ptr();
202+
auto leftTableLabel = left->IsRealTable() ? (left->Labels->size() > 1 ? CreateLabelList(*(left->Labels), left->Input.Pos(), ctx)
203+
: BuildAtom(leftLabel, left->Input.Pos(), ctx).Ptr())
204+
: Build<TCoVoid>(ctx, left->Input.Pos()).Done().Ptr();
205+
206+
auto rightTableLabel = right->IsRealTable() ? (right->Labels->size() > 1 ? CreateLabelList(*(right->Labels), right->Input.Pos(), ctx)
207+
: BuildAtom(rightLabel, right->Input.Pos(), ctx).Ptr())
208+
: Build<TCoVoid>(ctx, right->Input.Pos()).Done().Ptr();
196209

197210
size_t joinKeysCount = joinTuple.LeftKeys().Size() / 2;
198211
TVector<TCoAtom> leftJoinKeys;
@@ -353,21 +366,37 @@ TMaybe<TJoinInputDesc> BuildDqJoin(
353366
}
354367

355368
TMaybe<TJoinInputDesc> PrepareJoinInput(const TCoEquiJoinInput& input) {
356-
if (!input.Scope().Maybe<TCoAtom>()) {
357-
YQL_CLOG(TRACE, CoreDq) << "EquiJoin input scope is not an Atom: " << input.Scope().Ref().Content();
358-
return {};
369+
THashSet<TStringBuf> labels;
370+
if (input.Scope().Maybe<TCoAtom>()) {
371+
labels.insert(input.Scope().Cast<TCoAtom>().Value());
372+
} else {
373+
auto list = input.Scope().Cast<TCoAtomList>();
374+
for (auto atomLabel : list) {
375+
labels.insert(atomLabel.Value());
376+
}
359377
}
360-
auto scope = input.Scope().Cast<TCoAtom>().Value();
361378

362379
auto listType = input.List().Ref().GetTypeAnn()->Cast<TListExprType>();
363380
auto resultStructType = listType->GetItemType()->Cast<TStructExprType>();
364381

365382
TSet<std::pair<TStringBuf, TStringBuf>> keys;
366383
for (auto member : resultStructType->GetItems()) {
367-
keys.emplace(scope, member->GetName());
384+
if (input.Scope().Maybe<TCoAtom>()) {
385+
keys.emplace(input.Scope().Cast<TCoAtom>().Value(), member->GetName());
386+
} else {
387+
auto fullMemberName = member->GetName();
388+
if (fullMemberName.find(".") != TString::npos) {
389+
TStringBuf table;
390+
TStringBuf column;
391+
SplitTableName(fullMemberName, table, column);
392+
keys.emplace(table, column);
393+
} else {
394+
return {};
395+
}
396+
}
368397
}
369398

370-
return TJoinInputDesc(scope, input.List(), std::move(keys));
399+
return TJoinInputDesc(labels, input.List(), std::move(keys));
371400
}
372401

373402
TStringBuf RotateRightJoinType(TStringBuf joinType) {
@@ -396,13 +425,13 @@ std::pair<TVector<TCoAtom>, TVector<TCoAtom>> GetJoinKeys(const TDqJoin& join, T
396425
auto rightLabel = keyTuple.RightLabel().Value();
397426

398427
auto leftKey = Build<TCoAtom>(ctx, join.Pos())
399-
.Value(join.LeftLabel().Maybe<TCoAtom>() || keyTuple.LeftColumn().Value().starts_with("_yql_dq_key_left_")
428+
.Value((join.LeftLabel().Maybe<TCoAtom>() || keyTuple.LeftColumn().Value().starts_with("_yql_dq_key_left_")) && !join.LeftLabel().Maybe<TCoAtomList>()
400429
? keyTuple.LeftColumn().StringValue()
401430
: FullColumnName(leftLabel, keyTuple.LeftColumn().Value()))
402431
.Done();
403432

404433
auto rightKey = Build<TCoAtom>(ctx, join.Pos())
405-
.Value(join.RightLabel().Maybe<TCoAtom>() || keyTuple.RightColumn().Value().starts_with("_yql_dq_key_right_")
434+
.Value((join.RightLabel().Maybe<TCoAtom>() || keyTuple.RightColumn().Value().starts_with("_yql_dq_key_right_")) && !join.RightLabel().Maybe<TCoAtomList>()
406435
? keyTuple.RightColumn().StringValue()
407436
: FullColumnName(rightLabel, keyTuple.RightColumn().Value()))
408437
.Done();
@@ -414,7 +443,6 @@ std::pair<TVector<TCoAtom>, TVector<TCoAtom>> GetJoinKeys(const TDqJoin& join, T
414443
return std::make_pair(std::move(leftJoinKeys), std::move(rightJoinKeys));
415444
}
416445

417-
418446
TDqJoinBase DqMakePhyMapJoin(const TDqJoin& join, const TExprBase& leftInput, const TExprBase& rightInput,
419447
TExprContext& ctx, bool useGraceCore)
420448
{
@@ -521,7 +549,9 @@ TExprBase DqRewriteEquiJoin(
521549
THashMap<TStringBuf, TJoinInputDesc> inputs;
522550
for (size_t i = 0; i < equiJoin.ArgCount() - 2; ++i) {
523551
if (auto input = PrepareJoinInput(equiJoin.Arg(i).Cast<TCoEquiJoinInput>())) {
524-
inputs.emplace(*input->Label, std::move(*input));
552+
for (auto label : *(input->Labels)) {
553+
inputs.emplace(label, *input);
554+
}
525555
} else {
526556
return node;
527557
}

ydb/library/yql/dq/opt/dq_opt_join_cost_based.cpp

+4-4
Original file line number | Diff line number | Diff line change
@@ -30,13 +30,13 @@ bool DqCollectJoinRelationsWithStats(
3030

3131
auto stats = typesCtx.GetStats(joinArg.Raw());
3232

33-
if (!stats) {
34-
YQL_CLOG(TRACE, CoreDq) << "Didn't find statistics for scope " << input.Scope().Cast<TCoAtom>().StringValue() << "\n";
33+
auto scope = input.Scope();
34+
if (!scope.Maybe<TCoAtom>()){
3535
return false;
3636
}
3737

38-
auto scope = input.Scope();
39-
if (!scope.Maybe<TCoAtom>()){
38+
if (!stats) {
39+
YQL_CLOG(TRACE, CoreDq) << "Didn't find statistics for scope " << input.Scope().Cast<TCoAtom>().StringValue() << "\n";
4040
return false;
4141
}
4242

0 commit comments

Comments
 (0)