Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance MERGE .. WHEN NOT MATCHED BY SOURCE for repartitioned source #7900

Merged
merged 1 commit into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions src/backend/distributed/executor/merge_executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@
copyObject(distributedPlan->selectPlanForModifyViaCoordinatorOrRepartition);
char *intermediateResultIdPrefix = distributedPlan->intermediateResultIdPrefix;
bool hasReturning = distributedPlan->expectResults;
bool hasNotMatchedBySource = HasMergeNotMatchedBySource(mergeQuery);
int partitionColumnIndex = distributedPlan->sourceResultRepartitionColumnIndex;

/*
Expand All @@ -233,7 +234,7 @@

ereport(DEBUG1, (errmsg("Collect source query results on coordinator")));

List *prunedTaskList = NIL;
List *prunedTaskList = NIL, *emptySourceTaskList = NIL;
HTAB *shardStateHash =
ExecuteMergeSourcePlanIntoColocatedIntermediateResults(
targetRelationId,
Expand All @@ -255,7 +256,8 @@
* We cannot actually execute MERGE INTO ... tasks that read from
* intermediate results that weren't created because no rows were
* written to them. Prune those tasks out by only including tasks
* on shards with connections.
* on shards with connections; however, if the MERGE INTO includes
* a NOT MATCHED BY SOURCE clause we need to include the task.
*/
Task *task = NULL;
foreach_declared_ptr(task, taskList)
Expand All @@ -268,6 +270,19 @@
{
prunedTaskList = lappend(prunedTaskList, task);
}
else if (hasNotMatchedBySource)
{
emptySourceTaskList = lappend(emptySourceTaskList, task);
}
}

if (emptySourceTaskList != NIL)
{
ereport(DEBUG1, (errmsg("MERGE has NOT MATCHED BY SOURCE clause, "

Check warning on line 281 in src/backend/distributed/executor/merge_executor.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/executor/merge_executor.c#L281

Added line #L281 was not covered by tests
"execute MERGE on all shards")));
AdjustTaskQueryForEmptySource(targetRelationId, mergeQuery, emptySourceTaskList,
intermediateResultIdPrefix);
prunedTaskList = list_concat(prunedTaskList, emptySourceTaskList);
}

if (prunedTaskList == NIL)
Expand Down
121 changes: 113 additions & 8 deletions src/backend/distributed/executor/repartition_executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "nodes/parsenodes.h"

#include "distributed/citus_custom_scan.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/intermediate_results.h"
#include "distributed/listutils.h"
#include "distributed/multi_physical_planner.h"
Expand Down Expand Up @@ -101,6 +102,40 @@
}


/*
 * HasMergeNotMatchedBySource reports whether the given query is a MERGE
 * command that carries at least one WHEN NOT MATCHED BY SOURCE action.
 *
 * Callers use the answer to decide whether the MERGE has to run on every
 * shard of the target table, including shards for which the source
 * produced no rows.
 *
 * The match kind is only inspected when building against PostgreSQL 17 or
 * later; on older versions the function always returns false.
 */
bool
HasMergeNotMatchedBySource(Query *query)
{
	if (!IsMergeQuery(query))
	{
		return false;
	}

#if PG_VERSION_NUM >= PG_VERSION_17
	ListCell *actionCell = NULL;
	foreach(actionCell, query->mergeActionList)
	{
		MergeAction *mergeAction = lfirst_node(MergeAction, actionCell);

		/* a single matching action is enough, no need to scan further */
		if (mergeAction->matchKind == MERGE_WHEN_NOT_MATCHED_BY_SOURCE)
		{
			return true;
		}
	}
#endif

	return false;
}


/*
* GenerateTaskListWithColocatedIntermediateResults generates a list of tasks
* for a query that inserts into a target relation and selects from a set of
Expand Down Expand Up @@ -200,6 +235,61 @@
}


/*
 * AdjustTaskQueryForEmptySource rewrites the query string of each given task
 * so that the MERGE reads its source from an empty relation instead of from
 * an intermediate result.
 *
 * This is needed when the MERGE has a NOT MATCHED BY SOURCE clause: that
 * clause is true for every target shard whose source shard has no rows, so
 * the MERGE must still be executed on those shards.
 *
 * targetRelationId is the relation being merged into, mergeQuery is the
 * original MERGE query (it is copied, not modified), tasks is the list of
 * Task objects to adjust, and resultIdPrefix is combined with each task's
 * anchor shard id to name the empty relation.
 */
