Skip to content

WIP Fix issue #7674 about UPDATE SET(..), with indirection #7675

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions src/backend/distributed/deparser/citus_ruleutils.c
Original file line number Diff line number Diff line change
Expand Up @@ -1715,3 +1715,73 @@ RoleSpecString(RoleSpec *spec, bool withQuoteIdentifier)
}
}
}


/*
* list_sort comparator to sort target list by paramid (in MULTIEXPR)
* Intended for indirection management: UPDATE SET () = (SELECT )
*/
int
target_list_cmp(const ListCell *a, const ListCell *b)
{
TargetEntry *tleA = lfirst(a);
TargetEntry *tleB = lfirst(b);

if (IsA(tleA->expr, Param) && IsA(tleB->expr, Param))
{
int la = ((Param *) tleA->expr)->paramid;
int lb = ((Param *) tleB->expr)->paramid;
return (la > lb) - (la < lb);
}
else if ((IsA(tleA->expr, Param) && IsA(tleB->expr, SubLink)) ||
(IsA(tleA->expr, SubLink) && IsA(tleB->expr, Param)) ||
(IsA(tleA->expr, SubLink) && IsA(tleB->expr, SubLink)))
{
return -1;
}
else
{
elog(ERROR, "unexpected nodes");
}
}


/*
* Recursively search an expression for a Param and return its paramid
* Intended for indirection management: UPDATE SET () = (SELECT )
* Does not cover all options but those supported by Citus.
*/
int
GetParamId(Node *expr)
{
int paramid = 0;

if (expr == NULL)
{
return paramid;
}

/* If it's a Param, return its attnum */
if (IsA(expr, Param))
{
Param *param = (Param *) expr;
paramid = param->paramid;
}
/* If it's a FuncExpr, search in arguments */
else if (IsA(expr, FuncExpr))
{
FuncExpr *func = (FuncExpr *) expr;
ListCell *lc;

foreach(lc, func->args)
{
paramid = GetParamId((Node *) lfirst(lc));
if (paramid != 0)
{
break; /* Stop at the first valid paramid */
}
}
}

return paramid;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have citus_ruleutils.c export one function that can be used by the supported PG ruleutils ? For example, a function: 

void
ensure_update_targetlist_in_param_order(List *targetList)
{
	bool need_to_sort_target_list = false;
	int previous_paramid = 0;
	ListCell *l;

	foreach(l, targetList)
	{
		TargetEntry *tle = (TargetEntry *) lfirst(l);

		if (!tle->resjunk)
		{
			int paramid = GetParamId((Node *) tle->expr);
			if (paramid < previous_paramid)
			{
				need_to_sort_target_list = true;
				break;
			}

			previous_paramid = paramid;
		}
	}

	if (need_to_sort_target_list)
		list_sort(targetList, target_list_cmp);
}

which is called by each supported get_update_query_targetlist_def() when the parse tree has MULTIEXPR assignments, maybe right after the sublinks have been collected. Basically, to keep the changes to each supported PG ruleutils to a minimum.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm correct, it's adding a new traversal of the target list, I understand the motivation to reduce the diff, I was trying to not add too much extra processing. I check what is possible.

Copy link
Contributor

@colm-mchugh colm-mchugh Apr 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes there is a maintainability/processing tradeoff, but given that this is at plan-time and most of the time a sort will probably not be necessary the tradeoff for maintainability is worth making, unless there is a compelling counter-case.

40 changes: 40 additions & 0 deletions src/backend/distributed/deparser/ruleutils_15.c
Original file line number Diff line number Diff line change
Expand Up @@ -3509,6 +3509,9 @@ get_update_query_targetlist_def(Query *query, List *targetList,
SubLink *cur_ma_sublink;
List *ma_sublinks;

AttrNumber previous_attnum = InvalidAttrNumber;
int paramid_increment = 0;

/*
* Prepare to deal with MULTIEXPR assignments: collect the source SubLinks
* into a list. We expect them to appear, in ID order, in resjunk tlist
Expand Down Expand Up @@ -3631,11 +3634,48 @@ get_update_query_targetlist_def(Query *query, List *targetList,
*/
if (cur_ma_sublink != NULL)
{
AttrNumber attnum = InvalidAttrNumber;
if (IsA(expr, Param))
{
Param *param = (Param *) expr;
attnum = param->paramid + paramid_increment;
}
else if (IsA(expr, FuncExpr))
{
FuncExpr *func = (FuncExpr *) expr;
ListCell *lc;

/* Iterate through the arguments of the FuncExpr */
foreach(lc, func->args)
{
Node *arg = (Node *) lfirst(lc);

/* Check if the argument is a PARAM node */
if (IsA(arg, Param))
{
Param *param = (Param *) arg;
attnum = param->paramid + paramid_increment;

break; /* Exit loop once we find the PARAM node */
}
}
}

if (previous_attnum >= attnum)
ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg(
"cannot plan distributed UPDATE SET (..) = (SELECT ...) query when not sorted by physical order"),
errhint("Sort the columns on the left side by physical order.")));

previous_attnum = attnum;

if (--remaining_ma_columns > 0)
continue; /* not the last column of multiassignment */

appendStringInfoChar(buf, ')');
expr = (Node *) cur_ma_sublink;
cur_ma_sublink = NULL;
paramid_increment = previous_attnum;
}

