Skip to content

Commit

Permalink
Enhance MERGE .. WHEN NOT MATCHED BY SOURCE for repartitioned source (c…
Browse files Browse the repository at this point in the history
…itusdata#7900)

DESCRIPTION: Ensure that a MERGE command on a distributed table with a
`WHEN NOT MATCHED BY SOURCE` clause runs against all shards of the
distributed table.

The Postgres MERGE command updates a table using a table or a query as a
data source. It provides three ways to match the target table with the
source: `WHEN MATCHED` means that there is a row in both the target and
source; `WHEN NOT MATCHED` means that there is a row in the source that
has no match (is not present) in the target; and, as of PG17, `WHEN NOT
MATCHED BY SOURCE` means that there is a row in the target that has no
match in the source.

In Citus, when a MERGE command updates a distributed table using a
local/reference table or a distributed query as source, that source is
repartitioned, and for each repartitioned shard that has data (i.e. 1 or
more rows) the MERGE is run against the corresponding distributed table
shard. Suppose the distributed table has 32 shards, and the source
repartitions into 4 shards that have data, with the remaining 28 shards
being empty; then the MERGE command is performed on the 4 corresponding
shards of the distributed table. However, the semantics of `WHEN NOT
MATCHED BY SOURCE` are that the specified action must be performed on
the target for each row in the target that is not in the source; so if
the source is empty, all target rows should be updated. To see this,
consider the following MERGE command:
```
MERGE INTO target AS t
USING source AS s ON t.id = s.id
WHEN NOT MATCHED BY SOURCE THEN UPDATE t SET t.col1 = 100
```
If the source has zero rows then every row in the target is updated s.t.
its col1 value is 100. Currently in Citus a MERGE on a distributed table
with a local/reference table or a distributed query as source ignores
shards of the distributed table when the corresponding shard of the
repartitioned source has zero rows. However, if the MERGE command
specifies a `WHEN NOT MATCHED BY SOURCE` clause, then the MERGE should
be performed on all shards of the distributed table, to ensure that the
specified action is performed on the target for each row in the target
that is not in the source. This PR enhances Citus MERGE execution so
that when a repartitioned source shard has zero rows, and the MERGE
command specifies a `WHEN NOT MATCHED BY SOURCE` clause, the MERGE is
performed against the corresponding shard of the distributed table using
an empty (zero row) relation as source, by generating a query of the
form:
```
MERGE INTO target_shard_0002 AS t
USING (SELECT id FROM (VALUES (NULL) ) source_0002(id) WHERE FALSE) AS s ON t.id = s.id
WHEN NOT MATCHED BY SOURCE THEN UPDATE t set t.col1 = 100
```
This works because each row in the target shard will be updated, and
`WHEN MATCHED` and `WHEN NOT MATCHED`, if specified, will be no-ops
because the source has zero rows.

To implement this when the source is a local or reference table involves
teaching function `ExcuteSourceAtCoordAndRedistribution()` in
`merge_executor.c` to not prune tasks when the query has `WHEN NOT
MATCHED BY SOURCE` but to instead replace the task's query to one that
uses an empty relation as source. And when the source is a distributed
query, function
`ExecuteMergeSourcePlanIntoColocatedIntermediateResults()` (also in
`merge_executor.c`) instead of skipping empty tasks now generates a
query that uses an empty relation as source for the corresponding target
shard of the distributed table, but again only when the query has `WHEN
NOT MATCHED BY SOURCE`. A new function `BuildEmptyResultQuery()` is
added to `recursive_planning.c` and it is used by both the
aforementioned functions in `merge_executor.c` to build an empty
relation to use as the source. It applies the appropriate type to each
column of the empty relation so the join with the target makes sense to
the query compiler.
  • Loading branch information
colm-mchugh authored Feb 24, 2025
1 parent 459c283 commit c1f5762
Show file tree
Hide file tree
Showing 7 changed files with 713 additions and 12 deletions.
19 changes: 17 additions & 2 deletions src/backend/distributed/executor/merge_executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState)
copyObject(distributedPlan->selectPlanForModifyViaCoordinatorOrRepartition);
char *intermediateResultIdPrefix = distributedPlan->intermediateResultIdPrefix;
bool hasReturning = distributedPlan->expectResults;
bool hasNotMatchedBySource = HasMergeNotMatchedBySource(mergeQuery);
int partitionColumnIndex = distributedPlan->sourceResultRepartitionColumnIndex;

