Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/kqp/opt/kqp_opt_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ NYql::NNodes::TKqpTable BuildTableMeta(const NYql::TKikimrTableMetadata& tableMe
TIntrusivePtr<NYql::TKikimrTableMetadata> GetIndexMetadata(const NYql::NNodes::TKqlReadTableIndex& index,
const NYql::TKikimrTablesData& tables, TStringBuf cluster);

TVector<std::pair<NYql::TExprNode::TPtr, const NYql::TIndexDescription*>> BuildSecondaryIndexVector(
TVector<std::pair<NYql::TExprNode::TPtr, const NYql::TIndexDescription*>> BuildEffectedIndexTables(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Может хоть Affected уж тогда? :-)
А всё остальное норм

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos, NYql::TExprContext& ctx,
const THashSet<TStringBuf>* filter,
const std::function<NYql::NNodes::TExprBase (const NYql::TKikimrTableMetadata&,
Expand Down
19 changes: 12 additions & 7 deletions ydb/core/kqp/opt/kqp_opt_kql.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ TExprBase BuildUpdateOnTableWithIndex(const TKiWriteTable& write, const TCoAtomL
const bool isSink, const TKikimrTableDescription& tableData, TExprContext& ctx)
{
if (isSink) {
auto indexes = BuildSecondaryIndexVector(tableData, write.Pos(), ctx, nullptr,
auto indexes = BuildEffectedIndexTables(tableData, write.Pos(), ctx, nullptr,
[] (const TKikimrTableMetadata& meta, TPositionHandle pos, TExprContext& ctx) -> TExprBase {
return BuildTableMeta(meta, pos, ctx);
});
Expand Down Expand Up @@ -695,19 +695,24 @@ TExprBase BuildUpdateTableWithIndex(const TKiUpdateTable& update, const TKikimrT
updateColumnsList.push_back(TCoAtom(ctx.NewAtom(update.Pos(), column)));
}

auto indexes = BuildSecondaryIndexVector(tableData, update.Pos(), ctx, nullptr,
auto indexes = BuildEffectedIndexTables(tableData, update.Pos(), ctx, nullptr,
[] (const TKikimrTableMetadata& meta, TPositionHandle pos, TExprContext& ctx) -> TExprBase {
return BuildTableMeta(meta, pos, ctx);
});

// Rewrite UPDATE to UPDATE ON for complex indexes
auto idxNeedsKqpEffect = [](std::pair<TExprNode::TPtr, const TIndexDescription*>& x) {
return x.second->Type == TIndexDescription::EType::GlobalSyncUnique ||
x.second->Type == TIndexDescription::EType::GlobalSyncVectorKMeansTree;
switch (x.second->Type) {
case TIndexDescription::EType::GlobalSync:
case TIndexDescription::EType::GlobalAsync:
return false;
case TIndexDescription::EType::GlobalSyncUnique:
case TIndexDescription::EType::GlobalSyncVectorKMeansTree:
case TIndexDescription::EType::GlobalFulltext:
return true;
}
};

const bool needsKqpEffect = std::find_if(indexes.begin(), indexes.end(), idxNeedsKqpEffect) != indexes.end();

// For unique or vector index rewrite UPDATE to UPDATE ON
if (needsKqpEffect) {
return Build<TKqlUpdateRowsIndex>(ctx, update.Pos())
.Table(BuildTableMeta(tableData, update.Pos(), ctx))
Expand Down
17 changes: 4 additions & 13 deletions ydb/core/kqp/opt/physical/effects/kqp_opt_phy_delete_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ TExprBase BuildDeleteIndexStagesImpl(const TKikimrTableDescription& table,
case TIndexDescription::EType::GlobalSync:
case TIndexDescription::EType::GlobalAsync:
case TIndexDescription::EType::GlobalSyncUnique: {
// deleteIndexKeys are already correct
// deleteIndexKeys are already correct
break;
}
case TIndexDescription::EType::GlobalSyncVectorKMeansTree: {
Expand All @@ -92,17 +92,8 @@ TExprBase BuildDeleteIndexStagesImpl(const TKikimrTableDescription& table,
break;
}
case TIndexDescription::EType::GlobalFulltext: {
// For fulltext indexes, we need to tokenize the text from the rows being deleted
// and then delete the corresponding token rows from the index table
auto deleteKeysPrecompute = Build<TDqPhyPrecompute>(ctx, del.Pos())
.Connection<TDqCnUnionAll>()
.Output()
.Stage(ReadTableToStage(deleteIndexKeys, ctx))
.Index().Build("0")
.Build()
.Build()
.Done();
deleteIndexKeys = BuildFulltextIndexRows(table, indexDesc, deleteKeysPrecompute, indexTableColumnsSet, indexTableColumns, /*includeDataColumns=*/false,
// For fulltext indexes, we need to tokenize the text of the rows being deleted and build the index rows to delete
deleteIndexKeys = BuildFulltextIndexRows(table, indexDesc, deleteIndexKeys, indexTableColumnsSet, indexTableColumns, /*includeDataColumns=*/false,
del.Pos(), ctx);
break;
}
Expand Down Expand Up @@ -133,7 +124,7 @@ TExprBase KqpBuildDeleteIndexStages(TExprBase node, TExprContext& ctx, const TKq
const auto& table = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, del.Table().Path());
const auto& pk = table.Metadata->KeyColumnNames;

const auto indexes = BuildSecondaryIndexVector(table, del.Pos(), ctx);
const auto indexes = BuildEffectedIndexTables(table, del.Pos(), ctx);
YQL_ENSURE(indexes);

// Skip lookup means that the input already has all required columns and we only need to project them
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ using TSecondaryIndexes = TVector<std::pair<
NYql::TExprNode::TPtr,
const NYql::TIndexDescription*>>;

TSecondaryIndexes BuildSecondaryIndexVector(const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos,
TSecondaryIndexes BuildEffectedIndexTables(const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos,
NYql::TExprContext& ctx, const THashSet<TStringBuf>* filter = nullptr);

struct TCondenseInputResult {
Expand Down Expand Up @@ -98,7 +98,7 @@ NYql::NNodes::TKqpCnStreamLookup BuildStreamLookupOverPrecompute(const NYql::TKi
NYql::NNodes::TExprBase input,
const NYql::NNodes::TKqpTable& kqpTableNode, const NYql::TPositionHandle& pos, NYql::TExprContext& ctx, const TVector<TString>& extraColumnsToRead = {});

NYql::NNodes::TDqStageBase ReadTableToStage(const NYql::NNodes::TExprBase& expr, NYql::TExprContext& ctx);
NYql::NNodes::TDqStageBase ReadInputToStage(const NYql::NNodes::TExprBase& expr, NYql::TExprContext& ctx);

NYql::NNodes::TExprBase BuildVectorIndexPostingRows(const NYql::TKikimrTableDescription& table,
const NYql::NNodes::TKqpTable& tableNode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,19 @@ TExprBase BuildFulltextIndexRows(const TKikimrTableDescription& table, const TIn
const TString textColumn = settings.columns().at(0).column();
const auto& analyzers = settings.columns().at(0).analyzers();

// Serialize analyzer settings for runtime usage
TString settingsProto;
YQL_ENSURE(analyzers.SerializeToString(&settingsProto));

auto inputRowArg = TCoArgument(ctx.NewArgument(pos, "input_row"));
auto tokenArg = TCoArgument(ctx.NewArgument(pos, "token"));

auto readInputRows = inputRows.Maybe<TDqPhyPrecompute>()
? inputRows
: Build<TDqPhyPrecompute>(ctx, pos)
.Connection<TDqCnUnionAll>()
.Output()
.Stage(ReadInputToStage(inputRows, ctx))
.Index().Build("0")
.Build()
.Build()
.Done();

// Build output row structure for each token
TVector<TExprBase> tokenRowTuples;
Expand Down Expand Up @@ -93,12 +100,15 @@ TExprBase BuildFulltextIndexRows(const TKikimrTableDescription& table, const TIn
.Name().Build(textColumn)
.Done();

// Create callable for fulltext tokenization
// Format: FulltextAnalyze(text: String, settings: String) -> List<String>
// Serialize analyzer settings for FulltextAnalyze
TString settingsProto;
YQL_ENSURE(analyzers.SerializeToString(&settingsProto));
auto settingsLiteral = Build<TCoString>(ctx, pos)
.Literal().Build(settingsProto)
.Done();

// Create callable for fulltext tokenization
// Format: FulltextAnalyze(text: String, settings: String) -> List<String>
auto analyzeCallable = ctx.Builder(pos)
.Callable("FulltextAnalyze")
.Add(0, textMember.Ptr())
Expand All @@ -108,7 +118,7 @@ TExprBase BuildFulltextIndexRows(const TKikimrTableDescription& table, const TIn

auto analyzeStage = Build<TDqStage>(ctx, pos)
.Inputs()
.Add(inputRows)
.Add(readInputRows)
.Build()
.Program()
.Args({"rows"})
Expand Down
45 changes: 26 additions & 19 deletions ydb/core/kqp/opt/physical/effects/kqp_opt_phy_indexes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,14 @@ TDqPhyPrecompute PrecomputeCondenseInputResult(const TCondenseInputResult& conde
.Done();
}

TVector<std::pair<TExprNode::TPtr, const TIndexDescription*>> BuildSecondaryIndexVector(
TVector<std::pair<TExprNode::TPtr, const TIndexDescription*>> BuildEffectedIndexTables(
const TKikimrTableDescription& table,
TPositionHandle pos,
TExprContext& ctx,
const THashSet<TStringBuf>* filter,
const std::function<TExprBase (const TKikimrTableMetadata&, TPositionHandle, TExprContext&)>& tableBuilder)
{
TVector<std::pair<TExprNode::TPtr, const TIndexDescription*>> secondaryIndexes;
secondaryIndexes.reserve(table.Metadata->Indexes.size());
TVector<std::pair<TExprNode::TPtr, const TIndexDescription*>> result(::Reserve(table.Metadata->Indexes.size()));
YQL_ENSURE(table.Metadata->Indexes.size() == table.Metadata->ImplTables.size());
for (size_t i = 0; i < table.Metadata->Indexes.size(); i++) {
const auto& index = table.Metadata->Indexes[i];
Expand Down Expand Up @@ -112,34 +111,42 @@ TVector<std::pair<TExprNode::TPtr, const TIndexDescription*>> BuildSecondaryInde

if (index.KeyColumns && addIndex) {
auto& implTable = table.Metadata->ImplTables[i];
if (index.Type == TIndexDescription::EType::GlobalSyncVectorKMeansTree) {
if (index.KeyColumns.size() == 1) {
YQL_ENSURE(implTable->Next && !implTable->Next->Next);
} else {
YQL_ENSURE(implTable->Next && implTable->Next->Next && !implTable->Next->Next->Next);
switch (index.Type) {
case TIndexDescription::EType::GlobalSync:
case TIndexDescription::EType::GlobalAsync:
case TIndexDescription::EType::GlobalSyncUnique:
case TIndexDescription::EType::GlobalFulltext: {
YQL_ENSURE(!implTable->Next);
auto indexTable = tableBuilder(*implTable, pos, ctx).Ptr();
result.emplace_back(indexTable, &index);
break;
}
case TIndexDescription::EType::GlobalSyncVectorKMeansTree: {
if (index.KeyColumns.size() == 1) {
YQL_ENSURE(implTable->Next && !implTable->Next->Next);
} else {
YQL_ENSURE(implTable->Next && implTable->Next->Next && !implTable->Next->Next->Next);
}
auto postingTable = implTable->Next;
YQL_ENSURE(postingTable->Name.EndsWith(NTableIndex::NKMeans::PostingTable));
auto indexTable = tableBuilder(*postingTable, pos, ctx).Ptr();
result.emplace_back(indexTable, &index);
break;
}
auto postingTable = implTable->Next;
YQL_ENSURE(postingTable->Name.EndsWith(NTableIndex::NKMeans::PostingTable));
auto indexTable = tableBuilder(*postingTable, pos, ctx).Ptr();
secondaryIndexes.emplace_back(indexTable, &index);
} else {
YQL_ENSURE(!implTable->Next);
auto indexTable = tableBuilder(*implTable, pos, ctx).Ptr();
secondaryIndexes.emplace_back(indexTable, &index);
}
}
}
return secondaryIndexes;
return result;
}

TSecondaryIndexes BuildSecondaryIndexVector(const TKikimrTableDescription& table, TPositionHandle pos,
TSecondaryIndexes BuildEffectedIndexTables(const TKikimrTableDescription& table, TPositionHandle pos,
TExprContext& ctx, const THashSet<TStringBuf>* filter)
{
static auto cb = [] (const TKikimrTableMetadata& meta, TPositionHandle pos, TExprContext& ctx) -> TExprBase {
return BuildTableMeta(meta, pos, ctx);
};

return BuildSecondaryIndexVector(table, pos, ctx, filter, cb);
return BuildEffectedIndexTables(table, pos, ctx, filter, cb);
}

TMaybeNode<TDqPhyPrecompute> PrecomputeTableLookupDict(const TDqPhyPrecompute& lookupKeys,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ TExprBase KqpBuildInsertIndexStages(TExprBase node, TExprContext& ctx, const TKq

const bool isSink = NeedSinks(table, kqpCtx);

auto indexes = BuildSecondaryIndexVector(table, insert.Pos(), ctx, nullptr);
auto indexes = BuildEffectedIndexTables(table, insert.Pos(), ctx, nullptr);
YQL_ENSURE(indexes);
const bool canUseStreamIndex = kqpCtx.Config->EnableIndexStreamWrite
&& std::all_of(indexes.begin(), indexes.end(), [](const auto& index) {
Expand Down Expand Up @@ -227,7 +227,7 @@ TExprBase KqpBuildInsertIndexStages(TExprBase node, TExprContext& ctx, const TKq
break;
}
case TIndexDescription::EType::GlobalFulltext: {
// For fulltext indexes, we need to tokenize the text and create index rows and refill index columns
// For fulltext indexes, we need to tokenize the text and build the index rows to insert
upsertIndexRows = BuildFulltextIndexRows(table, indexDesc, insertRowsPrecompute, inputColumnsSet, indexTableColumns, /*includeDataColumns=*/true,
insert.Pos(), ctx);
break;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ TExprBase KqpBuildUpdateStages(TExprBase node, TExprContext& ctx, const TKqpOpti
}
}

TDqStageBase ReadTableToStage(const TExprBase& expr, TExprContext& ctx) {
TDqStageBase ReadInputToStage(const TExprBase& expr, TExprContext& ctx) {
if (expr.Maybe<TDqStageBase>()) {
return expr.Cast<TDqStageBase>();
}
Expand Down
66 changes: 46 additions & 20 deletions ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode,
}

auto filter = (mode == TKqpPhyUpsertIndexMode::UpdateOn) ? &inputColumnsSet : nullptr;
const auto indexes = BuildSecondaryIndexVector(table, pos, ctx, filter);
const auto indexes = BuildEffectedIndexTables(table, pos, ctx, filter);

auto checkedInput = RewriteInputForConstraint(inputRows, inputColumnsSet, columnsWithDefaultsSet, table, indexes, pos, ctx);

Expand Down Expand Up @@ -885,13 +885,26 @@ TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode,
? MakeRowsFromTupleDict(lookupDictRecomputed, pk, indexTableColumnsWithoutData, pos, ctx)
: MakeRowsFromDict(lookupDict.Cast(), pk, indexTableColumnsWithoutData, pos, ctx);

if (indexDesc->Type == TIndexDescription::EType::GlobalSyncVectorKMeansTree) {
if (indexDesc->KeyColumns.size() > 1) {
deleteIndexKeys = BuildVectorIndexPrefixRows(table, *prefixTable, false, indexDesc, deleteIndexKeys, indexTableColumnsWithoutData, pos, ctx);
switch (indexDesc->Type) {
case TIndexDescription::EType::GlobalSync:
case TIndexDescription::EType::GlobalAsync:
case TIndexDescription::EType::GlobalSyncUnique:
// deleteIndexKeys are already correct
break;
case TIndexDescription::EType::GlobalSyncVectorKMeansTree: {
if (indexDesc->KeyColumns.size() > 1) {
deleteIndexKeys = BuildVectorIndexPrefixRows(table, *prefixTable, false, indexDesc, deleteIndexKeys, indexTableColumnsWithoutData, pos, ctx);
}
deleteIndexKeys = BuildVectorIndexPostingRows(table, mainTableNode,
indexDesc->Name, indexTableColumnsWithoutData, deleteIndexKeys, false, pos, ctx);
break;
}
case TIndexDescription::EType::GlobalFulltext: {
// For fulltext indexes, we need to tokenize the text and build the index rows to delete
deleteIndexKeys = BuildFulltextIndexRows(table, indexDesc, deleteIndexKeys, indexTableColumnsSet,
indexTableColumnsWithoutData, /*includeDataColumns=*/false, pos, ctx);
break;
}

deleteIndexKeys = BuildVectorIndexPostingRows(table, mainTableNode,
indexDesc->Name, indexTableColumnsWithoutData, deleteIndexKeys, false, pos, ctx);
}

auto indexDelete = Build<TKqlDeleteRows>(ctx, pos)
Expand All @@ -916,21 +929,34 @@ TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode,
: MakeUpsertIndexRows(mode, rowsPrecompute.Cast(), lookupDict.Cast(),
inputColumnsSet, indexTableColumns, table, pos, ctx, false);

if (indexDesc->Type == TIndexDescription::EType::GlobalSyncVectorKMeansTree) {
if (indexDesc->KeyColumns.size() > 1) {
if (prefixTable->Metadata->Columns.at(NTableIndex::NKMeans::IdColumn).DefaultKind == NKikimrKqp::TKqpColumnMetadataProto::DEFAULT_KIND_SEQUENCE) {
auto res = BuildVectorIndexPrefixRowsWithNew(table, *prefixTable, indexDesc, upsertIndexRows, indexTableColumns, pos, ctx);
upsertIndexRows = std::move(res.first);
effects.emplace_back(std::move(res.second));
} else {
// Handle old prefixed vector index tables without the sequence
upsertIndexRows = BuildVectorIndexPrefixRows(table, *prefixTable, true, indexDesc, upsertIndexRows, indexTableColumns, pos, ctx);
switch (indexDesc->Type) {
case TIndexDescription::EType::GlobalSync:
case TIndexDescription::EType::GlobalAsync:
case TIndexDescription::EType::GlobalSyncUnique:
// upsertIndexRows are already correct
break;
case TIndexDescription::EType::GlobalSyncVectorKMeansTree: {
if (indexDesc->KeyColumns.size() > 1) {
if (prefixTable->Metadata->Columns.at(NTableIndex::NKMeans::IdColumn).DefaultKind == NKikimrKqp::TKqpColumnMetadataProto::DEFAULT_KIND_SEQUENCE) {
auto res = BuildVectorIndexPrefixRowsWithNew(table, *prefixTable, indexDesc, upsertIndexRows, indexTableColumns, pos, ctx);
upsertIndexRows = std::move(res.first);
effects.emplace_back(std::move(res.second));
} else {
// Handle old prefixed vector index tables without the sequence
upsertIndexRows = BuildVectorIndexPrefixRows(table, *prefixTable, true, indexDesc, upsertIndexRows, indexTableColumns, pos, ctx);
}
}
upsertIndexRows = BuildVectorIndexPostingRows(table, mainTableNode,
indexDesc->Name, indexTableColumns, upsertIndexRows, true, pos, ctx);
indexTableColumns = BuildVectorIndexPostingColumns(table, indexDesc);
break;
}
case TIndexDescription::EType::GlobalFulltext: {
// For fulltext indexes, we need to tokenize the text and build the index rows to upsert
upsertIndexRows = BuildFulltextIndexRows(table, indexDesc, upsertIndexRows, indexTableColumnsSet,
indexTableColumns, /*includeDataColumns=*/true, pos, ctx);
break;
}

upsertIndexRows = BuildVectorIndexPostingRows(table, mainTableNode,
indexDesc->Name, indexTableColumns, upsertIndexRows, true, pos, ctx);
indexTableColumns = BuildVectorIndexPostingColumns(table, indexDesc);
}

auto indexUpsert = Build<TKqlUpsertRows>(ctx, pos)
Expand Down
Loading
Loading