void
AdjustTaskQueryForEmptySource(Oid targetRelationId,
							  Query *mergeQuery,
							  List *tasks,
							  char *resultIdPrefix)
{
	/* work on a private copy so the caller's query tree stays untouched */
	Query *shardMergeQuery = copyObject(mergeQuery);
	RangeTblEntry *sourceRte = ExtractSourceResultRangeTableEntry(shardMergeQuery);
	RangeTblEntry *targetRte = ExtractResultRelationRTE(shardMergeQuery);
	List *sourceTargetList = sourceRte->subquery->targetList;

	/* setting an alias simplifies deparsing of RETURNING */
	if (targetRte->alias == NULL)
	{
		targetRte->alias = makeAlias(CITUS_TABLE_ALIAS, NIL);
	}

	/*
	 * Since CTEs have already been converted to intermediate results, they
	 * need to be removed from the query. Otherwise, worker queries include
	 * both intermediate results and CTEs in the query.
	 */
	shardMergeQuery->cteList = NIL;

	ListCell *currentTaskCell = NULL;
	foreach(currentTaskCell, tasks)
	{
		Task *currentTask = lfirst(currentTaskCell);
		uint64 anchorShardId = currentTask->anchorShardId;
		StringInfo shardQueryString = makeStringInfo();
		StringInfo emptyRelationName = makeStringInfo();

		appendStringInfo(emptyRelationName, "%s_" UINT64_FORMAT, resultIdPrefix,
						 anchorShardId);

		/* replace the source subquery with one that yields no rows */
		sourceRte->subquery = BuildEmptyResultQuery(sourceTargetList,
													emptyRelationName->data);

		/*
		 * Deparse the MERGE for this shard; it now reads from the empty
		 * relation rather than from an intermediate result.
		 */
		deparse_shard_query(shardMergeQuery, targetRelationId, anchorShardId,
							shardQueryString);
		ereport(DEBUG2, (errmsg("distributed statement: %s",
								shardQueryString->data)));

		SetTaskQueryString(currentTask, shardQueryString->data);
	}
}


/*
* GenerateTaskListWithRedistributedResults returns a task list to insert given
* redistributedResults into the given target relation.
Expand All @@ -223,6 +313,7 @@
Query *modifyResultQuery = copyObject(modifyQueryViaCoordinatorOrRepartition);
RangeTblEntry *insertRte = ExtractResultRelationRTE(modifyResultQuery);
Oid targetRelationId = targetRelation->relationId;
bool hasNotMatchedBySource = HasMergeNotMatchedBySource(modifyResultQuery);

int shardCount = targetRelation->shardIntervalArrayLength;
int shardOffset = 0;
Expand All @@ -242,19 +333,33 @@
StringInfo queryString = makeStringInfo();

/* skip empty tasks */
if (resultIdList == NIL)
if (resultIdList == NIL && !hasNotMatchedBySource)
{
continue;
}

/* sort result ids for consistent test output */
List *sortedResultIds = SortList(resultIdList, pg_qsort_strcmp);
Query *fragmentSetQuery = NULL;

/* generate the query on the intermediate result */
Query *fragmentSetQuery = BuildReadIntermediateResultsArrayQuery(selectTargetList,
NIL,
sortedResultIds,
useBinaryFormat);
if (resultIdList != NIL)
{
/* sort result ids for consistent test output */
List *sortedResultIds = SortList(resultIdList, pg_qsort_strcmp);

/* generate the query on the intermediate result */
fragmentSetQuery = BuildReadIntermediateResultsArrayQuery(selectTargetList,
NIL,
sortedResultIds,
useBinaryFormat);
}
else
{
/* No source data, but MERGE query has NOT MATCHED BY SOURCE */
StringInfo emptyFragmentId = makeStringInfo();
appendStringInfo(emptyFragmentId, "%s_" UINT64_FORMAT, "temp_empty_rel_",
shardId);
fragmentSetQuery = BuildEmptyResultQuery(selectTargetList,
emptyFragmentId->data);
}

