Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance MERGE .. WHEN NOT MATCHED BY SOURCE for repartitioned source #7900

Merged
merged 1 commit into from
Feb 24, 2025

Conversation

colm-mchugh
Copy link
Contributor

@colm-mchugh colm-mchugh commented Feb 13, 2025

DESCRIPTION: Ensure that a MERGE command on a distributed table with a WHEN NOT MATCHED BY SOURCE clause runs against all shards of the distributed table.

The Postgres MERGE command updates a table using a table or a query as a data source. It provides three ways to match the target table with the source: WHEN MATCHED means that there is a row in both the target and source; WHEN NOT MATCHED means that there is a row in the source that has no match (is not present) in the target; and, as of PG17, WHEN NOT MATCHED BY SOURCE means that there is a row in the target that has no match in the source.

In Citus, when a MERGE command updates a distributed table using a local/reference table or a distributed query as source, that source is repartitioned, and for each repartitioned shard that has data (i.e. 1 or more rows) the MERGE is run against the corresponding distributed table shard. Suppose the distributed table has 32 shards, and the source repartitions into 4 shards that have data, with the remaining 28 shards being empty; then the MERGE command is performed on the 4 corresponding shards of the distributed table. However, the semantics of WHEN NOT MATCHED BY SOURCE are that the specified action must be performed on the target for each row in the target that is not in the source; so if the source is empty, all target rows should be updated. To see this, consider the following MERGE command:

MERGE INTO target AS t
USING source AS s ON t.id = s.id
WHEN NOT MATCHED BY SOURCE THEN UPDATE t SET t.col1 = 100

If the source has zero rows then every row in the target is updated s.t. its col1 value is 100. Currently in Citus a MERGE on a distributed table with a local/reference table or a distributed query as source ignores shards of the distributed table when the corresponding shard of the repartitioned source has zero rows. However, if the MERGE command specifies a WHEN NOT MATCHED BY SOURCE clause, then the MERGE should be performed on all shards of the distributed table, to ensure that the specified action is performed on the target for each row in the target that is not in the source. This PR enhances Citus MERGE execution so that when a repartitioned source shard has zero rows, and the MERGE command specifies a WHEN NOT MATCHED BY SOURCE clause, the MERGE is performed against the corresponding shard of the distributed table using an empty (zero row) relation as source, by generating a query of the form:

MERGE INTO target_shard_0002 AS t
USING (SELECT id FROM (VALUES (NULL) ) source_0002(id) WHERE FALSE) AS s ON t.id = s.id
WHEN NOT MATCHED BY SOURCE THEN UPDATE t set t.col1 = 100

This works because each row in the target shard will be updated, and WHEN MATCHED and WHEN NOT MATCHED, if specified, will be no-ops because the source has zero rows.

To implement this when the source is a local or reference table involves teaching function ExcuteSourceAtCoordAndRedistribution() in merge_executor.c to not prune tasks when the query has WHEN NOT MATCHED BY SOURCE but to instead replace the task's query to one that uses an empty relation as source. And when the source is a distributed query, function ExecuteMergeSourcePlanIntoColocatedIntermediateResults() (also in merge_executor.c) instead of skipping empty tasks now generates a query that uses an empty relation as source for the corresponding target shard of the distributed table, but again only when the query has WHEN NOT MATCHED BY SOURCE. A new function BuildEmptyResultQuery() is added to recursive_planning.c and it is used by both the aforementioned functions in merge_executor.c to build an empty relation to use as the source. It applies the appropriate type to each column of the empty relation so the join with the target makes sense to the query compiler.

Copy link

codecov bot commented Feb 13, 2025

Codecov Report

Attention: Patch coverage is 94.11765% with 7 lines in your changes missing coverage. Please review.

Project coverage is 89.44%. Comparing base (c55bc8c) to head (c7c75be).
Report is 21 commits behind head on release-13.0.

Additional details and impacted files
@@               Coverage Diff                @@
##           release-13.0    #7900      +/-   ##
================================================
- Coverage         89.48%   89.44%   -0.04%     
================================================
  Files               276      276              
  Lines             60063    60008      -55     
  Branches           7524     7517       -7     
================================================
- Hits              53747    53677      -70     
- Misses             4166     4185      +19     
+ Partials           2150     2146       -4     