appendStringInfoString(buf, " = ");
Expand Down
31 changes: 31 additions & 0 deletions src/backend/distributed/deparser/ruleutils_16.c
Original file line number Diff line number Diff line change
Expand Up @@ -3533,21 +3533,51 @@ get_update_query_targetlist_def(Query *query, List *targetList,
ma_sublinks = NIL;
if (query->hasSubLinks) /* else there can't be any */
{
bool saw_junk = false;
bool need_to_sort_target_list = false;
int previous_paramid = 0;

foreach(l, targetList)
{
TargetEntry *tle = (TargetEntry *) lfirst(l);

// elog(WARNING, "TOP node to string: %s", nodeToString(tle->expr));
// elog(WARNING, "TOP node type: %d", (int) nodeTag(tle->expr));

if (tle->resjunk && IsA(tle->expr, SubLink))
{
SubLink *sl = (SubLink *) tle->expr;
saw_junk = true;

if (sl->subLinkType == MULTIEXPR_SUBLINK)
{
ma_sublinks = lappend(ma_sublinks, sl);
Assert(sl->subLinkId == list_length(ma_sublinks));
}
}
else if (!tle->resjunk)
{
int paramid = 0;
Assert(!saw_junk);

paramid = GetParamId((Node *) tle->expr);
if (paramid < previous_paramid)
need_to_sort_target_list = true;

previous_paramid = paramid;
}
}

/*
* reorder the target list on left side of the update:
* SET () = (SELECT )
* reordering the SELECT side only does not work, consider a case like:
* SET (col_1, col3) = (SELECT 1, 3), (col_2) = (SELECT 2)
* Then default order will lead to:
* SET (col_1, col2) = (SELECT 1, 3), (col_3) = (SELECT 2)
*/
if (need_to_sort_target_list)
list_sort(targetList, target_list_cmp);
}
next_ma_cell = list_head(ma_sublinks);
cur_ma_sublink = NULL;
Expand Down Expand Up @@ -3658,6 +3688,7 @@ get_update_query_targetlist_def(Query *query, List *targetList,

get_rule_expr(expr, context, false);
}
elog(DEBUG4, "rewriten query: %s", buf->data);
}