/* put the intermediate result query in the INSERT..SELECT */
selectRte->subquery = fragmentSetQuery;
Expand Down
123 changes: 123 additions & 0 deletions src/backend/distributed/planner/recursive_planning.c
Original file line number Diff line number Diff line change
Expand Up @@ -2291,6 +2291,129 @@
}


/*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an interesting approach. Thinking out loud — this is not something we have to do in this PR — maybe, as a special case, intermediate results should generate empty files with the ID? I'm not sure whether that's better than this approach.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, that approach — having intermediate results generate empty files with the ID — was what I initially tried! However, it complicated the multi_copy logic significantly, specifically the CitusCopyDestReceiver, which streams tuples into the appropriate intermediate result file. As the problem is specific to merge execution, I thought it was preferable to go with the empty relation approach. I also reckoned that it is possibly more efficient than creating an empty intermediate result file, but let me know what you think.

* For the given target list, build an empty relation with the same target list.
* For example, if the target list is (a, b, c), and resultId is "empty", then
* it returns a Query object for this SQL:
* SELECT a, b, c FROM (VALUES (NULL, NULL, NULL)) AS empty(a, b, c) WHERE false;
*/
Query *
BuildEmptyResultQuery(List *targetEntryList, char *resultId)
{
	/*
	 * The result has the shape
	 *   SELECT cols FROM (VALUES (NULL, ...)) AS resultId(cols) WHERE false
	 * with one column per non-junk entry of targetEntryList, so it exposes
	 * the same column names and types as the target list but returns no rows.
	 */
	List *targetList = NIL;
	ListCell *targetEntryCell = NULL;

	List *colTypes = NIL;
	List *colTypMods = NIL;
	List *colCollations = NIL;
	List *colNames = NIL;

	List *valueConsts = NIL;
	List *valueTargetList = NIL;
	List *valueColNames = NIL;

	int targetIndex = 1;

	/* build the target list and column lists needed */
	foreach(targetEntryCell, targetEntryList)
	{
		TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);

		/* junk entries are not part of the visible result, skip them early */
		if (targetEntry->resjunk)
		{
			continue;
		}

		Node *targetExpr = (Node *) targetEntry->expr;
		char *columnName = targetEntry->resname;
		Oid columnType = exprType(targetExpr);

		/* typmods are signed (-1 means "unspecified"), so do not use Oid */
		int32 columnTypMod = exprTypmod(targetExpr);
		Oid columnCollation = exprCollation(targetExpr);

		/*
		 * Use separate Var nodes for the outer query and for the VALUES
		 * subquery, so the two query trees do not share a node.
		 */
		Var *outerVar = makeVar(1, targetIndex, columnType, columnTypMod,
								columnCollation, 0);
		Var *valuesVar = makeVar(1, targetIndex, columnType, columnTypMod,
								 columnCollation, 0);

		TargetEntry *outerEntry = makeTargetEntry((Expr *) outerVar, targetIndex,
												  columnName, false);

		/* a typed NULL constant serves as the single VALUES row */
		Const *valueConst = makeConst(columnType, columnTypMod, columnCollation, 0,
									  (Datum) 0, true, false);

		StringInfoData *columnString = makeStringInfo();
		appendStringInfo(columnString, "column%d", targetIndex);

		TargetEntry *valueTgtEntry = makeTargetEntry((Expr *) valuesVar, targetIndex,
													 columnString->data, false);

		valueConsts = lappend(valueConsts, valueConst);
		valueTargetList = lappend(valueTargetList, valueTgtEntry);
		valueColNames = lappend(valueColNames, makeString(columnString->data));

		colNames = lappend(colNames, makeString(columnName));
		colTypes = lappend_oid(colTypes, columnType);

		/* coltypmods is an integer list, unlike coltypes and colcollations */
		colTypMods = lappend_int(colTypMods, columnTypMod);
		colCollations = lappend_oid(colCollations, columnCollation);

		targetList = lappend(targetList, outerEntry);

		targetIndex++;
	}

	/* Build a RangeTable Entry for the VALUES relation */
	RangeTblEntry *valuesRangeTable = makeNode(RangeTblEntry);
	valuesRangeTable->rtekind = RTE_VALUES;
	valuesRangeTable->values_lists = list_make1(valueConsts);
	valuesRangeTable->colcollations = colCollations;
	valuesRangeTable->coltypes = colTypes;
	valuesRangeTable->coltypmods = colTypMods;
	valuesRangeTable->alias = NULL;
	valuesRangeTable->eref = makeAlias("*VALUES*", valueColNames);
	valuesRangeTable->inFromCl = true;

	RangeTblRef *valuesRTRef = makeNode(RangeTblRef);
	valuesRTRef->rtindex = 1;

	FromExpr *valuesJoinTree = makeNode(FromExpr);
	valuesJoinTree->fromlist = list_make1(valuesRTRef);

	/* build the VALUES query */
	Query *valuesQuery = makeNode(Query);
	valuesQuery->canSetTag = true;
	valuesQuery->commandType = CMD_SELECT;
	valuesQuery->rtable = list_make1(valuesRangeTable);