@colm-mchugh colm-mchugh changed the title Address issue 7890 Enhance MERGE .. WHEN NOT MATCHED BY SOURCE for repartitioned source Feb 14, 2025
@colm-mchugh colm-mchugh self-assigned this Feb 14, 2025
@colm-mchugh colm-mchugh linked an issue Feb 14, 2025 that may be closed by this pull request
DEBUG: <Deparsed MERGE query: MERGE INTO pg17.sensor_readings_1072045 sr USING (SELECT intermediate_result.sensor_id, intermediate_result.average_reading, intermediate_result.last_reading_timestamp, intermediate_result.rid FROM read_intermediate_results('{repartitioned_results_xxxxx_from_1072049_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(sensor_id numeric, average_reading numeric, last_reading_timestamp timestamp without time zone, rid numeric)) new_readings ON (sr.sensor_id OPERATOR(pg_catalog.=) new_readings.sensor_id) WHEN MATCHED THEN UPDATE SET reading_value = new_readings.average_reading, reading_timestamp = new_readings.last_reading_timestamp WHEN NOT MATCHED BY TARGET THEN INSERT (reading_id, sensor_id, reading_value, reading_timestamp) VALUES (new_readings.rid, new_readings.sensor_id, new_readings.average_reading, new_readings.last_reading_timestamp) WHEN NOT MATCHED BY SOURCE THEN UPDATE SET reading_value = 100>
DEBUG: <Deparsed MERGE query: MERGE INTO pg17.sensor_readings_1072171 sr USING (SELECT temp_empty_rel__1072171.sensor_id, temp_empty_rel__1072171.average_reading, temp_empty_rel__1072171.last_reading_timestamp, temp_empty_rel__1072171.rid FROM (VALUES (NULL::numeric,NULL::numeric,NULL::timestamp without time zone,NULL::numeric)) temp_empty_rel__1072171(sensor_id, average_reading, last_reading_timestamp, rid) WHERE false) new_readings ON (sr.sensor_id OPERATOR(pg_catalog.=) new_readings.sensor_id) WHEN MATCHED THEN UPDATE SET reading_value = new_readings.average_reading, reading_timestamp = new_readings.last_reading_timestamp WHEN NOT MATCHED BY TARGET THEN INSERT (reading_id, sensor_id, reading_value, reading_timestamp) VALUES (new_readings.rid, new_readings.sensor_id, new_readings.average_reading, new_readings.last_reading_timestamp) WHEN NOT MATCHED BY SOURCE THEN UPDATE SET reading_value = 100>
DEBUG: <Deparsed MERGE query: MERGE INTO pg17.sensor_readings_1072172 sr USING (SELECT intermediate_result.sensor_id, intermediate_result.average_reading, intermediate_result.last_reading_timestamp, intermediate_result.rid FROM read_intermediate_results('{repartitioned_results_xxxxx_from_1072176_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(sensor_id numeric, average_reading numeric, last_reading_timestamp timestamp without time zone, rid numeric)) new_readings ON (sr.sensor_id OPERATOR(pg_catalog.=) new_readings.sensor_id) WHEN MATCHED THEN UPDATE SET reading_value = new_readings.average_reading, reading_timestamp = new_readings.last_reading_timestamp WHEN NOT MATCHED BY TARGET THEN INSERT (reading_id, sensor_id, reading_value, reading_timestamp) VALUES (new_readings.rid, new_readings.sensor_id, new_readings.average_reading, new_readings.last_reading_timestamp) WHEN NOT MATCHED BY SOURCE THEN UPDATE SET reading_value = 100>
DEBUG: <Deparsed MERGE query: MERGE INTO pg17.sensor_readings_1072173 sr USING (SELECT temp_empty_rel__1072173.sensor_id, temp_empty_rel__1072173.average_reading, temp_empty_rel__1072173.last_reading_timestamp, temp_empty_rel__1072173.rid FROM (VALUES (NULL::numeric,NULL::numeric,NULL::timestamp without time zone,NULL::numeric)) temp_empty_rel__1072173(sensor_id, average_reading, last_reading_timestamp, rid) WHERE false) new_readings ON (sr.sensor_id OPERATOR(pg_catalog.=) new_readings.sensor_id) WHEN MATCHED THEN UPDATE SET reading_value = new_readings.average_reading, reading_timestamp = new_readings.last_reading_timestamp WHEN NOT MATCHED BY TARGET THEN INSERT (reading_id, sensor_id, reading_value, reading_timestamp) VALUES (new_readings.rid, new_readings.sensor_id, new_readings.average_reading, new_readings.last_reading_timestamp) WHEN NOT MATCHED BY SOURCE THEN UPDATE SET reading_value = 100>
Copy link
Member

@naisila naisila Feb 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have more tasks here because you set the shard count to 32 in the previous test?

EDIT: sorry, just saw that you have reset the shard count in the previous test

Copy link
Member

@naisila naisila Feb 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering why the number of pruned task list changed here, given that the result was actually correct.
Correct me if I'm wrong:

So, the reason why we have more tasks generated here, is that we have had empty source shards previously.
However, the reason why we got the correct output before, is that for those empty source shards previously, the target shard was also empty, and when the target is empty, "when not matched by source" has no effect.
So, technically, these extra tasks we get here are no-op, because they have an empty target, as well as an empty source

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is not critical, because it's really an edge case, but is there a way that we can avoid empty source tasks, when the target is also empty?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However, the reason why we got the correct output before, is that for those empty source shards previously, the target
shard was also empty, and when the target is empty, "when not matched by source" has no effect.
So, technically, these extra tasks we get here are no-op, because they have an empty target, as well as an empty source

Exactly!

[ ... ] is there a way that we can avoid empty source tasks, when the target is also empty?

I looked into this (and may have run it by you?) because the initial idea included not pruning the task when the target shard has data in addition to NOT MATCHED BY SOURCE being specified. However, determining if a shard has data deterministically (i.e. with 100% certainty) and cheaply (e.g. from metadata) seems challenging.. Function citus_shard_sizes() and view citus_shards are ways to get a shard's size, but did they not seem 100% reliable so I punted, on the assumption that it is an edge case. However, I'm open to checking further, particularly if there is any precedence, i.e. an existing check in the codebase that a shard has data.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, makes sense. In that case, we can merge as is, just maybe note this down somewhere. Could also be part of a new infrastructure of Merge that Teja was talking about.

Copy link
Member

@naisila naisila left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall look great, thanks Colm. I just have one question:
https://github.com/citusdata/citus/pull/7900/files#r1959563547

Copy link
Member

@naisila naisila left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great work 🎉
Would appreciate a review from @tejeswarm , I am reluctant to merge this without his approval as well, given his expertise in Merge.

@tejeswarm
Copy link
Contributor

Thanks @colm-mchugh for the detailed description, that helps. I agree with the fix that we were pruning the shards earlier and now with this new feature in 17, we need to run against them too. One quick question - what's the effect of this feature in a co-located routing query?

@tejeswarm
Copy link
Contributor

Thanks @colm-mchugh for the detailed description, that helps. I agree with the fix that we were pruning the shards earlier and now with this new feature in 17, we need to run against them too. One quick question - what's the effect of this feature in a co-located routing query?

Maybe we won't prune in that case?


/*
* Generate a query string for the query that merges into a shard and reads
* from an empty relatoin.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: Typo

@colm-mchugh
Copy link
Contributor Author

colm-mchugh commented Feb 21, 2025

Thanks @colm-mchugh for the detailed description, that helps. I agree with the fix that we were pruning the shards earlier and now with this new feature in 17, we need to run against them too. One quick question - what's the effect of this feature in a co-located routing query?

Maybe we won't prune in that case?

Yes @tejeswarm , pruning does not happen for a co-located routing query - I think @naisila 's Distributed-Distributed queries in pg17.sql cover this case, but I also verified locally that pruning does not occur - note we run the merge against all shards:

SET citus.log_remote_commands to on;
MERGE INTO citus_distributed_target8 t
  USING citus_distributed_source s ON t.tid = s.sid AND t.tid=5
  WHEN NOT MATCHED BY SOURCE THEN
    UPDATE SET val = val || ' not matched by source';

psql:/home/citus/sqrpts/issue_7890.colo.sql:44: NOTICE:  issuing MERGE INTO issue_7890.citus_distributed_target8_7000000 t USING issue_7890.citus_distributed_source_7000008 s ON ((t.tid OPERATOR(pg_catalog.=) s.sid) AND (t.tid OPERATOR(pg_catalog.=) 5)) WHEN NOT MATCHED BY SOURCE THEN UPDATE SET val = (t.val OPERATOR(pg_catalog.||) ' not matched by source'::text)
DETAIL:  on server citus@localhost:9701 connectionId: 1
psql:/home/citus/sqrpts/issue_7890.colo.sql:44: NOTICE:  issuing MERGE INTO issue_7890.citus_distributed_target8_7000001 t USING issue_7890.citus_distributed_source_7000009 s ON ((t.tid OPERATOR(pg_catalog.=) s.sid) AND (t.tid OPERATOR(pg_catalog.=) 5)) WHEN NOT MATCHED BY SOURCE THEN UPDATE SET val = (t.val OPERATOR(pg_catalog.||) ' not matched by source'::text)
DETAIL:  on server citus@localhost:9702 connectionId: 2
psql:/home/citus/sqrpts/issue_7890.colo.sql:44: NOTICE:  issuing MERGE INTO issue_7890.citus_distributed_target8_7000002 t USING issue_7890.citus_distributed_source_7000010 s ON ((t.tid OPERATOR(pg_catalog.=) s.sid) AND (t.tid OPERATOR(pg_catalog.=) 5)) WHEN NOT MATCHED BY SOURCE THEN UPDATE SET val = (t.val OPERATOR(pg_catalog.||) ' not matched by source'::text)
DETAIL:  on server citus@localhost:9701 connectionId: 1
psql:/home/citus/sqrpts/issue_7890.colo.sql:44: NOTICE:  issuing MERGE INTO issue_7890.citus_distributed_target8_7000003 t USING issue_7890.citus_distributed_source_7000011 s ON ((t.tid OPERATOR(pg_catalog.=) s.sid) AND (t.tid OPERATOR(pg_catalog.=) 5)) WHEN NOT MATCHED BY SOURCE THEN UPDATE SET val = (t.val OPERATOR(pg_catalog.||) ' not matched by source'::text)
DETAIL:  on server citus@localhost:9702 connectionId: 2
psql:/home/citus/sqrpts/issue_7890.colo.sql:44: NOTICE:  issuing MERGE INTO issue_7890.citus_distributed_target8_7000004 t USING issue_7890.citus_distributed_source_7000012 s ON ((t.tid OPERATOR(pg_catalog.=) s.sid) AND (t.tid OPERATOR(pg_catalog.=) 5)) WHEN NOT MATCHED BY SOURCE THEN UPDATE SET val = (t.val OPERATOR(pg_catalog.||) ' not matched by source'::text)
DETAIL:  on server citus@localhost:9701 connectionId: 1
psql:/home/citus/sqrpts/issue_7890.colo.sql:44: NOTICE:  issuing MERGE INTO issue_7890.citus_distributed_target8_7000005 t USING issue_7890.citus_distributed_source_7000013 s ON ((t.tid OPERATOR(pg_catalog.=) s.sid) AND (t.tid OPERATOR(pg_catalog.=) 5)) WHEN NOT MATCHED BY SOURCE THEN UPDATE SET val = (t.val OPERATOR(pg_catalog.||) ' not matched by source'::text)
DETAIL:  on server citus@localhost:9702 connectionId: 2
psql:/home/citus/sqrpts/issue_7890.colo.sql:44: NOTICE:  issuing MERGE INTO issue_7890.citus_distributed_target8_7000006 t USING issue_7890.citus_distributed_source_7000014 s ON ((t.tid OPERATOR(pg_catalog.=) s.sid) AND (t.tid OPERATOR(pg_catalog.=) 5)) WHEN NOT MATCHED BY SOURCE THEN UPDATE SET val = (t.val OPERATOR(pg_catalog.||) ' not matched by source'::text)
DETAIL:  on server citus@localhost:9701 connectionId: 1
psql:/home/citus/sqrpts/issue_7890.colo.sql:44: NOTICE:  issuing MERGE INTO issue_7890.citus_distributed_target8_7000007 t USING issue_7890.citus_distributed_source_7000015 s ON ((t.tid OPERATOR(pg_catalog.=) s.sid) AND (t.tid OPERATOR(pg_catalog.=) 5)) WHEN NOT MATCHED BY SOURCE THEN UPDATE SET val = (t.val OPERATOR(pg_catalog.||) ' not matched by source'::text)
DETAIL:  on server citus@localhost:9702 connectionId: 2
MERGE 12
Time: 2.922 ms

distributed target and repartitioned source.

Ensure that a MERGE command on a distributed table with a `WHEN NOT MATCHED BY SOURCE`
clause runs against all shards of the distributed table. PG17 introduced
this clause to the MERGE command, and the semantics when the source is
empty is that every row of the target is subject to the merge action.
Citus MERGE execution with a repartitioned source and distributed target
needs to run `WHEN NOT MATCHED BY SOURCE` against all shards of the
target, and not prune out the corresponding tasks.
@tejeswarm
Copy link
Contributor

Thanks @colm-mchugh One last question, how does this effect for local/reference table if any?

@@ -2291,6 +2291,129 @@ BuildReadIntermediateResultsArrayQuery(List *targetEntryList,
}


/*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an interesting approach. Thinking out loud, it's not that we have to do in this PR, may be as a special case, intermediate results should generate empty files with the ID, not sure if that's better than this one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually that - intermediate results .. empty files with the ID - was the approach I initially tried ! However, it complicated the multi_copy logic significantly, specifically the CitusCopyDestReceiver, which streams tuples into the appropriate intermediate result file. As the problem is specific to merge execution, I thought it was preferable to to go with the empty relation approach. I also reckoned that it is possibly more efficient than creating an empty intermediate result file, but lmk what you think?

@colm-mchugh
Copy link
Contributor Author

Thanks @colm-mchugh One last question, how does this effect for local/reference table if any?

So a MERGE with reference table as target is not currently supported; there are existing tests for this in regress files merge, pg15' and pg17` . A MERGE with a local table as target and local or reference as source runs locally.

@colm-mchugh colm-mchugh merged commit c1f5762 into release-13.0 Feb 24, 2025
119 checks passed
@colm-mchugh colm-mchugh deleted the colm/issue_7890 branch February 24, 2025 09:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Shard count affects MERGE ... WHEN NOT MATCHED BY SOURCE
3 participants