Skip to content

Commit

Permalink
Merge branch 'release-13.0' into colm/issue_7782
Browse files Browse the repository at this point in the history
  • Loading branch information
ihalatci authored Feb 25, 2025
2 parents 030d05a + c1f5762 commit d7c122b
Show file tree
Hide file tree
Showing 7 changed files with 713 additions and 12 deletions.
19 changes: 17 additions & 2 deletions src/backend/distributed/executor/merge_executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState)
copyObject(distributedPlan->selectPlanForModifyViaCoordinatorOrRepartition);
char *intermediateResultIdPrefix = distributedPlan->intermediateResultIdPrefix;
bool hasReturning = distributedPlan->expectResults;
bool hasNotMatchedBySource = HasMergeNotMatchedBySource(mergeQuery);
int partitionColumnIndex = distributedPlan->sourceResultRepartitionColumnIndex;

/*
Expand All @@ -233,7 +234,7 @@ ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState)

ereport(DEBUG1, (errmsg("Collect source query results on coordinator")));

List *prunedTaskList = NIL;
List *prunedTaskList = NIL, *emptySourceTaskList = NIL;
HTAB *shardStateHash =
ExecuteMergeSourcePlanIntoColocatedIntermediateResults(
targetRelationId,
Expand All @@ -255,7 +256,8 @@ ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState)
* We cannot actually execute MERGE INTO ... tasks that read from
* intermediate results that weren't created because no rows were
* written to them. Prune those tasks out by only including tasks
* on shards with connections.
* on shards with connections; however, if the MERGE INTO includes
* a NOT MATCHED BY SOURCE clause we need to include the task.
*/
Task *task = NULL;
foreach_declared_ptr(task, taskList)
Expand All @@ -268,6 +270,19 @@ ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState)
{
prunedTaskList = lappend(prunedTaskList, task);
}
else if (hasNotMatchedBySource)
{
emptySourceTaskList = lappend(emptySourceTaskList, task);
}
}

if (emptySourceTaskList != NIL)
{
ereport(DEBUG1, (errmsg("MERGE has NOT MATCHED BY SOURCE clause, "
"execute MERGE on all shards")));
AdjustTaskQueryForEmptySource(targetRelationId, mergeQuery, emptySourceTaskList,
intermediateResultIdPrefix);
prunedTaskList = list_concat(prunedTaskList, emptySourceTaskList);
}

if (prunedTaskList == NIL)
Expand Down
121 changes: 113 additions & 8 deletions src/backend/distributed/executor/repartition_executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "nodes/parsenodes.h"

#include "distributed/citus_custom_scan.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/intermediate_results.h"
#include "distributed/listutils.h"
#include "distributed/multi_physical_planner.h"
Expand Down Expand Up @@ -101,6 +102,40 @@ IsRedistributablePlan(Plan *selectPlan)
}


/*
* HasMergeNotMatchedBySource returns true if the MERGE query has a
* WHEN NOT MATCHED BY SOURCE clause. If it does, we need to execute
* the MERGE query on all shards of the target table, regardless of
* whether or not the source shard has any rows.
*/
bool
HasMergeNotMatchedBySource(Query *query)
{
if (!IsMergeQuery(query))
{
return false;
}

bool haveNotMatchedBySource = false;

#if PG_VERSION_NUM >= PG_VERSION_17
ListCell *lc;
foreach(lc, query->mergeActionList)
{
MergeAction *action = lfirst_node(MergeAction, lc);

if (action->matchKind == MERGE_WHEN_NOT_MATCHED_BY_SOURCE)
{
haveNotMatchedBySource = true;
break;
}
}
#endif

return haveNotMatchedBySource;
}


/*
* GenerateTaskListWithColocatedIntermediateResults generates a list of tasks
* for a query that inserts into a target relation and selects from a set of
Expand Down Expand Up @@ -200,6 +235,61 @@ GenerateTaskListWithColocatedIntermediateResults(Oid targetRelationId,
}


/*
* AdjustTaskQueryForEmptySource adjusts the query for tasks that read from an
* intermediate result to instead read from an empty relation. This ensures that
* the MERGE query is executed on all shards of the target table, because it has
* a NOT MATCHED BY SOURCE clause, which will be true for all target shards where
* the source shard has no rows.
*/
void
AdjustTaskQueryForEmptySource(Oid targetRelationId,
Query *mergeQuery,
List *tasks,
char *resultIdPrefix)
{
Query *mergeQueryCopy = copyObject(mergeQuery);
RangeTblEntry *selectRte = ExtractSourceResultRangeTableEntry(mergeQueryCopy);
RangeTblEntry *mergeRte = ExtractResultRelationRTE(mergeQueryCopy);
List *targetList = selectRte->subquery->targetList;
ListCell *taskCell = NULL;

foreach(taskCell, tasks)
{
Task *task = lfirst(taskCell);
uint64 shardId = task->anchorShardId;
StringInfo queryString = makeStringInfo();
StringInfo resultId = makeStringInfo();

appendStringInfo(resultId, "%s_" UINT64_FORMAT, resultIdPrefix, shardId);

/* Generate a query for an empty relation */
selectRte->subquery = BuildEmptyResultQuery(targetList, resultId->data);

/* setting an alias simplifies deparsing of RETURNING */
if (mergeRte->alias == NULL)
{
Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL);
mergeRte->alias = alias;
}

/*
* Generate a query string for the query that merges into a shard and reads
* from an empty relation.
*
* Since CTEs have already been converted to intermediate results, they need
* to removed from the query. Otherwise, worker queries include both
* intermediate results and CTEs in the query.
*/
mergeQueryCopy->cteList = NIL;
deparse_shard_query(mergeQueryCopy, targetRelationId, shardId, queryString);
ereport(DEBUG2, (errmsg("distributed statement: %s", queryString->data)));

SetTaskQueryString(task, queryString->data);
}
}