#if PG_VERSION_NUM >= PG_VERSION_16
	valuesQuery->rteperminfos = NIL;
#endif
	valuesQuery->jointree = valuesJoinTree;
	valuesQuery->targetList = valueTargetList;

	/* build the relation selecting from the VALUES */
	RangeTblEntry *emptyRangeTable = makeNode(RangeTblEntry);
	emptyRangeTable->rtekind = RTE_SUBQUERY;
	emptyRangeTable->subquery = valuesQuery;
	emptyRangeTable->alias = makeAlias(resultId, colNames);
	emptyRangeTable->eref = emptyRangeTable->alias;
	emptyRangeTable->inFromCl = true;

	/* build the SELECT query */
	Query *resultQuery = makeNode(Query);
	resultQuery->commandType = CMD_SELECT;
	resultQuery->canSetTag = true;
	resultQuery->rtable = list_make1(emptyRangeTable);
#if PG_VERSION_NUM >= PG_VERSION_16
	resultQuery->rteperminfos = NIL;
#endif
	RangeTblRef *rangeTableRef = makeNode(RangeTblRef);
	rangeTableRef->rtindex = 1;

	/* insert a FALSE qual to ensure 0 rows returned */
	FromExpr *joinTree = makeNode(FromExpr);
	joinTree->fromlist = list_make1(rangeTableRef);
	joinTree->quals = makeBoolConst(false, false);
	resultQuery->jointree = joinTree;
	resultQuery->targetList = targetList;

	return resultQuery;
}


/*
* BuildReadIntermediateResultsQuery is the common code for generating
* queries to read from result files. It is used by
Expand Down
1 change: 1 addition & 0 deletions src/include/distributed/recursive_planning.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ extern Query * BuildReadIntermediateResultsArrayQuery(List *targetEntryList,
List *columnAliasList,
List *resultIdList,
bool useBinaryCopyFormat);
extern Query * BuildEmptyResultQuery(List *targetEntryList, char *resultId);
extern bool GeneratingSubplans(void);
extern bool ContainsLocalTableDistributedTableJoin(List *rangeTableList);
extern void ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry,
Expand Down
5 changes: 5 additions & 0 deletions src/include/distributed/repartition_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,10 @@ extern List * GenerateTaskListWithRedistributedResults(
bool useBinaryFormat);
extern bool IsSupportedRedistributionTarget(Oid targetRelationId);
extern bool IsRedistributablePlan(Plan *selectPlan);
extern bool HasMergeNotMatchedBySource(Query *query);
extern void AdjustTaskQueryForEmptySource(Oid targetRelationId,
Query *mergeQuery,
List *emptySourceTaskList,
char *resultIdPrefix);

#endif /* REPARTITION_EXECUTOR_H */
Loading
Loading