Fix 0-Task Plans in Single-Shard Router When Updating a Local Table with Reference Table in Subquery (#7897)

This PR fixes issue #7891 in the Citus planner, where an `UPDATE` on a
local table with a subquery referencing a reference table could produce
a 0-task plan. Historically, the planner sometimes failed to detect that
both the target and referenced tables were effectively “local,”
assigning `INVALID_SHARD_ID` and yielding a no-op plan.
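
For context, here is a minimal shape of the failing statement, condensed from the regression test added in this commit (`t6_pg` and `t4_pg` are plain PostgreSQL tables, `t2_ref` is a reference table). Before this fix, the planner could emit a 0-task plan and silently skip the update:

```sql
-- t6_pg (the UPDATE target) and t4_pg are local PostgreSQL tables;
-- t2_ref is a Citus reference table.
UPDATE t6_pg
SET vkey = 43
WHERE EXISTS (
    SELECT (SELECT c15 FROM t2_ref)
    FROM t4_pg
);
```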

### Root Cause

- In the Citus router logic (`PlanRouterQuery`), we relied on `shardId`
to determine whether a query should be routed to a single shard.
- If `shardId == INVALID_SHARD_ID` and the query had not been marked as a
“local table modification,” this code path produced zero tasks.
- Queries that touch only local and reference tables do not require
multi-shard routing; failing to detect this “purely local” scenario caused
Citus to incorrectly route to zero tasks.

### Changes

**Enhanced Local Table Detection**

- Updated `IsLocalTableModification` and related checks to consider both
local and reference tables as “local” for planning, preventing the
0-task scenario.
- Expanded `ContainsOnlyLocalOrReferenceTables` to return true if there
are no fully distributed tables in the query.

**Added Regress Test**

- Introduced a new regress test (`issue_7891.sql`) that reproduces the
scenario.
- Verifies that the planner produces a valid single-task or local-task plan
rather than a 0-task plan.
m3hm3t authored Feb 25, 2025
1 parent c1f5762 commit 2b96422
Showing 7 changed files with 390 additions and 7 deletions.
1 change: 1 addition & 0 deletions citus-tools
Submodule citus-tools added at 3376bd
2 changes: 1 addition & 1 deletion src/backend/distributed/planner/merge_planner.c
@@ -1583,7 +1583,7 @@ IsLocalTableModification(Oid targetRelationId, Query *query, uint64 shardId,
 		return true;
 	}
 
-	if (shardId == INVALID_SHARD_ID && ContainsOnlyLocalTables(rteProperties))
+	if (shardId == INVALID_SHARD_ID && ContainsOnlyLocalOrReferenceTables(rteProperties))
 	{
 		return true;
 	}
10 changes: 6 additions & 4 deletions src/backend/distributed/planner/multi_router_planner.c
@@ -2556,13 +2556,15 @@ AllShardsColocated(List *relationShardList)
 
 
 /*
- * ContainsOnlyLocalTables returns true if there is only
- * local tables and not any distributed or reference table.
+ * ContainsOnlyLocalOrReferenceTables returns true if there are no distributed
+ * tables in the query. In other words, the query might reference only local
+ * tables and/or reference tables, but no fully distributed tables.
  */
 bool
-ContainsOnlyLocalTables(RTEListProperties *rteProperties)
+ContainsOnlyLocalOrReferenceTables(RTEListProperties *rteProperties)
 {
-	return !rteProperties->hasDistributedTable && !rteProperties->hasReferenceTable;
+	/* If hasDistributedTable is false, then all tables are either local or reference. */
+	return !rteProperties->hasDistributedTable;
 }


2 changes: 1 addition & 1 deletion src/include/distributed/multi_router_planner.h
@@ -118,7 +118,7 @@ extern bool HasDangerousJoinUsing(List *rtableList, Node *jtnode);
 extern Job * RouterJob(Query *originalQuery,
 					   PlannerRestrictionContext *plannerRestrictionContext,
 					   DeferredErrorMessage **planningError);
-extern bool ContainsOnlyLocalTables(RTEListProperties *rteProperties);
+extern bool ContainsOnlyLocalOrReferenceTables(RTEListProperties *rteProperties);
 extern RangeTblEntry * ExtractSourceResultRangeTableEntry(Query *query);
 
 #endif /* MULTI_ROUTER_PLANNER_H */
211 changes: 211 additions & 0 deletions src/test/regress/expected/issue_7891.out
@@ -0,0 +1,211 @@
-- This test validates that the query planner correctly handles nested subqueries involving both a
-- local table (t4_pg) and a reference table (t2_ref). The steps are as follows:
--
-- 1. A dedicated schema (issue_7891) is created, and four tables (t2_ref, t2_ref2, t4_pg, t6_pg) are set up.
-- 2. The tables t2_ref and t2_ref2 are designated as reference tables using the create_reference_table() function.
-- 3. Sample data is inserted into all tables.
-- 4. An UPDATE is executed on t6_pg. The update uses an EXISTS clause with a nested subquery:
-- - The outer subquery iterates over every row in t4_pg.
-- - The inner subquery selects c15 from t2_ref.
-- 5. The update should occur if the nested subquery returns any row, effectively updating t6_pg's vkey to 43.
-- 6. The final state of t6_pg is displayed to confirm that the update was applied.
--
-- Note: This test was originally designed to detect a planner bug where the nested structure might
-- lead to an incorrect plan (such as a 0-task plan), ensuring proper handling of reference and local tables.
-- https://github.com/citusdata/citus/issues/7891
CREATE SCHEMA issue_7891;
SET search_path TO issue_7891;
-- Create tables
CREATE TABLE t2_ref (
vkey INT,
pkey INT,
c15 TIMESTAMP
);
CREATE TABLE t2_ref2 (
vkey INT,
pkey INT,
c15 TIMESTAMP
);
CREATE TABLE t4_pg (
vkey INT,
pkey INT,
c22 NUMERIC,
c23 TEXT,
c24 TIMESTAMP
);
CREATE TABLE t6_pg (
vkey INT,
pkey INT,
c26 TEXT
);
-- Mark t2_ref and t2_ref2 as reference tables
SELECT create_reference_table('t2_ref');
create_reference_table
---------------------------------------------------------------------

(1 row)

SELECT create_reference_table('t2_ref2');
create_reference_table
---------------------------------------------------------------------

(1 row)

-- Insert sample data
INSERT INTO t6_pg (vkey, pkey, c26) VALUES
(2, 12000, 'initial'),
(3, 13000, 'will_be_deleted'),
(4, 14000, 'to_merge');
INSERT INTO t4_pg (vkey, pkey, c22, c23, c24)
VALUES (5, 15000, 0.0, ']]?', MAKE_TIMESTAMP(2071, 10, 26, 16, 20, 5));
INSERT INTO t2_ref (vkey, pkey, c15)
VALUES (14, 24000, NULL::timestamp);
-- Show initial data
SELECT 't6_pg before' AS label, * FROM t6_pg;
label | vkey | pkey | c26
---------------------------------------------------------------------
t6_pg before | 2 | 12000 | initial
t6_pg before | 3 | 13000 | will_be_deleted
t6_pg before | 4 | 14000 | to_merge
(3 rows)

SELECT 't4_pg data' AS label, * FROM t4_pg;
label | vkey | pkey | c22 | c23 | c24
---------------------------------------------------------------------
t4_pg data | 5 | 15000 | 0.0 | ]]? | Mon Oct 26 16:20:05 2071
(1 row)

SELECT 't2_ref data' AS label, * FROM t2_ref;
label | vkey | pkey | c15
---------------------------------------------------------------------
t2_ref data | 14 | 24000 |
(1 row)

--
-- The problematic query: update t6_pg referencing t4_pg and sub-subquery on t2_ref.
-- Historically might produce a 0-task plan if the planner incorrectly fails to
-- treat t4_pg/t2_ref as local/reference.
--
-- The outer subquery iterates over every row in table t4_pg.
UPDATE t6_pg
SET vkey = 43
WHERE EXISTS (
SELECT (SELECT c15 FROM t2_ref)
FROM t4_pg
);
SELECT 't6_pg after' AS label, * FROM t6_pg;
label | vkey | pkey | c26
---------------------------------------------------------------------
t6_pg after | 43 | 12000 | initial
t6_pg after | 43 | 13000 | will_be_deleted
t6_pg after | 43 | 14000 | to_merge
(3 rows)

--
-- DELETE with a similar nested subquery approach
-- Here, let's delete any rows for which t4_pg is non-empty (like a trivial check).
-- We'll specifically target the row with c26='will_be_deleted' to confirm it's removed.
--
DELETE FROM t6_pg
WHERE EXISTS (
SELECT (SELECT c15 FROM t2_ref)
FROM t4_pg
)
AND c26 = 'will_be_deleted';
SELECT 't6_pg after DELETE' AS label, * FROM t6_pg;
label | vkey | pkey | c26
---------------------------------------------------------------------
t6_pg after DELETE | 43 | 12000 | initial
t6_pg after DELETE | 43 | 14000 | to_merge
(2 rows)

--
-- We'll merge from t4_pg into t6_pg. The merge will update c26 for pkey=14000.
--
-- Anticipate an error indicating non-IMMUTABLE functions are not supported in MERGE statements on distributed tables.
-- Retain this comment to highlight the current limitation.
--
MERGE INTO t6_pg AS tgt
USING t4_pg AS src
ON (tgt.pkey = 14000)
WHEN MATCHED THEN
UPDATE SET c26 = 'merged_' || (SELECT pkey FROM t2_ref WHERE pkey=24000 LIMIT 1)
WHEN NOT MATCHED THEN
INSERT (vkey, pkey, c26)
VALUES (99, src.pkey, 'inserted_via_merge');
ERROR: non-IMMUTABLE functions are not yet supported in MERGE sql with distributed tables
MERGE INTO t2_ref AS tgt
USING t4_pg AS src
ON (tgt.pkey = src.pkey)
WHEN MATCHED THEN
UPDATE SET c15 = '2088-01-01 00:00:00'::timestamp
WHEN NOT MATCHED THEN
INSERT (vkey, pkey, c15)
VALUES (src.vkey, src.pkey, '2099-12-31 23:59:59'::timestamp);
ERROR: Reference table as target is not allowed in MERGE command
-- Show the final state of t2_ref:
SELECT 't2_ref after MERGE (using t4_pg)' AS label, * FROM t2_ref;
label | vkey | pkey | c15
---------------------------------------------------------------------
t2_ref after MERGE (using t4_pg) | 14 | 24000 |
(1 row)

MERGE INTO t2_ref2 AS tgt
USING t2_ref AS src
ON (tgt.pkey = src.pkey)
WHEN MATCHED THEN
UPDATE SET c15 = '2077-07-07 07:07:07'::timestamp
WHEN NOT MATCHED THEN
INSERT (vkey, pkey, c15)
VALUES (src.vkey, src.pkey, '2066-06-06 06:06:06'::timestamp);
ERROR: Reference table as target is not allowed in MERGE command
-- Show the final state of t2_ref2:
SELECT 't2_ref2 after MERGE (using t2_ref)' AS label, * FROM t2_ref2;
label | vkey | pkey | c15
---------------------------------------------------------------------
(0 rows)

MERGE INTO t6_pg AS tgt
USING t4_pg AS src
ON (tgt.pkey = src.pkey)
WHEN MATCHED THEN
UPDATE SET c26 = 'merged_value'
WHEN NOT MATCHED THEN
INSERT (vkey, pkey, c26)
VALUES (src.vkey, src.pkey, 'inserted_via_merge');
SELECT 't6_pg after MERGE' AS label, * FROM t6_pg;
label | vkey | pkey | c26
---------------------------------------------------------------------
t6_pg after MERGE | 43 | 12000 | initial
t6_pg after MERGE | 43 | 14000 | to_merge
t6_pg after MERGE | 5 | 15000 | inserted_via_merge
(3 rows)

--
-- Update the REFERENCE table itself and verify the change
-- This is to ensure that the reference table is correctly handled.
UPDATE t2_ref
SET c15 = '2099-01-01 00:00:00'::timestamp
WHERE pkey = 24000;
SELECT 't2_ref after self-update' AS label, * FROM t2_ref;
label | vkey | pkey | c15
---------------------------------------------------------------------
t2_ref after self-update | 14 | 24000 | Thu Jan 01 00:00:00 2099
(1 row)

UPDATE t2_ref
SET c15 = '2099-01-01 00:00:00'::timestamp
WHERE EXISTS (
SELECT 1
FROM t4_pg
);
ERROR: relation t4_pg is not distributed
SELECT 't2_ref after UPDATE' AS label, * FROM t2_ref;
label | vkey | pkey | c15
---------------------------------------------------------------------
t2_ref after UPDATE | 14 | 24000 | Thu Jan 01 00:00:00 2099
(1 row)

-- Cleanup
SET client_min_messages TO WARNING;
DROP SCHEMA issue_7891 CASCADE;
2 changes: 1 addition & 1 deletion src/test/regress/multi_schedule
@@ -103,7 +103,7 @@ test: multi_dropped_column_aliases foreign_key_restriction_enforcement
 test: binary_protocol
 test: alter_table_set_access_method
 test: alter_distributed_table
-test: issue_5248 issue_5099 issue_5763 issue_6543 issue_6758 issue_7477
+test: issue_5248 issue_5099 issue_5763 issue_6543 issue_6758 issue_7477 issue_7891
 test: object_propagation_debug
 test: undistribute_table
 test: run_command_on_all_nodes