/*
Expand All @@ -233,7 +234,7 @@ ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState)

ereport(DEBUG1, (errmsg("Collect source query results on coordinator")));

List *prunedTaskList = NIL;
List *prunedTaskList = NIL, *emptySourceTaskList = NIL;
HTAB *shardStateHash =
ExecuteMergeSourcePlanIntoColocatedIntermediateResults(
targetRelationId,
Expand All @@ -255,7 +256,8 @@ ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState)
* We cannot actually execute MERGE INTO ... tasks that read from
* intermediate results that weren't created because no rows were
* written to them. Prune those tasks out by only including tasks
* on shards with connections.
* on shards with connections; however, if the MERGE INTO includes
* a NOT MATCHED BY SOURCE clause we need to include the task.
*/
Task *task = NULL;
foreach_declared_ptr(task, taskList)
Expand All @@ -268,6 +270,19 @@ ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState)
{
prunedTaskList = lappend(prunedTaskList, task);
}
else if (hasNotMatchedBySource)
{
emptySourceTaskList = lappend(emptySourceTaskList, task);
}
}

if (emptySourceTaskList != NIL)
{
ereport(DEBUG1, (errmsg("MERGE has NOT MATCHED BY SOURCE clause, "
"execute MERGE on all shards")));
AdjustTaskQueryForEmptySource(targetRelationId, mergeQuery, emptySourceTaskList,
intermediateResultIdPrefix);
prunedTaskList = list_concat(prunedTaskList, emptySourceTaskList);
}

if (prunedTaskList == NIL)
Expand Down
121 changes: 113 additions & 8 deletions src/backend/distributed/executor/repartition_executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "nodes/parsenodes.h"

#include "distributed/citus_custom_scan.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/intermediate_results.h"
#include "distributed/listutils.h"
#include "distributed/multi_physical_planner.h"
Expand Down Expand Up @@ -101,6 +102,40 @@ IsRedistributablePlan(Plan *selectPlan)
}


/*
* HasMergeNotMatchedBySource returns true if the MERGE query has a
* WHEN NOT MATCHED BY SOURCE clause. If it does, we need to execute
* the MERGE query on all shards of the target table, regardless of
* whether or not the source shard has any rows.
*/
bool
HasMergeNotMatchedBySource(Query *query)
{
if (!IsMergeQuery(query))
{
return false;
}

bool haveNotMatchedBySource = false;

#if PG_VERSION_NUM >= PG_VERSION_17
ListCell *lc;
foreach(lc, query->mergeActionList)
{
MergeAction *action = lfirst_node(MergeAction, lc);

if (action->matchKind == MERGE_WHEN_NOT_MATCHED_BY_SOURCE)
{
haveNotMatchedBySource = true;
break;
}
}
#endif

return haveNotMatchedBySource;
}


/*
* GenerateTaskListWithColocatedIntermediateResults generates a list of tasks
* for a query that inserts into a target relation and selects from a set of
Expand Down Expand Up @@ -200,6 +235,61 @@ GenerateTaskListWithColocatedIntermediateResults(Oid targetRelationId,
}


/*
* AdjustTaskQueryForEmptySource adjusts the query for tasks that read from an
* intermediate result to instead read from an empty relation. This ensures that
* the MERGE query is executed on all shards of the target table, because it has
* a NOT MATCHED BY SOURCE clause, which will be true for all target shards where
* the source shard has no rows.
*/
void
AdjustTaskQueryForEmptySource(Oid targetRelationId,
Query *mergeQuery,
List *tasks,
char *resultIdPrefix)
{
Query *mergeQueryCopy = copyObject(mergeQuery);
RangeTblEntry *selectRte = ExtractSourceResultRangeTableEntry(mergeQueryCopy);
RangeTblEntry *mergeRte = ExtractResultRelationRTE(mergeQueryCopy);
List *targetList = selectRte->subquery->targetList;
ListCell *taskCell = NULL;

foreach(taskCell, tasks)
{
Task *task = lfirst(taskCell);
uint64 shardId = task->anchorShardId;
StringInfo queryString = makeStringInfo();
StringInfo resultId = makeStringInfo();

appendStringInfo(resultId, "%s_" UINT64_FORMAT, resultIdPrefix, shardId);

/* Generate a query for an empty relation */
selectRte->subquery = BuildEmptyResultQuery(targetList, resultId->data);

/* setting an alias simplifies deparsing of RETURNING */
if (mergeRte->alias == NULL)
{
Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL);
mergeRte->alias = alias;
}

/*
* Generate a query string for the query that merges into a shard and reads
* from an empty relation.
*
* Since CTEs have already been converted to intermediate results, they need
* to removed from the query. Otherwise, worker queries include both
* intermediate results and CTEs in the query.
*/
mergeQueryCopy->cteList = NIL;
deparse_shard_query(mergeQueryCopy, targetRelationId, shardId, queryString);
ereport(DEBUG2, (errmsg("distributed statement: %s", queryString->data)));

SetTaskQueryString(task, queryString->data);
}
}