/*
* GenerateTaskListWithRedistributedResults returns a task list to insert given
* redistributedResults into the given target relation.
Expand All @@ -223,6 +313,7 @@ GenerateTaskListWithRedistributedResults(Query *modifyQueryViaCoordinatorOrRepar
Query *modifyResultQuery = copyObject(modifyQueryViaCoordinatorOrRepartition);
RangeTblEntry *insertRte = ExtractResultRelationRTE(modifyResultQuery);
Oid targetRelationId = targetRelation->relationId;
bool hasNotMatchedBySource = HasMergeNotMatchedBySource(modifyResultQuery);

int shardCount = targetRelation->shardIntervalArrayLength;
int shardOffset = 0;
Expand All @@ -242,19 +333,33 @@ GenerateTaskListWithRedistributedResults(Query *modifyQueryViaCoordinatorOrRepar
StringInfo queryString = makeStringInfo();

/* skip empty tasks */
if (resultIdList == NIL)
if (resultIdList == NIL && !hasNotMatchedBySource)
{
continue;
}

/* sort result ids for consistent test output */
List *sortedResultIds = SortList(resultIdList, pg_qsort_strcmp);
Query *fragmentSetQuery = NULL;

/* generate the query on the intermediate result */
Query *fragmentSetQuery = BuildReadIntermediateResultsArrayQuery(selectTargetList,
NIL,
sortedResultIds,
useBinaryFormat);
if (resultIdList != NIL)
{
/* sort result ids for consistent test output */
List *sortedResultIds = SortList(resultIdList, pg_qsort_strcmp);

/* generate the query on the intermediate result */
fragmentSetQuery = BuildReadIntermediateResultsArrayQuery(selectTargetList,
NIL,
sortedResultIds,
useBinaryFormat);
}
else
{
/* No source data, but MERGE query has NOT MATCHED BY SOURCE */
StringInfo emptyFragmentId = makeStringInfo();
appendStringInfo(emptyFragmentId, "%s_" UINT64_FORMAT, "temp_empty_rel_",
shardId);
fragmentSetQuery = BuildEmptyResultQuery(selectTargetList,
emptyFragmentId->data);
}

/* put the intermediate result query in the INSERT..SELECT */
selectRte->subquery = fragmentSetQuery;
Expand Down
123 changes: 123 additions & 0 deletions src/backend/distributed/planner/recursive_planning.c
Original file line number Diff line number Diff line change
Expand Up @@ -2291,6 +2291,129 @@ BuildReadIntermediateResultsArrayQuery(List *targetEntryList,
}


