Skip to content

Add use_transaction parameter for postgres_query() and postgres_execute() #310

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/include/postgres_scanner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ struct PostgresBindData : public FunctionData {
bool can_use_main_thread = true;
bool read_only = true;
bool emit_ctid = false;
bool use_transaction = true;
idx_t max_threads = 1;

public:
Expand Down
3 changes: 3 additions & 0 deletions src/include/storage/postgres_transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ class PostgresTransaction : public Transaction {
void Commit();
void Rollback();

PostgresConnection &GetConnectionWithoutTransaction();
PostgresConnection &GetConnection();

string GetDSN();
unique_ptr<PostgresResult> Query(const string &query);
unique_ptr<PostgresResult> QueryWithoutTransaction(const string &query);
vector<unique_ptr<PostgresResult>> ExecuteQueries(const string &queries);
static PostgresTransaction &Get(ClientContext &context, Catalog &catalog);

Expand Down
23 changes: 19 additions & 4 deletions src/postgres_execute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@
namespace duckdb {

struct PGExecuteBindData : public TableFunctionData {
explicit PGExecuteBindData(PostgresCatalog &pg_catalog, string query_p)
: pg_catalog(pg_catalog), query(std::move(query_p)) {
explicit PGExecuteBindData(PostgresCatalog &pg_catalog, string query_p, bool use_transaction)
: pg_catalog(pg_catalog), query(std::move(query_p)), use_transaction(use_transaction) {
}

bool finished = false;
PostgresCatalog &pg_catalog;
string query;
bool use_transaction = true;
};

static duckdb::unique_ptr<FunctionData> PGExecuteBind(ClientContext &context, TableFunctionBindInput &input,
Expand All @@ -36,7 +37,15 @@ static duckdb::unique_ptr<FunctionData> PGExecuteBind(ClientContext &context, Ta
throw BinderException("Attached database \"%s\" does not refer to a Postgres database", db_name);
}
auto &pg_catalog = catalog.Cast<PostgresCatalog>();
return make_uniq<PGExecuteBindData>(pg_catalog, input.inputs[1].GetValue<string>());

bool use_transaction = true;
for (auto &kv : input.named_parameters) {
if (kv.first == "use_transaction") {
use_transaction = BooleanValue::Get(kv.second);
}
}

return make_uniq<PGExecuteBindData>(pg_catalog, input.inputs[1].GetValue<string>(), use_transaction);
}

static void PGExecuteFunction(ClientContext &context, TableFunctionInput &data_p, DataChunk &output) {
Expand All @@ -45,13 +54,19 @@ static void PGExecuteFunction(ClientContext &context, TableFunctionInput &data_p
return;
}
auto &transaction = Transaction::Get(context, data.pg_catalog).Cast<PostgresTransaction>();
transaction.Query(data.query);
if (data.use_transaction) {
transaction.Query(data.query);
} else {
transaction.QueryWithoutTransaction(data.query);
}

data.finished = true;
}

PostgresExecuteFunction::PostgresExecuteFunction()
: TableFunction("postgres_execute", {LogicalType::VARCHAR, LogicalType::VARCHAR}, PGExecuteFunction,
PGExecuteBind) {
named_parameters["use_transaction"] = LogicalType::BOOLEAN;
}

} // namespace duckdb
12 changes: 11 additions & 1 deletion src/postgres_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,16 @@ static unique_ptr<FunctionData> PGQueryBind(ClientContext &context, TableFunctio
StringUtil::RTrim(sql);
}

auto &con = transaction.GetConnection();
bool use_transaction = true;
for (auto &kv : input.named_parameters) {
if (kv.first == "use_transaction") {
use_transaction = BooleanValue::Get(kv.second);
}
}
result->use_transaction = use_transaction;

auto &con = use_transaction ? transaction.GetConnection() : transaction.GetConnectionWithoutTransaction();

auto conn = con.GetConn();
// prepare execution of the query to figure out the result types and names
auto prepared = PQprepare(conn, "", sql.c_str(), 0, nullptr);
Expand Down Expand Up @@ -87,6 +96,7 @@ static unique_ptr<FunctionData> PGQueryBind(ClientContext &context, TableFunctio

PostgresQueryFunction::PostgresQueryFunction()
: TableFunction("postgres_query", {LogicalType::VARCHAR, LogicalType::VARCHAR}, nullptr, PGQueryBind) {
named_parameters["use_transaction"] = LogicalType::BOOLEAN;
PostgresScanFunction scan_function;
init_global = scan_function.init_global;
init_local = scan_function.init_local;
Expand Down
20 changes: 11 additions & 9 deletions src/postgres_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,15 +251,14 @@ static void PostgresInitInternal(ClientContext &context, const PostgresBindData
}
if (bind_data->table_name.empty()) {
D_ASSERT(!bind_data->sql.empty());
lstate.sql = StringUtil::Format(
R"(COPY (SELECT %s FROM (%s) AS __unnamed_subquery %s) TO STDOUT (FORMAT "binary");)",
col_names, bind_data->sql, filter);
lstate.sql =
StringUtil::Format(R"(COPY (SELECT %s FROM (%s) AS __unnamed_subquery %s) TO STDOUT (FORMAT "binary");)",
col_names, bind_data->sql, filter);

} else {
lstate.sql = StringUtil::Format(
R"(COPY (SELECT %s FROM %s.%s %s) TO STDOUT (FORMAT "binary");)",
col_names, KeywordHelper::WriteQuoted(bind_data->schema_name, '"'),
KeywordHelper::WriteQuoted(bind_data->table_name, '"'), filter);
lstate.sql = StringUtil::Format(R"(COPY (SELECT %s FROM %s.%s %s) TO STDOUT (FORMAT "binary");)", col_names,
KeywordHelper::WriteQuoted(bind_data->schema_name, '"'),
KeywordHelper::WriteQuoted(bind_data->table_name, '"'), filter);
}
lstate.exec = false;
lstate.done = false;
Expand Down Expand Up @@ -291,11 +290,14 @@ static unique_ptr<GlobalTableFunctionState> PostgresInitGlobalState(ClientContex
auto pg_catalog = bind_data.GetCatalog();
if (pg_catalog) {
auto &transaction = Transaction::Get(context, *pg_catalog).Cast<PostgresTransaction>();
auto &con = transaction.GetConnection();
auto &con =
bind_data.use_transaction ? transaction.GetConnection() : transaction.GetConnectionWithoutTransaction();
result->SetConnection(con.GetConnection());
} else {
auto con = PostgresConnection::Open(bind_data.dsn);
PostgresScanConnect(con, string());
if (bind_data.use_transaction) {
PostgresScanConnect(con, string());
}
result->SetConnection(std::move(con));
}
if (bind_data.requires_materialization) {
Expand Down
21 changes: 21 additions & 0 deletions src/storage/postgres_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ static string GetBeginTransactionQuery(AccessMode access_mode) {
return result;
}

PostgresConnection &PostgresTransaction::GetConnectionWithoutTransaction() {
if (transaction_state == PostgresTransactionState::TRANSACTION_STARTED) {
throw std::runtime_error("Execution without a Transaction is not possible if a Transaction already started");
}
if (access_mode == AccessMode::READ_ONLY) {
throw std::runtime_error("Execution without a Transaction is not possible in Read Only Mode");
}
return connection.GetConnection();
}

PostgresConnection &PostgresTransaction::GetConnection() {
auto &con = GetConnectionRaw();
if (transaction_state == PostgresTransactionState::TRANSACTION_NOT_YET_STARTED) {
Expand Down Expand Up @@ -68,6 +78,17 @@ unique_ptr<PostgresResult> PostgresTransaction::Query(const string &query) {
return con.Query(query);
}

unique_ptr<PostgresResult> PostgresTransaction::QueryWithoutTransaction(const string &query) {
auto &con = GetConnectionRaw();
if (transaction_state == PostgresTransactionState::TRANSACTION_STARTED) {
throw std::runtime_error("Execution without a Transaction is not possible if a Transaction already started");
}
if (access_mode == AccessMode::READ_ONLY) {
throw std::runtime_error("Execution without a Transaction is not possible in Read Only Mode");
}
return con.Query(query);
}

vector<unique_ptr<PostgresResult>> PostgresTransaction::ExecuteQueries(const string &queries) {
auto &con = GetConnectionRaw();
if (transaction_state == PostgresTransactionState::TRANSACTION_NOT_YET_STARTED) {
Expand Down
59 changes: 59 additions & 0 deletions test/sql/storage/postgres_execute_use_transaction.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# name: test/sql/storage/postgres_execute_use_transaction.test
# description: Test use_transaction flag in postgres_execute
# group: [storage]

require postgres_scanner

require-env POSTGRES_TEST_DATABASE_AVAILABLE

statement ok
PRAGMA enable_verification

statement ok
ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES)

statement error
CALL postgres_execute('s', 'VACUUM')
----
Invalid Error: Failed to execute query "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
VACUUM": ERROR: VACUUM cannot run inside a transaction block

statement error
CALL postgres_execute('s', 'VACUUM', use_transaction=true)
----
Invalid Error: Failed to execute query "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
VACUUM": ERROR: VACUUM cannot run inside a transaction block

statement error
CALL postgres_execute('s', 'VACUUM', use_transaction=true)
----
Invalid Error: Failed to execute query "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
VACUUM": ERROR: VACUUM cannot run inside a transaction block

statement ok
CALL postgres_execute('s', 'VACUUM', use_transaction=false)

statement ok
BEGIN;

statement ok
CALL postgres_execute('s', 'SELECT 1')

statement error
CALL postgres_execute('s', 'VACUUM', use_transaction=false)
----
Invalid Error: Execution without a Transaction is not possible if a Transaction already started

statement ok
ROLLBACK

statement ok
CALL postgres_execute('s', 'VACUUM', use_transaction=false)

statement ok
ATTACH 'dbname=postgresscanner' AS s2 (TYPE POSTGRES, READ_ONLY)

statement error
CALL postgres_execute('s2', 'VACUUM', use_transaction=false)
----
Invalid Error: Execution without a Transaction is not possible in Read Only Mode
57 changes: 57 additions & 0 deletions test/sql/storage/postgres_query_use_transaction.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# name: test/sql/storage/postgres_query_use_transaction.test
# description: Test use_transaction flag in postgres_query
# group: [storage]

require postgres_scanner

require-env POSTGRES_TEST_DATABASE_AVAILABLE

statement ok
PRAGMA enable_verification

statement ok
ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES)

query I
CALL postgres_query('s', 'SELECT 1')
----
1

query I
CALL postgres_query('s', 'SELECT 1', use_transaction=true)
----
1

query I
CALL postgres_query('s', 'SELECT 1', use_transaction=false)
----
1

statement ok
BEGIN;

query I
CALL postgres_query('s', 'SELECT 1')
----
1

statement error
CALL postgres_query('s', 'SELECT 1', use_transaction=false)
----
Invalid Error: Execution without a Transaction is not possible if a Transaction already started

statement ok
ROLLBACK

query I
CALL postgres_query('s', 'SELECT 1', use_transaction=false)
----
1

statement ok
ATTACH 'dbname=postgresscanner' AS s2 (TYPE POSTGRES, READ_ONLY)

statement error
CALL postgres_query('s2', 'SELECT 1', use_transaction=false)
----
Invalid Error: Execution without a Transaction is not possible in Read Only Mode