/*
* GenerateTaskListWithRedistributedResults returns a task list to insert given
* redistributedResults into the given target relation.
Expand All @@ -223,6 +313,7 @@ GenerateTaskListWithRedistributedResults(Query *modifyQueryViaCoordinatorOrRepar
Query *modifyResultQuery = copyObject(modifyQueryViaCoordinatorOrRepartition);
RangeTblEntry *insertRte = ExtractResultRelationRTE(modifyResultQuery);
Oid targetRelationId = targetRelation->relationId;
bool hasNotMatchedBySource = HasMergeNotMatchedBySource(modifyResultQuery);

int shardCount = targetRelation->shardIntervalArrayLength;
int shardOffset = 0;
Expand All @@ -242,19 +333,33 @@ GenerateTaskListWithRedistributedResults(Query *modifyQueryViaCoordinatorOrRepar
StringInfo queryString = makeStringInfo();

/* skip empty tasks */
if (resultIdList == NIL)
if (resultIdList == NIL && !hasNotMatchedBySource)
{
continue;
}

/* sort result ids for consistent test output */
List *sortedResultIds = SortList(resultIdList, pg_qsort_strcmp);
Query *fragmentSetQuery = NULL;

/* generate the query on the intermediate result */
Query *fragmentSetQuery = BuildReadIntermediateResultsArrayQuery(selectTargetList,
NIL,
sortedResultIds,
useBinaryFormat);
if (resultIdList != NIL)
{
/* sort result ids for consistent test output */
List *sortedResultIds = SortList(resultIdList, pg_qsort_strcmp);

/* generate the query on the intermediate result */
fragmentSetQuery = BuildReadIntermediateResultsArrayQuery(selectTargetList,
NIL,
sortedResultIds,
useBinaryFormat);
}
else
{
/* No source data, but MERGE query has NOT MATCHED BY SOURCE */
StringInfo emptyFragmentId = makeStringInfo();
appendStringInfo(emptyFragmentId, "%s_" UINT64_FORMAT, "temp_empty_rel_",
shardId);
fragmentSetQuery = BuildEmptyResultQuery(selectTargetList,
emptyFragmentId->data);
}

/* put the intermediate result query in the INSERT..SELECT */
selectRte->subquery = fragmentSetQuery;
Expand Down
123 changes: 123 additions & 0 deletions src/backend/distributed/planner/recursive_planning.c
Original file line number Diff line number Diff line change
Expand Up @@ -2291,6 +2291,129 @@ BuildReadIntermediateResultsArrayQuery(List *targetEntryList,
}