/*
* For the given target list, build an empty relation with the same target list.
* For example, if the target list is (a, b, c), and resultId is "empty", then
* it returns a Query object for this SQL:
* SELECT a, b, c FROM (VALUES (NULL, NULL, NULL)) AS empty(a, b, c) WHERE false;
*/
Query *
BuildEmptyResultQuery(List *targetEntryList, char *resultId)
{
List *targetList = NIL;
ListCell *targetEntryCell = NULL;

List *colTypes = NIL;
List *colTypMods = NIL;
List *colCollations = NIL;
List *colNames = NIL;

List *valueConsts = NIL;
List *valueTargetList = NIL;
List *valueColNames = NIL;

int targetIndex = 1;

/* build the target list and column lists needed */
foreach(targetEntryCell, targetEntryList)
{
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
Node *targetExpr = (Node *) targetEntry->expr;
char *columnName = targetEntry->resname;
Oid columnType = exprType(targetExpr);
Oid columnTypMod = exprTypmod(targetExpr);
Oid columnCollation = exprCollation(targetExpr);

if (targetEntry->resjunk)
{
continue;
}

Var *tgtVar = makeVar(1, targetIndex, columnType, columnTypMod, columnCollation,
0);
TargetEntry *tgtEntry = makeTargetEntry((Expr *) tgtVar, targetIndex, columnName,
false);
Const *valueConst = makeConst(columnType, columnTypMod, columnCollation, 0,
(Datum) 0, true, false);

StringInfoData *columnString = makeStringInfo();
appendStringInfo(columnString, "column%d", targetIndex);

TargetEntry *valueTgtEntry = makeTargetEntry((Expr *) tgtVar, targetIndex,
columnString->data, false);

valueConsts = lappend(valueConsts, valueConst);
valueTargetList = lappend(valueTargetList, valueTgtEntry);
valueColNames = lappend(valueColNames, makeString(columnString->data));

colNames = lappend(colNames, makeString(columnName));
colTypes = lappend_oid(colTypes, columnType);
colTypMods = lappend_oid(colTypMods, columnTypMod);
colCollations = lappend_oid(colCollations, columnCollation);

targetList = lappend(targetList, tgtEntry);

targetIndex++;
}

/* Build a RangeTable Entry for the VALUES relation */
RangeTblEntry *valuesRangeTable = makeNode(RangeTblEntry);
valuesRangeTable->rtekind = RTE_VALUES;
valuesRangeTable->values_lists = list_make1(valueConsts);
valuesRangeTable->colcollations = colCollations;
valuesRangeTable->coltypes = colTypes;
valuesRangeTable->coltypmods = colTypMods;
valuesRangeTable->alias = NULL;
valuesRangeTable->eref = makeAlias("*VALUES*", valueColNames);
valuesRangeTable->inFromCl = true;

RangeTblRef *valuesRTRef = makeNode(RangeTblRef);
valuesRTRef->rtindex = 1;

FromExpr *valuesJoinTree = makeNode(FromExpr);
valuesJoinTree->fromlist = list_make1(valuesRTRef);

/* build the VALUES query */
Query *valuesQuery = makeNode(Query);
valuesQuery->canSetTag = true;
valuesQuery->commandType = CMD_SELECT;
valuesQuery->rtable = list_make1(valuesRangeTable);
#if PG_VERSION_NUM >= PG_VERSION_16
valuesQuery->rteperminfos = NIL;
#endif
valuesQuery->jointree = valuesJoinTree;
valuesQuery->targetList = valueTargetList;

/* build the relation selecting from the VALUES */
RangeTblEntry *emptyRangeTable = makeNode(RangeTblEntry);
emptyRangeTable->rtekind = RTE_SUBQUERY;
emptyRangeTable->subquery = valuesQuery;
emptyRangeTable->alias = makeAlias(resultId, colNames);
emptyRangeTable->eref = emptyRangeTable->alias;
emptyRangeTable->inFromCl = true;

/* build the SELECT query */
Query *resultQuery = makeNode(Query);
resultQuery->commandType = CMD_SELECT;
resultQuery->canSetTag = true;
resultQuery->rtable = list_make1(emptyRangeTable);
#if PG_VERSION_NUM >= PG_VERSION_16
resultQuery->rteperminfos = NIL;
#endif
RangeTblRef *rangeTableRef = makeNode(RangeTblRef);
rangeTableRef->rtindex = 1;

/* insert a FALSE qual to ensure 0 rows returned */
FromExpr *joinTree = makeNode(FromExpr);
joinTree->fromlist = list_make1(rangeTableRef);
joinTree->quals = makeBoolConst(false, false);
resultQuery->jointree = joinTree;
resultQuery->targetList = targetList;

return resultQuery;
}


/*
* BuildReadIntermediateResultsQuery is the common code for generating
* queries to read from result files. It is used by
Expand Down
1 change: 1 addition & 0 deletions src/include/distributed/recursive_planning.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ extern Query * BuildReadIntermediateResultsArrayQuery(List *targetEntryList,
List *columnAliasList,
List *resultIdList,
bool useBinaryCopyFormat);
extern Query * BuildEmptyResultQuery(List *targetEntryList, char *resultId);
extern bool GeneratingSubplans(void);
extern bool ContainsLocalTableDistributedTableJoin(List *rangeTableList);
extern void ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry,
Expand Down
5 changes: 5 additions & 0 deletions src/include/distributed/repartition_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,10 @@ extern List * GenerateTaskListWithRedistributedResults(
bool useBinaryFormat);
extern bool IsSupportedRedistributionTarget(Oid targetRelationId);
extern bool IsRedistributablePlan(Plan *selectPlan);
extern bool HasMergeNotMatchedBySource(Query *query);
extern void AdjustTaskQueryForEmptySource(Oid targetRelationId,
Query *mergeQuery,
List *emptySourceTaskList,
char *resultIdPrefix);

#endif /* REPARTITION_EXECUTOR_H */
Loading

0 comments on commit d7c122b

Please sign in to comment.