/* ----------
Expand Down
40 changes: 40 additions & 0 deletions src/backend/distributed/deparser/ruleutils_17.c
Original file line number Diff line number Diff line change
Expand Up @@ -3542,6 +3542,9 @@ get_update_query_targetlist_def(Query *query, List *targetList,
SubLink *cur_ma_sublink;
List *ma_sublinks;

AttrNumber previous_attnum = InvalidAttrNumber;
int paramid_increment = 0;

/*
* Prepare to deal with MULTIEXPR assignments: collect the source SubLinks
* into a list. We expect them to appear, in ID order, in resjunk tlist
Expand Down Expand Up @@ -3664,11 +3667,48 @@ get_update_query_targetlist_def(Query *query, List *targetList,
*/
if (cur_ma_sublink != NULL)
{
AttrNumber attnum = InvalidAttrNumber;
if (IsA(expr, Param))
{
Param *param = (Param *) expr;
attnum = param->paramid + paramid_increment;
}
else if (IsA(expr, FuncExpr))
{
FuncExpr *func = (FuncExpr *) expr;
ListCell *lc;

/* Iterate through the arguments of the FuncExpr */
foreach(lc, func->args)
{
Node *arg = (Node *) lfirst(lc);

/* Check if the argument is a PARAM node */
if (IsA(arg, Param))
{
Param *param = (Param *) arg;
attnum = param->paramid + paramid_increment;

break; /* Exit loop once we find the PARAM node */
}
}
}

if (previous_attnum >= attnum)
ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg(
"cannot plan distributed UPDATE SET (..) = (SELECT ...) query when not sorted by physical order"),
errhint("Sort the columns on the left side by physical order.")));

previous_attnum = attnum;
Copy link
Contributor

@colm-mchugh colm-mchugh Apr 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently in the PR PG17 and PG15 ruleutils error out if the target columns are not in attribute order, is this intended to alert users to the problem ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently in the PR PG17 and PG15 ruleutils error out if the target columns are not in attribute order, is this intended to alert users to the problem ?

It was some tests in this direction: fixing the bug is great, informing users if we detect they have probably been exposed is important IMO.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something like:

	DefineCustomStringVariable(
		"citus.log_pr_error_list",
		gettext_noop("Comma-separated list of PR numbers for ERROR trapping"),
		gettext_noop("set ERROR when a query enter a path changed by said pull request number"),
		&pr_error_list_raw,
		"",
		PGC_USERSET,
		GUC_STANDARD,
		NULL,
		PRErrorListGucAssignHook,
		NULL);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, perhaps this kind of mechanism could be put into a separate PR ? We appreciate the intent, but it may be easier to consider in a separate contribution.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true, I will work on this new PR so it can be used for this UPDATE SET case.


if (--remaining_ma_columns > 0)
continue; /* not the last column of multiassignment */

appendStringInfoChar(buf, ')');
expr = (Node *) cur_ma_sublink;
cur_ma_sublink = NULL;
paramid_increment = previous_attnum;
}

appendStringInfoString(buf, " = ");
Expand Down
3 changes: 3 additions & 0 deletions src/include/distributed/citus_ruleutils.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,8 @@ extern char * generate_operator_name(Oid operid, Oid arg1, Oid arg2);
extern List * getOwnedSequences_internal(Oid relid, AttrNumber attnum, char deptype);
extern void AppendOptionListToString(StringInfo stringData, List *options);

extern int GetParamId(Node *expr);
extern int target_list_cmp(const ListCell *a, const ListCell *b);


#endif /* CITUS_RULEUTILS_H */
1 change: 1 addition & 0 deletions src/test/regress/expected/distributed_types.out
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ HINT: Use the column name to insert or update the composite type as a single va
UPDATE domain_indirection_test SET domain_array[0].if2 = (SELECT 5);
ERROR: inserting or modifying composite type fields is not supported
HINT: Use the column name to insert or update the composite type as a single value
HINT: Use the column name to insert or update the composite type as a single value
-- below are supported as we don't do any field indirection
INSERT INTO field_indirection_test_2 (ct2_col, int_col, ct1_col)
VALUES ('(1, "text1", 2)', 3, '(4, 5)'), ('(6, "text2", 7)', 8, '(9, 10)');
Expand Down
Loading
Loading