/*
* For the given target list, build an empty relation with the same target list.
* For example, if the target list is (a, b, c), and resultId is "empty", then
* it returns a Query object for this SQL:
* SELECT a, b, c FROM (VALUES (NULL, NULL, NULL)) AS empty(a, b, c) WHERE false;
*/
Query *
BuildEmptyResultQuery(List *targetEntryList, char *resultId)
{
List *targetList = NIL;
ListCell *targetEntryCell = NULL;

List *colTypes = NIL;
List *colTypMods = NIL;
List *colCollations = NIL;
List *colNames = NIL;

List *valueConsts = NIL;
List *valueTargetList = NIL;
List *valueColNames = NIL;

int targetIndex = 1;

/* build the target list and column lists needed */
foreach(targetEntryCell, targetEntryList)
{
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
Node *targetExpr = (Node *) targetEntry->expr;
char *columnName = targetEntry->resname;
Oid columnType = exprType(targetExpr);
Oid columnTypMod = exprTypmod(targetExpr);
Oid columnCollation = exprCollation(targetExpr);

if (targetEntry->resjunk)
{
continue;
}

Var *tgtVar = makeVar(1, targetIndex, columnType, columnTypMod, columnCollation,
0);
TargetEntry *tgtEntry = makeTargetEntry((Expr *) tgtVar, targetIndex, columnName,
false);
Const *valueConst = makeConst(columnType, columnTypMod, columnCollation, 0,
(Datum) 0, true, false);

StringInfoData *columnString = makeStringInfo();
appendStringInfo(columnString, "column%d", targetIndex);

TargetEntry *valueTgtEntry = makeTargetEntry((Expr *) tgtVar, targetIndex,
columnString->data, false);

valueConsts = lappend(valueConsts, valueConst);
valueTargetList = lappend(valueTargetList, valueTgtEntry);
valueColNames = lappend(valueColNames, makeString(columnString->data));

colNames = lappend(colNames, makeString(columnName));
colTypes = lappend_oid(colTypes, columnType);
colTypMods = lappend_oid(colTypMods, columnTypMod);
colCollations = lappend_oid(colCollations, columnCollation);

targetList = lappend(targetList, tgtEntry);

targetIndex++;
}

/* Build a RangeTable Entry for the VALUES relation */
RangeTblEntry *valuesRangeTable = makeNode(RangeTblEntry);
valuesRangeTable->rtekind = RTE_VALUES;
valuesRangeTable->values_lists = list_make1(valueConsts);
valuesRangeTable->colcollations = colCollations;
valuesRangeTable->coltypes = colTypes;
valuesRangeTable->coltypmods = colTypMods;
valuesRangeTable->alias = NULL;
valuesRangeTable->eref = makeAlias("*VALUES*", valueColNames);
valuesRangeTable->inFromCl = true;

RangeTblRef *valuesRTRef = makeNode(RangeTblRef);
valuesRTRef->rtindex = 1;

FromExpr *valuesJoinTree = makeNode(FromExpr);
valuesJoinTree->fromlist = list_make1(valuesRTRef);

/* build the VALUES query */
Query *valuesQuery = makeNode(Query);
valuesQuery->canSetTag = true;
valuesQuery->commandType = CMD_SELECT;
valuesQuery->rtable = list_make1(valuesRangeTable);
#if PG_VERSION_NUM >= PG_VERSION_16
valuesQuery->rteperminfos = NIL;
#endif
valuesQuery->jointree = valuesJoinTree;
valuesQuery->targetList = valueTargetList;

/* build the relation selecting from the VALUES */
RangeTblEntry *emptyRangeTable = makeNode(RangeTblEntry);
emptyRangeTable->rtekind = RTE_SUBQUERY;
emptyRangeTable->subquery = valuesQuery;
emptyRangeTable->alias = makeAlias(resultId, colNames);
emptyRangeTable->eref = emptyRangeTable->alias;
emptyRangeTable->inFromCl = true;

/* build the SELECT query */
Query *resultQuery = makeNode(Query);
resultQuery->commandType = CMD_SELECT;
resultQuery->canSetTag = true;
resultQuery->rtable = list_make1(emptyRangeTable);
#if PG_VERSION_NUM >= PG_VERSION_16
resultQuery->rteperminfos = NIL;
#endif
RangeTblRef *rangeTableRef = makeNode(RangeTblRef);
rangeTableRef->rtindex = 1;

/* insert a FALSE qual to ensure 0 rows returned */
FromExpr *joinTree = makeNode(FromExpr);
joinTree->fromlist = list_make1(rangeTableRef);
joinTree->quals = makeBoolConst(false, false);
resultQuery->jointree = joinTree;
resultQuery->targetList = targetList;

return resultQuery;
}


/*
* BuildReadIntermediateResultsQuery is the common code for generating
* queries to read from result files. It is used by
Expand Down
1 change: 1 addition & 0 deletions src/include/distributed/recursive_planning.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ extern Query * BuildReadIntermediateResultsArrayQuery(List *targetEntryList,
List *columnAliasList,
List *resultIdList,
bool useBinaryCopyFormat);
extern Query * BuildEmptyResultQuery(List *targetEntryList, char *resultId);
extern bool GeneratingSubplans(void);
extern bool ContainsLocalTableDistributedTableJoin(List *rangeTableList);
extern void ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry,
Expand Down
5 changes: 5 additions & 0 deletions src/include/distributed/repartition_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,10 @@ extern List * GenerateTaskListWithRedistributedResults(
bool useBinaryFormat);
extern bool IsSupportedRedistributionTarget(Oid targetRelationId);
extern bool IsRedistributablePlan(Plan *selectPlan);
extern bool HasMergeNotMatchedBySource(Query *query);
extern void AdjustTaskQueryForEmptySource(Oid targetRelationId,
Query *mergeQuery,
List *emptySourceTaskList,
char *resultIdPrefix);

#endif /* REPARTITION_EXECUTOR_H */
Loading

0 comments on commit c1f5762

Please sign in to comment.