diff --git a/src/backend/distributed/executor/merge_executor.c b/src/backend/distributed/executor/merge_executor.c index ce1eb007318..7af37d950b2 100644 --- a/src/backend/distributed/executor/merge_executor.c +++ b/src/backend/distributed/executor/merge_executor.c @@ -219,6 +219,7 @@ ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState) copyObject(distributedPlan->selectPlanForModifyViaCoordinatorOrRepartition); char *intermediateResultIdPrefix = distributedPlan->intermediateResultIdPrefix; bool hasReturning = distributedPlan->expectResults; + bool hasNotMatchedBySource = HasMergeNotMatchedBySource(mergeQuery); int partitionColumnIndex = distributedPlan->sourceResultRepartitionColumnIndex; /* @@ -233,7 +234,7 @@ ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState) ereport(DEBUG1, (errmsg("Collect source query results on coordinator"))); - List *prunedTaskList = NIL; + List *prunedTaskList = NIL, *emptySourceTaskList = NIL; HTAB *shardStateHash = ExecuteMergeSourcePlanIntoColocatedIntermediateResults( targetRelationId, @@ -255,7 +256,8 @@ ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState) * We cannot actually execute MERGE INTO ... tasks that read from * intermediate results that weren't created because no rows were * written to them. Prune those tasks out by only including tasks - * on shards with connections. + * on shards with connections; however, if the MERGE INTO includes + * a NOT MATCHED BY SOURCE clause we need to include the task. */ Task *task = NULL; foreach_declared_ptr(task, taskList) @@ -268,6 +270,19 @@ ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState) { prunedTaskList = lappend(prunedTaskList, task); } + else if (hasNotMatchedBySource) + { + emptySourceTaskList = lappend(emptySourceTaskList, task); + } + } + + if (emptySourceTaskList != NIL) + { + ereport(DEBUG1, (errmsg("MERGE has NOT MATCHED BY SOURCE clause, " + "execute MERGE on all shards"))); + AdjustTaskQueryForEmptySource(targetRelationId, mergeQuery, emptySourceTaskList, + intermediateResultIdPrefix); + prunedTaskList = list_concat(prunedTaskList, emptySourceTaskList); } if (prunedTaskList == NIL) diff --git a/src/backend/distributed/executor/repartition_executor.c b/src/backend/distributed/executor/repartition_executor.c index 6e4dd3df441..c9bdb144836 100644 --- a/src/backend/distributed/executor/repartition_executor.c +++ b/src/backend/distributed/executor/repartition_executor.c @@ -17,6 +17,7 @@ #include "nodes/parsenodes.h" #include "distributed/citus_custom_scan.h" +#include "distributed/deparse_shard_query.h" #include "distributed/intermediate_results.h" #include "distributed/listutils.h" #include "distributed/multi_physical_planner.h" @@ -101,6 +102,40 @@ IsRedistributablePlan(Plan *selectPlan) } +/* + * HasMergeNotMatchedBySource returns true if the MERGE query has a + * WHEN NOT MATCHED BY SOURCE clause. If it does, we need to execute + * the MERGE query on all shards of the target table, regardless of + * whether or not the source shard has any rows. + */ +bool +HasMergeNotMatchedBySource(Query *query) +{ + if (!IsMergeQuery(query)) + { + return false; + } + + bool haveNotMatchedBySource = false; + + #if PG_VERSION_NUM >= PG_VERSION_17 + ListCell *lc; + foreach(lc, query->mergeActionList) + { + MergeAction *action = lfirst_node(MergeAction, lc); + + if (action->matchKind == MERGE_WHEN_NOT_MATCHED_BY_SOURCE) + { + haveNotMatchedBySource = true; + break; + } + } + #endif + + return haveNotMatchedBySource; +} + + /* * GenerateTaskListWithColocatedIntermediateResults generates a list of tasks * for a query that inserts into a target relation and selects from a set of @@ -200,6 +235,61 @@ GenerateTaskListWithColocatedIntermediateResults(Oid targetRelationId, } +/* + * AdjustTaskQueryForEmptySource adjusts the query for tasks that read from an + * intermediate result to instead read from an empty relation. This ensures that + * the MERGE query is executed on all shards of the target table, because it has + * a NOT MATCHED BY SOURCE clause, which will be true for all target shards where + * the source shard has no rows. + */ +void +AdjustTaskQueryForEmptySource(Oid targetRelationId, + Query *mergeQuery, + List *tasks, + char *resultIdPrefix) +{ + Query *mergeQueryCopy = copyObject(mergeQuery); + RangeTblEntry *selectRte = ExtractSourceResultRangeTableEntry(mergeQueryCopy); + RangeTblEntry *mergeRte = ExtractResultRelationRTE(mergeQueryCopy); + List *targetList = selectRte->subquery->targetList; + ListCell *taskCell = NULL; + + foreach(taskCell, tasks) + { + Task *task = lfirst(taskCell); + uint64 shardId = task->anchorShardId; + StringInfo queryString = makeStringInfo(); + StringInfo resultId = makeStringInfo(); + + appendStringInfo(resultId, "%s_" UINT64_FORMAT, resultIdPrefix, shardId); + + /* Generate a query for an empty relation */ + selectRte->subquery = BuildEmptyResultQuery(targetList, resultId->data); + + /* setting an alias simplifies deparsing of RETURNING */ + if (mergeRte->alias == NULL) + { + Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL); + mergeRte->alias = alias; + } + + /* + * Generate a query string for the query that merges into a shard and reads + * from an empty relatoin. + * + * Since CTEs have already been converted to intermediate results, they need + * to removed from the query. Otherwise, worker queries include both + * intermediate results and CTEs in the query. + */ + mergeQueryCopy->cteList = NIL; + deparse_shard_query(mergeQueryCopy, targetRelationId, shardId, queryString); + ereport(DEBUG2, (errmsg("distributed statement: %s", queryString->data))); + + SetTaskQueryString(task, queryString->data); + } +} + + /* * GenerateTaskListWithRedistributedResults returns a task list to insert given * redistributedResults into the given target relation. @@ -223,6 +313,7 @@ GenerateTaskListWithRedistributedResults(Query *modifyQueryViaCoordinatorOrRepar Query *modifyResultQuery = copyObject(modifyQueryViaCoordinatorOrRepartition); RangeTblEntry *insertRte = ExtractResultRelationRTE(modifyResultQuery); Oid targetRelationId = targetRelation->relationId; + bool hasNotMatchedBySource = HasMergeNotMatchedBySource(modifyResultQuery); int shardCount = targetRelation->shardIntervalArrayLength; int shardOffset = 0; @@ -242,19 +333,33 @@ GenerateTaskListWithRedistributedResults(Query *modifyQueryViaCoordinatorOrRepar StringInfo queryString = makeStringInfo(); /* skip empty tasks */ - if (resultIdList == NIL) + if (resultIdList == NIL && !hasNotMatchedBySource) { continue; } - /* sort result ids for consistent test output */ - List *sortedResultIds = SortList(resultIdList, pg_qsort_strcmp); + Query *fragmentSetQuery = NULL; - /* generate the query on the intermediate result */ - Query *fragmentSetQuery = BuildReadIntermediateResultsArrayQuery(selectTargetList, - NIL, - sortedResultIds, - useBinaryFormat); + if (resultIdList != NIL) + { + /* sort result ids for consistent test output */ + List *sortedResultIds = SortList(resultIdList, pg_qsort_strcmp); + + /* generate the query on the intermediate result */ + fragmentSetQuery = BuildReadIntermediateResultsArrayQuery(selectTargetList, + NIL, + sortedResultIds, + useBinaryFormat); + } + else + { + /* No source data, but MERGE query has NOT MATCHED BY SOURCE */ + StringInfo emptyFragmentId = makeStringInfo(); + appendStringInfo(emptyFragmentId, "%s_" UINT64_FORMAT, "temp_empty_rel_", + shardId); + fragmentSetQuery = BuildEmptyResultQuery(selectTargetList, + emptyFragmentId->data); + } /* put the intermediate result query in the INSERT..SELECT */ selectRte->subquery = fragmentSetQuery; diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index 9335b5ffc94..ab401417445 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -2291,6 +2291,128 @@ BuildReadIntermediateResultsArrayQuery(List *targetEntryList, } +/* + * For the given target list, build an empty relation with the same target list. + * For example, if the target list is (a, b, c), and resultId is "empty", then + * the result will be a Query object for the following SQL: + * SELECT a, b, c FROM (VALUES (NULL, NULL, NULL)) AS empty(a, b, c) WHERE false; + */ +Query * +BuildEmptyResultQuery(List *targetEntryList, char *resultId) +{ + List *targetList = NIL; + ListCell *targetEntryCell = NULL; + + List *colTypes = NIL; + List *colTypMods = NIL; + List *colCollations = NIL; + List *colNames = NIL; + + List *valueConsts = NIL; + List *valueTargetList = NIL; + List *valueColNames = NIL; + + int targetIndex = 1; + + /* build the target list and column definition list */ + foreach(targetEntryCell, targetEntryList) + { + TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); + Node *targetExpr = (Node *) targetEntry->expr; + char *columnName = targetEntry->resname; + Oid columnType = exprType(targetExpr); + Oid columnTypMod = exprTypmod(targetExpr); + Oid columnCollation = exprCollation(targetExpr); + + if (targetEntry->resjunk) + { + continue; + } + + Var *tgtVar = makeVar(1, targetIndex, columnType, columnTypMod, columnCollation, + 0); + TargetEntry *tgtEntry = makeTargetEntry((Expr *) tgtVar, targetIndex, columnName, + false); + Const *valueConst = makeConst(columnType, columnTypMod, columnCollation, 0, + (Datum) 0, true, false); + + StringInfoData *columnString = makeStringInfo(); + appendStringInfo(columnString, "column%d", targetIndex); + + TargetEntry *valueTgtEntry = makeTargetEntry((Expr *) tgtVar, targetIndex, + columnString->data, false); + + valueConsts = lappend(valueConsts, valueConst); + valueTargetList = lappend(valueTargetList, valueTgtEntry); + valueColNames = lappend(valueColNames, makeString(columnString->data)); + + colNames = lappend(colNames, makeString(columnName)); + colTypes = lappend_oid(colTypes, columnType); + colTypMods = lappend_oid(colTypMods, columnTypMod); + colCollations = lappend_oid(colCollations, columnCollation); + + targetList = lappend(targetList, tgtEntry); + + targetIndex++; + } + + /* Build a RangeTable Entry for the Values relation */ + RangeTblEntry *valuesRangeTable = makeNode(RangeTblEntry); + valuesRangeTable->rtekind = RTE_VALUES; + valuesRangeTable->values_lists = list_make1(valueConsts); + valuesRangeTable->colcollations = colCollations; + valuesRangeTable->coltypes = colTypes; + valuesRangeTable->coltypmods = colTypMods; + valuesRangeTable->alias = NULL; + valuesRangeTable->eref = makeAlias("*VALUES*", valueColNames); + valuesRangeTable->inFromCl = true; + + RangeTblRef *valuesRTRef = makeNode(RangeTblRef); + valuesRTRef->rtindex = 1; + + FromExpr *valuesJoinTree = makeNode(FromExpr); + valuesJoinTree->fromlist = list_make1(valuesRTRef); + + /* build the VALUES query */ + Query *valuesQuery = makeNode(Query); + valuesQuery->canSetTag = true; + valuesQuery->commandType = CMD_SELECT; + valuesQuery->rtable = list_make1(valuesRangeTable); + #if PG_VERSION_NUM >= PG_VERSION_16 + valuesQuery->rteperminfos = NIL; + #endif + valuesQuery->jointree = valuesJoinTree; + valuesQuery->targetList = valueTargetList; + + /* build the empty query */ + RangeTblEntry *emptyRangeTable = makeNode(RangeTblEntry); + emptyRangeTable->rtekind = RTE_SUBQUERY; /* todo: is this correct? */ + emptyRangeTable->subquery = valuesQuery; + emptyRangeTable->alias = makeAlias(resultId, colNames); + emptyRangeTable->eref = emptyRangeTable->alias; + emptyRangeTable->inFromCl = true; + + RangeTblRef *rangeTableRef = makeNode(RangeTblRef); + rangeTableRef->rtindex = 1; + FromExpr *joinTree = makeNode(FromExpr); + joinTree->fromlist = list_make1(rangeTableRef); + joinTree->quals = makeBoolConst(false, false); + + /* build the SELECT query */ + Query *resultQuery = makeNode(Query); + resultQuery->commandType = CMD_SELECT; + resultQuery->canSetTag = true; + resultQuery->rtable = list_make1(emptyRangeTable); +#if PG_VERSION_NUM >= PG_VERSION_16 + resultQuery->rteperminfos = NIL; +#endif + resultQuery->jointree = joinTree; + resultQuery->targetList = targetList; + + return resultQuery; +} + + /* * BuildReadIntermediateResultsQuery is the common code for generating * queries to read from result files. It is used by diff --git a/src/include/distributed/recursive_planning.h b/src/include/distributed/recursive_planning.h index c37eba34334..b4aaa478533 100644 --- a/src/include/distributed/recursive_planning.h +++ b/src/include/distributed/recursive_planning.h @@ -40,6 +40,7 @@ extern Query * BuildReadIntermediateResultsArrayQuery(List *targetEntryList, List *columnAliasList, List *resultIdList, bool useBinaryCopyFormat); +extern Query * BuildEmptyResultQuery(List *targetEntryList, char *resultId); extern bool GeneratingSubplans(void); extern bool ContainsLocalTableDistributedTableJoin(List *rangeTableList); extern void ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, diff --git a/src/include/distributed/repartition_executor.h b/src/include/distributed/repartition_executor.h index de4ad122a31..f636877e7eb 100644 --- a/src/include/distributed/repartition_executor.h +++ b/src/include/distributed/repartition_executor.h @@ -28,5 +28,10 @@ extern List * GenerateTaskListWithRedistributedResults( bool useBinaryFormat); extern bool IsSupportedRedistributionTarget(Oid targetRelationId); extern bool IsRedistributablePlan(Plan *selectPlan); +extern bool HasMergeNotMatchedBySource(Query *query); +extern void AdjustTaskQueryForEmptySource(Oid targetRelationId, + Query *mergeQuery, + List *emptySourceTaskList, + char *resultIdPrefix); #endif /* REPARTITION_EXECUTOR_H */ diff --git a/src/test/regress/expected/pg17.out b/src/test/regress/expected/pg17.out index c6deb41aaa8..586dc1f5ae9 100644 --- a/src/test/regress/expected/pg17.out +++ b/src/test/regress/expected/pg17.out @@ -2555,6 +2555,157 @@ MERGE INTO citus_reference_target t WHEN NOT MATCHED BY SOURCE THEN UPDATE SET val = val || ' not matched by source'; ERROR: Reference table as target is not allowed in MERGE command +-- Test Distributed-reference and distributed-local when the source table has fewer rows +-- than distributed target; this tests that MERGE with NOT MATCHED BY SOURCE needs to run +-- on all shards of the distributed target, regardless of whether or not the reshuffled +-- source table has data in the corresponding shard. +-- Re-populate the Postgres tables; +DELETE FROM postgres_source; +DELETE FROM postgres_target_1; +DELETE FROM postgres_target_2; +-- This time, the source table has fewer rows +INSERT INTO postgres_target_1 SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id; +INSERT INTO postgres_target_2 SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id; +INSERT INTO postgres_source SELECT id, id * 10 FROM generate_series(1,4) AS id; +-- try simple MERGE +MERGE INTO postgres_target_1 t + USING postgres_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT * FROM postgres_target_1 ORDER BY tid, val; + tid | balance | val +--------------------------------------------------------------------- + 1 | 110 | initial updated by merge + 2 | 20 | inserted by merge + 3 | 330 | initial updated by merge + 4 | 40 | inserted by merge + 5 | 500 | initial not matched by source + 7 | 700 | initial not matched by source + 9 | 900 | initial not matched by source + 11 | 1100 | initial not matched by source + 13 | 1300 | initial not matched by source + 15 | 1500 | initial not matched by source +(10 rows) + +-- same with a constant qual +MERGE INTO postgres_target_2 t + USING postgres_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT * FROM postgres_target_2 ORDER BY tid, val; + tid | balance | val +--------------------------------------------------------------------- + 1 | 110 | initial updated by merge + 2 | 20 | inserted by merge + 3 | 300 | initial not matched by source + 3 | 30 | inserted by merge + 4 | 40 | inserted by merge + 5 | 500 | initial not matched by source + 7 | 700 | initial not matched by source + 9 | 900 | initial not matched by source + 11 | 1100 | initial not matched by source + 13 | 1300 | initial not matched by source + 15 | 1500 | initial not matched by source +(11 rows) + +-- Re-populate the Citus tables; this time, the source table has fewer rows +DELETE FROM citus_local_source; +DELETE FROM citus_reference_source; +INSERT INTO citus_reference_source SELECT id, id * 10 FROM generate_series(1,4) AS id; +INSERT INTO citus_local_source SELECT id, id * 10 FROM generate_series(1,4) AS id; +SET citus.shard_count to 32; +CREATE TABLE citus_distributed_target32 (tid integer, balance float, val text); +SELECT create_distributed_table('citus_distributed_target32', 'tid'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO citus_distributed_target32 SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id; +-- Distributed-Local +-- try simple MERGE +BEGIN; +MERGE INTO citus_distributed_target32 t + USING citus_local_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_distributed_target32', 'postgres_target_1'); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- same with a constant qual +BEGIN; +MERGE INTO citus_distributed_target32 t + USING citus_local_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED BY TARGET THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_distributed_target32', 'postgres_target_2'); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- Distributed-Reference +-- try simple MERGE +BEGIN; +MERGE INTO citus_distributed_target32 t + USING citus_reference_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_distributed_target32', 'postgres_target_1'); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- same with a constant qual +BEGIN; +MERGE INTO citus_distributed_target32 t + USING citus_reference_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_distributed_target32', 'postgres_target_2'); + compare_tables +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; -- Complex repartition query example with a mix of tables -- Example from blog post -- https://www.citusdata.com/blog/2023/07/27/how-citus-12-supports-postgres-merge @@ -2670,8 +2821,38 @@ DEBUG: Using column - index:0 from the source list to redistribute DEBUG: Executing subplans of the source query and storing the results at the respective node(s) DEBUG: Redistributing source result rows across nodes DEBUG: Executing final MERGE on workers using intermediate results -DEBUG: -DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: +DEBUG: RESET client_min_messages; -- Expected output is: -- reading_id | sensor_id | reading_value | reading_timestamp diff --git a/src/test/regress/sql/pg17.sql b/src/test/regress/sql/pg17.sql index f55d50d172a..522a0a599e9 100644 --- a/src/test/regress/sql/pg17.sql +++ b/src/test/regress/sql/pg17.sql @@ -1336,6 +1336,114 @@ MERGE INTO citus_reference_target t WHEN NOT MATCHED BY SOURCE THEN UPDATE SET val = val || ' not matched by source'; +-- Test Distributed-reference and distributed-local when the source table has fewer rows +-- than distributed target; this tests that MERGE with NOT MATCHED BY SOURCE needs to run +-- on all shards of the distributed target, regardless of whether or not the reshuffled +-- source table has data in the corresponding shard. + +-- Re-populate the Postgres tables; +DELETE FROM postgres_source; +DELETE FROM postgres_target_1; +DELETE FROM postgres_target_2; + +-- This time, the source table has fewer rows +INSERT INTO postgres_target_1 SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id; +INSERT INTO postgres_target_2 SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id; +INSERT INTO postgres_source SELECT id, id * 10 FROM generate_series(1,4) AS id; + +-- try simple MERGE +MERGE INTO postgres_target_1 t + USING postgres_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT * FROM postgres_target_1 ORDER BY tid, val; + +-- same with a constant qual +MERGE INTO postgres_target_2 t + USING postgres_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT * FROM postgres_target_2 ORDER BY tid, val; + +-- Re-populate the Citus tables; this time, the source table has fewer rows +DELETE FROM citus_local_source; +DELETE FROM citus_reference_source; +INSERT INTO citus_reference_source SELECT id, id * 10 FROM generate_series(1,4) AS id; +INSERT INTO citus_local_source SELECT id, id * 10 FROM generate_series(1,4) AS id; + +SET citus.shard_count to 32; +CREATE TABLE citus_distributed_target32 (tid integer, balance float, val text); +SELECT create_distributed_table('citus_distributed_target32', 'tid'); +INSERT INTO citus_distributed_target32 SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id; + +-- Distributed-Local +-- try simple MERGE +BEGIN; +MERGE INTO citus_distributed_target32 t + USING citus_local_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_distributed_target32', 'postgres_target_1'); +ROLLBACK; + +-- same with a constant qual +BEGIN; +MERGE INTO citus_distributed_target32 t + USING citus_local_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED BY TARGET THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_distributed_target32', 'postgres_target_2'); +ROLLBACK; + +-- Distributed-Reference +-- try simple MERGE +BEGIN; +MERGE INTO citus_distributed_target32 t + USING citus_reference_source s + ON t.tid = s.sid + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_distributed_target32', 'postgres_target_1'); +ROLLBACK; + +-- same with a constant qual +BEGIN; +MERGE INTO citus_distributed_target32 t + USING citus_reference_source s + ON t.tid = s.sid AND tid = 1 + WHEN MATCHED THEN + UPDATE SET balance = balance + delta, val = val || ' updated by merge' + WHEN NOT MATCHED THEN + INSERT VALUES (sid, delta, 'inserted by merge') + WHEN NOT MATCHED BY SOURCE THEN + UPDATE SET val = val || ' not matched by source'; +SELECT compare_tables('citus_distributed_target32', 'postgres_target_2'); +ROLLBACK; + -- Complex repartition query example with a mix of tables -- Example from blog post -- https://www.citusdata.com/blog/2023/07/27/how-citus-12-supports-postgres-merge