Add failure handling for CREATE DATABASE commands #7483
Codecov Report
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #7483      +/-   ##
==========================================
- Coverage   89.61%   89.18%   -0.44%
==========================================
  Files         282      282
  Lines       60376    60418      +42
  Branches     7522     7522
==========================================
- Hits        54107    53882     -225
- Misses       4118     4313     +195
- Partials     2151     2223      +72
Added failure tests for the query packets that mitmproxy can capture; see the output of mitmproxy.dump below. However, I'm not sure why for some of the commands, such as (*): We capture a
(32,coordinator,"[""Query(query=BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(0, XX, 'XXXX-XX-XX XX:XX:XX.XXXXXX-XX');)""]")
(32,worker,"['CommandComplete(command=BEGIN)', ""RowDescription(fieldcount=1,fields=['F(name=assign_distributed_transaction_id,tableoid=0,colattrnum=0,typoid=2278,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=in_transaction_block)']")
(32,coordinator,"['Query(query=SELECT citus_internal.acquire_citus_advisory_object_class_lock(26, NULL))']")
(32,worker,"[""RowDescription(fieldcount=1,fields=['F(name=acquire_citus_advisory_object_class_lock,tableoid=0,colattrnum=0,typoid=2278,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=in_transaction_block)']")
(0,coordinator,"[initial message]")
(0,worker,"['AuthenticationOk()', 'ParameterStatus(in_hot_standby=off)', 'ParameterStatus(integer_datetimes=on)', 'ParameterStatus(TimeZone=XXX)', 'ParameterStatus(IntervalStyle=postgres)', 'ParameterStatus(search_path=""$user"", public)', 'ParameterStatus(is_superuser=on)', 'ParameterStatus(application_name=citus_internal gpid=999999990000037730)', 'ParameterStatus(default_transaction_read_only=off)', 'ParameterStatus(scram_iterations=4096)', 'ParameterStatus(DateStyle=ISO, MDY)', 'ParameterStatus(standard_conforming_strings=on)', 'ParameterStatus(session_authorization=postgres)', 'ParameterStatus(client_encoding=UTF8)', 'ParameterStatus(server_version=XXX)', 'ParameterStatus(server_encoding=UTF8)', 'BackendKeyData(XXX)', 'ReadyForQuery(state=idle)']")
(0,coordinator,"[""Query(query=SET citus.enable_ddl_propagation TO 'off')""]")
(0,worker,"['CommandComplete(command=SET)', 'ReadyForQuery(state=idle)']")
(0,coordinator,"['Query(query=CREATE DATABASE citus_temp_database_5_0)']")
(0,worker,"['NoticeResponse(severity=NOTICE, message=Citus partially supports CREATE DATABASE for distributed databases, detail=Citus does not propagate CREATE DATABASE command to other nodes, hint=You can manually create a database and its extensions on other nodes.)']")
(0,worker,"['CommandComplete(command=CREATE DATABASE)', 'ReadyForQuery(state=idle)']")
(0,coordinator,"[""Query(query=SET citus.enable_ddl_propagation TO 'on')""]")
(0,worker,"['CommandComplete(command=SET)', 'ReadyForQuery(state=idle)']")
(32,coordinator,"[""Query(query=SET citus.enable_ddl_propagation TO 'off')""]")
(32,worker,"['CommandComplete(command=SET)', 'ReadyForQuery(state=in_transaction_block)']")
(32,coordinator,"['Query(query=ALTER DATABASE citus_temp_database_5_0 RENAME TO db2)']")
(32,worker,"['CommandComplete(command=ALTER DATABASE)', 'ReadyForQuery(state=in_transaction_block)']")
(32,coordinator,"[""Query(query=SET citus.enable_ddl_propagation TO 'on')""]")
(32,worker,"['CommandComplete(command=SET)', 'ReadyForQuery(state=in_transaction_block)']")
(32,coordinator,"[""Parse(name=,query=WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('database', ARRAY['db2']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;,parameters=[])"", 'Bind(destination_portal=,prepared_statement=,parameter_format_codes=[],parameter_values=[],result_column_format_count=1,result_column_format_codes=[0])', 'Describe(portal=<unnamed>)', 'Execute(<unnamed>, max_rows_to_return=0)', 'Sync()']")
(32,worker,"['ParseComplete()', 'BindComplete()', ""RowDescription(fieldcount=1,fields=['F(name=add_object_metadata,tableoid=0,colattrnum=0,typoid=2278,typlen=4,typmod=-1,format_code=0)'])"", 'DataRow(columncount=1,columns=[""C(length=0,value=b\\'\\')""])', 'CommandComplete(command=SELECT 1)', 'ReadyForQuery(state=in_transaction_block)']")
(32,coordinator,"[""Query(query=PREPARE TRANSACTION 'citus_0_37730_44_4')""]")
(32,worker,"['CommandComplete(command=PREPARE TRANSACTION)', 'ReadyForQuery(state=idle)']")
(32,coordinator,"[""Query(query=COMMIT PREPARED 'citus_0_37730_44_4')""]")
(32,worker,"['CommandComplete(command=COMMIT PREPARED)', 'ReadyForQuery(state=idle)']")
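For anyone who wants to inspect a dump like the one above programmatically, here is a minimal Python sketch that splits each row into connection id, sender, and message list. It assumes every row has the shape `(conn_id,source,"[...]")` with CSV-style doubled quotes, as in the rows shown here; `parse_dump_row` and `sent_queries` are hypothetical helper names and not part of the Citus test framework.

```python
import ast
import csv


def parse_dump_row(line):
    """Split one '(conn_id,source,"[messages]")' row into its three parts."""
    inner = line.strip()
    if not (inner.startswith("(") and inner.endswith(")")):
        raise ValueError(f"not a dump row: {line!r}")
    # The row body is CSV-like: the third field is quoted and doubles inner quotes.
    conn_id, source, raw = next(csv.reader([inner[1:-1]]))
    try:
        # Most rows hold the repr of a Python list of message strings.
        messages = ast.literal_eval(raw)
    except (ValueError, SyntaxError):
        # Rows such as "[initial message]" are not valid literals; keep the raw text.
        messages = [raw]
    return int(conn_id), source, messages


def sent_queries(lines):
    """Return (conn_id, message) pairs for Query messages sent by the coordinator."""
    result = []
    for line in lines:
        conn_id, source, messages = parse_dump_row(line)
        if source == "coordinator":
            result.extend((conn_id, m) for m in messages if m.startswith("Query("))
    return result
```

Feeding the rows above through `sent_queries` makes it easier to see which statements travel on connection 0 and which on connection 32.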
It is possible to capture Parse packets as well as Query packets. An example can be found at:
If you are curious, here is the documentation for the PG protocol message formats: https://www.postgresql.org/docs/current/protocol-message-formats.html
In the preprocess phase, we save the original database name, replace the dbname field of CreatedbStmt with a temporary name (so that Postgres creates the database under the temporary name locally), and then insert a cleanup record for the temporary database name on all nodes **(\*\*)**.

In the postprocess phase, we first rename the temporary database back to its original name on the local node and then return a list of distributed DDL jobs that i) create the database with the temporary name and ii) rename it to its original name on the other nodes. That way, if CREATE DATABASE fails on any of the nodes, the temporary database is cleaned up via the cleanup records inserted in the preprocess phase, and a failure does not leak any database that carries the name the user intended to use.

Solves the problem documented in #7369 for CREATE DATABASE commands.

**(\*\*):** To ensure that we insert cleanup records on all nodes, this PR also starts requiring the coordinator to be in the metadata; otherwise we would skip inserting a cleanup record for the coordinator.
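To illustrate why the temporary name prevents leaks, here is a toy Python model of the flow, not the actual C implementation. `Node`, `run_cleanup`, and `create_database` are hypothetical names. The model collapses the local rename and the remote create/rename jobs into two phases, and it assumes that renames and the removal of cleanup records only take effect once every create has succeeded.

```python
class Node:
    """Hypothetical stand-in for one node: database names plus cleanup records."""

    def __init__(self, name, fail_on_create=False):
        self.name = name
        self.databases = set()
        self.cleanup_records = set()
        self.fail_on_create = fail_on_create

    def create_database(self, dbname):
        if self.fail_on_create:
            raise RuntimeError(f"CREATE DATABASE failed on {self.name}")
        self.databases.add(dbname)


def run_cleanup(nodes):
    """Drop every database that still has a cleanup record, then clear the records."""
    for node in nodes:
        node.databases -= node.cleanup_records
        node.cleanup_records.clear()


def create_database(nodes, dbname, temp_name):
    """Create `dbname` on all nodes via a temporary name; return True on success."""
    # Preprocess: register the temporary name for cleanup on every node.
    for node in nodes:
        node.cleanup_records.add(temp_name)
    try:
        # Create under the temporary name, local node (nodes[0]) first.
        for node in nodes:
            node.create_database(temp_name)
    except RuntimeError:
        # Only temporary names exist at this point, so cleanup removes everything.
        run_cleanup(nodes)
        return False
    # Assumption: renames and record removal happen together after all creates succeed.
    for node in nodes:
        node.databases.remove(temp_name)
        node.databases.add(dbname)
        node.cleanup_records.discard(temp_name)
    return True
```

In this model, a failure on any node leaves no database named `dbname` anywhere, because the user-visible name is only assigned after every node has the temporary database.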