Skip to content

Commit

Permalink
address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
onurctirtir committed Feb 22, 2024
1 parent 62857d5 commit 015c598
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 15 deletions.
22 changes: 17 additions & 5 deletions src/backend/distributed/commands/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,9 @@ PreprocessAlterDatabaseStmt(Node *node, const char *queryString,
* NontransactionalNodeDDLTask to run the command on the workers outside
* the transaction block.
*/

return NontransactionalNodeDDLTaskList(NON_COORDINATOR_NODES, commands);
bool warnForPartialFailure = true;
return NontransactionalNodeDDLTaskList(NON_COORDINATOR_NODES, commands,
warnForPartialFailure);
}
else
{
Expand Down Expand Up @@ -522,6 +523,14 @@ PreprocessCreateDatabaseStmt(Node *node, const char *queryString,
CreateDatabaseCommandOriginalDbName = stmt->dbname;
stmt->dbname = tempDatabaseName;

/*
* Delete cleanup records in the same transaction so that if the current
* transaction fails for some reason, then the cleanup records won't be
* deleted. In the happy path, we will delete the cleanup records without
* deferring them to the background worker.
*/
FinalizeOperationNeedingCleanupOnSuccess("create database");

return NIL;
}

Expand All @@ -537,7 +546,6 @@ PreprocessCreateDatabaseStmt(Node *node, const char *queryString,
* cleanup records that we inserted in PreprocessCreateDatabaseStmt() and in case of a
* failure, we won't leak any databases that carry the name the user intended to use for
* the database.
*
*/
List *
PostprocessCreateDatabaseStmt(Node *node, const char *queryString)
Expand Down Expand Up @@ -570,8 +578,10 @@ PostprocessCreateDatabaseStmt(Node *node, const char *queryString)
* block, we need to use NontransactionalNodeDDLTaskList() to send the CREATE
* DATABASE statement to the workers.
*/
bool warnForPartialFailure = false;
List *createDatabaseDDLJobList =
NontransactionalNodeDDLTaskList(REMOTE_NODES, createDatabaseCommands);
NontransactionalNodeDDLTaskList(REMOTE_NODES, createDatabaseCommands,
warnForPartialFailure);

CreatedbStmt *stmt = castNode(CreatedbStmt, node);

Expand Down Expand Up @@ -670,8 +680,10 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString,
* use NontransactionalNodeDDLTaskList() to send the DROP DATABASE statement
* to the workers.
*/
bool warnForPartialFailure = true;
List *dropDatabaseDDLJobList =
NontransactionalNodeDDLTaskList(REMOTE_NODES, dropDatabaseCommands);
NontransactionalNodeDDLTaskList(REMOTE_NODES, dropDatabaseCommands,
warnForPartialFailure);
return dropDatabaseDDLJobList;
}

Expand Down
3 changes: 3 additions & 0 deletions src/backend/distributed/commands/index.c
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ GenerateCreateIndexDDLJob(IndexStmt *createIndexStatement, const char *createInd
ddlJob->startNewTransaction = createIndexStatement->concurrent;
ddlJob->metadataSyncCommand = createIndexCommand;
ddlJob->taskList = CreateIndexTaskList(createIndexStatement);
ddlJob->warnForPartialFailure = true;

return ddlJob;
}
Expand Down Expand Up @@ -652,6 +653,7 @@ PreprocessReindexStmt(Node *node, const char *reindexCommand,
"concurrently");
ddlJob->metadataSyncCommand = reindexCommand;
ddlJob->taskList = CreateReindexTaskList(relationId, reindexStatement);
ddlJob->warnForPartialFailure = true;

ddlJobs = list_make1(ddlJob);
}
Expand Down Expand Up @@ -780,6 +782,7 @@ PreprocessDropIndexStmt(Node *node, const char *dropIndexCommand,
ddlJob->metadataSyncCommand = dropIndexCommand;
ddlJob->taskList = DropIndexTaskList(distributedRelationId, distributedIndexId,
dropIndexStatement);
ddlJob->warnForPartialFailure = true;

ddlJobs = list_make1(ddlJob);
}
Expand Down
13 changes: 9 additions & 4 deletions src/backend/distributed/commands/utility_hook.c
Original file line number Diff line number Diff line change
Expand Up @@ -1287,7 +1287,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
errhint("Use DROP INDEX CONCURRENTLY IF EXISTS to remove the "
"invalid index, then retry the original command.")));
}
else
else if (ddlJob->warnForPartialFailure)
{
ereport(WARNING,
(errmsg(
Expand All @@ -1296,9 +1296,9 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
"state.\nIf the problematic command is a CREATE operation, "
"consider using the 'IF EXISTS' syntax to drop the object,"
"\nif applicable, and then re-attempt the original command.")));

PG_RE_THROW();
}

PG_RE_THROW();
}
PG_END_TRY();
}
Expand Down Expand Up @@ -1514,9 +1514,12 @@ DDLTaskList(Oid relationId, const char *commandString)
* NontransactionalNodeDDLTaskList builds a list of tasks to execute a DDL command on a
* given target set of nodes, with cannotBeExecutedInTransaction set to make sure
* that the task list is executed outside a transaction block.
*
* Also sets warnForPartialFailure for the returned DDLJobs.
*/
List *
NontransactionalNodeDDLTaskList(TargetWorkerSet targets, List *commands)
NontransactionalNodeDDLTaskList(TargetWorkerSet targets, List *commands,
bool warnForPartialFailure)
{
List *ddlJobs = NodeDDLTaskList(targets, commands);
DDLJob *ddlJob = NULL;
Expand All @@ -1527,6 +1530,8 @@ NontransactionalNodeDDLTaskList(TargetWorkerSet targets, List *commands)
{
task->cannotBeExecutedInTransaction = true;
}

ddlJob->warnForPartialFailure = warnForPartialFailure;
}
return ddlJobs;
}
Expand Down
5 changes: 5 additions & 0 deletions src/backend/distributed/operations/shard_cleaner.c
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,11 @@ TryDropDatabaseOutsideTransaction(char *databaseName, char *nodeName, int nodePo
const char *commandString = NULL;
foreach_ptr(commandString, commandList)
{
/*
* Cannot use SendOptionalCommandListToWorkerOutsideTransactionWithConnection()
* because we don't want to open a transaction block on remote nodes as DROP
* DATABASE commands cannot be run inside a transaction block.
*/
if (ExecuteOptionalRemoteCommand(connection, commandString, NULL) !=
RESPONSE_OKAY)
{
Expand Down
12 changes: 11 additions & 1 deletion src/include/distributed/commands/utility_hook.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@ typedef struct DDLJob
const char *metadataSyncCommand;

List *taskList; /* worker DDL tasks to execute */

/*
* Only applicable when any of the tasks cannot be executed in a
* transaction block.
*
* Controls whether to emit a warning within the utility hook in case of a
* failure.
*/
bool warnForPartialFailure;
} DDLJob;

extern ProcessUtility_hook_type PrevProcessUtility;
Expand All @@ -94,7 +103,8 @@ extern void ProcessUtilityParseTree(Node *node, const char *queryString,
extern void MarkInvalidateForeignKeyGraph(void);
extern void InvalidateForeignKeyGraphForDDL(void);
extern List * DDLTaskList(Oid relationId, const char *commandString);
extern List * NontransactionalNodeDDLTaskList(TargetWorkerSet targets, List *commands);
extern List * NontransactionalNodeDDLTaskList(TargetWorkerSet targets, List *commands,
bool warnForPartialFailure);
extern List * NodeDDLTaskList(TargetWorkerSet targets, List *commands);
extern bool AlterTableInProgress(void);
extern bool DropSchemaOrDBInProgress(void);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,13 +427,17 @@ SELECT * FROM public.check_database_on_all_nodes('my_template_database') ORDER B
--tests for special characters in database name
set citus.enable_create_database_propagation=on;
SET citus.log_remote_commands = true;
set citus.grep_remote_commands = '%CREATE DATABASE%';
set citus.grep_remote_commands = '%DATABASE%';
SET citus.next_operation_id TO 2000;
create database "mydatabase#1'2";
NOTICE: issuing CREATE DATABASE citus_temp_database_2000_0
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE DATABASE citus_temp_database_2000_0
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER DATABASE citus_temp_database_2000_0 RENAME TO "mydatabase#1'2"
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER DATABASE citus_temp_database_2000_0 RENAME TO "mydatabase#1'2"
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
set citus.grep_remote_commands = '%DROP DATABASE%';
drop database if exists "mydatabase#1'2";
NOTICE: issuing DROP DATABASE IF EXISTS "mydatabase#1'2"
Expand Down
61 changes: 58 additions & 3 deletions src/test/regress/expected/failure_create_database.out
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ RETURNS TEXT AS $func$
SELECT array_agg(DISTINCT result ORDER BY result) AS temp_databases_on_nodes FROM run_command_on_all_nodes($$SELECT datname FROM pg_database WHERE datname LIKE 'citus_temp_database_%'$$) WHERE result != '';
$func$
LANGUAGE sql;
CREATE FUNCTION count_db_cleanup_records()
RETURNS TABLE(object_name TEXT, count INTEGER) AS $func$
SELECT object_name, COUNT(*) FROM pg_dist_cleanup WHERE object_name LIKE 'citus_temp_database_%' GROUP BY object_name;
$func$
LANGUAGE sql;
CREATE FUNCTION ensure_no_temp_databases_on_any_nodes()
RETURNS BOOLEAN AS $func$
SELECT bool_and(result::boolean) AS no_temp_databases_on_any_nodes FROM run_command_on_all_nodes($$SELECT COUNT(*)=0 FROM pg_database WHERE datname LIKE 'citus_temp_database_%'$$);
Expand Down Expand Up @@ -46,6 +51,11 @@ SELECT get_temp_databases_on_nodes();

(1 row)

SELECT * FROM count_db_cleanup_records();
object_name | count
---------------------------------------------------------------------
(0 rows)

CALL citus_cleanup_orphaned_resources();
SELECT ensure_no_temp_databases_on_any_nodes();
ensure_no_temp_databases_on_any_nodes
Expand All @@ -67,9 +77,6 @@ SELECT citus.mitmproxy('conn.onQuery(query="^CREATE DATABASE").cancel(' || pg_ba
(1 row)

CREATE DATABASE db1;
WARNING: Commands that are not transaction-safe may result in partial failure, potentially leading to an inconsistent state.
If the problematic command is a CREATE operation, consider using the 'IF EXISTS' syntax to drop the object,
if applicable, and then re-attempt the original command.
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
Expand All @@ -83,6 +90,12 @@ SELECT get_temp_databases_on_nodes();
{citus_temp_database_4000_0}
(1 row)

SELECT * FROM count_db_cleanup_records();
object_name | count
---------------------------------------------------------------------
citus_temp_database_4000_0 | 2
(1 row)

CALL citus_cleanup_orphaned_resources();
SELECT ensure_no_temp_databases_on_any_nodes();
ensure_no_temp_databases_on_any_nodes
Expand Down Expand Up @@ -117,6 +130,12 @@ SELECT get_temp_databases_on_nodes();
{citus_temp_database_4001_0}
(1 row)

SELECT * FROM count_db_cleanup_records();
object_name | count
---------------------------------------------------------------------
citus_temp_database_4001_0 | 2
(1 row)

CALL citus_cleanup_orphaned_resources();
SELECT ensure_no_temp_databases_on_any_nodes();
ensure_no_temp_databases_on_any_nodes
Expand Down Expand Up @@ -151,6 +170,11 @@ SELECT get_temp_databases_on_nodes();

(1 row)

SELECT * FROM count_db_cleanup_records();
object_name | count
---------------------------------------------------------------------
(0 rows)

CALL citus_cleanup_orphaned_resources();
SELECT ensure_no_temp_databases_on_any_nodes();
ensure_no_temp_databases_on_any_nodes
Expand Down Expand Up @@ -186,6 +210,12 @@ SELECT get_temp_databases_on_nodes();
{citus_temp_database_4002_0}
(1 row)

SELECT * FROM count_db_cleanup_records();
object_name | count
---------------------------------------------------------------------
citus_temp_database_4002_0 | 2
(1 row)

CALL citus_cleanup_orphaned_resources();
SELECT ensure_no_temp_databases_on_any_nodes();
ensure_no_temp_databases_on_any_nodes
Expand Down Expand Up @@ -237,6 +267,12 @@ SELECT * FROM public.check_database_on_all_nodes($$db1$$) ORDER BY node_type, re
(2 rows)

DROP DATABASE db1;
-- after recovering the prepared transactions, cleanup records should also be removed
SELECT * FROM count_db_cleanup_records();
object_name | count
---------------------------------------------------------------------
(0 rows)

SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_internal.acquire_citus_advisory_object_class_lock").kill()');
mitmproxy
---------------------------------------------------------------------
Expand All @@ -257,6 +293,11 @@ SELECT get_temp_databases_on_nodes();

(1 row)

SELECT * FROM count_db_cleanup_records();
object_name | count
---------------------------------------------------------------------
(0 rows)

CALL citus_cleanup_orphaned_resources();
SELECT ensure_no_temp_databases_on_any_nodes();
ensure_no_temp_databases_on_any_nodes
Expand Down Expand Up @@ -292,6 +333,12 @@ SELECT get_temp_databases_on_nodes();
{citus_temp_database_4004_0}
(1 row)

SELECT * FROM count_db_cleanup_records();
object_name | count
---------------------------------------------------------------------
citus_temp_database_4004_0 | 2
(1 row)

CALL citus_cleanup_orphaned_resources();
SELECT ensure_no_temp_databases_on_any_nodes();
ensure_no_temp_databases_on_any_nodes
Expand All @@ -306,5 +353,13 @@ SELECT * FROM public.check_database_on_all_nodes($$db1$$) ORDER BY node_type, re
worker node (remote) | {"database_properties": null, "pg_dist_object_record_for_db_exists": false, "stale_pg_dist_object_record_for_a_db_exists": false}
(2 rows)

CREATE DATABASE db1;
-- show that a successful database creation doesn't leave any pg_dist_cleanup records behind
SELECT * FROM count_db_cleanup_records();
object_name | count
---------------------------------------------------------------------
(0 rows)

DROP DATABASE db1;
DROP FUNCTION get_temp_databases_on_nodes();
DROP FUNCTION ensure_no_temp_databases_on_any_nodes();
2 changes: 1 addition & 1 deletion src/test/regress/sql/create_drop_database_propagation.sql
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ SELECT * FROM public.check_database_on_all_nodes('my_template_database') ORDER B
--tests for special characters in database name
set citus.enable_create_database_propagation=on;
SET citus.log_remote_commands = true;
set citus.grep_remote_commands = '%CREATE DATABASE%';
set citus.grep_remote_commands = '%DATABASE%';
SET citus.next_operation_id TO 2000;

create database "mydatabase#1'2";
Expand Down
Loading

0 comments on commit 015c598

Please sign in to comment.