Skip to content

Commit 29d6dd8

Browse files
committed
Make isolation level configurable
1 parent 959a869 commit 29d6dd8

21 files changed

+187
-101
lines changed

src/include/postgres_utils.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ struct PostgresCopyState {
5454
void Initialize(ClientContext &context);
5555
};
5656

57+
enum class PostgresIsolationLevel { READ_COMMITTED, REPEATABLE_READ, SERIALIZABLE };
58+
5759
class PostgresUtils {
5860
public:
5961
static PGconn *PGConnect(const string &dsn);

src/include/storage/postgres_catalog.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,13 @@ class PostgresSchemaEntry;
2121
class PostgresCatalog : public Catalog {
2222
public:
2323
explicit PostgresCatalog(AttachedDatabase &db_p, string connection_string, string attach_path,
24-
AccessMode access_mode, string schema_to_load);
24+
AccessMode access_mode, string schema_to_load, PostgresIsolationLevel isolation_level);
2525
~PostgresCatalog();
2626

2727
string connection_string;
2828
string attach_path;
2929
AccessMode access_mode;
30+
PostgresIsolationLevel isolation_level;
3031

3132
public:
3233
void Initialize(bool load_builtin) override;

src/include/storage/postgres_transaction.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,14 @@ class PostgresTransaction : public Transaction {
4343
PostgresPoolConnection connection;
4444
PostgresTransactionState transaction_state;
4545
AccessMode access_mode;
46+
PostgresIsolationLevel isolation_level;
4647
string temporary_schema;
4748

4849
private:
4950
//! Retrieves the connection **without** starting a transaction if none is active
5051
PostgresConnection &GetConnectionRaw();
52+
53+
string GetBeginTransactionQuery();
5154
};
5255

5356
} // namespace duckdb

src/postgres_binary_copy.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,4 +101,4 @@ void PostgresBinaryCopyFunction::PostgresBinaryWriteFinalize(ClientContext &cont
101101
gstate.Flush();
102102
}
103103

104-
} // namespace duckdb
104+
} // namespace duckdb

src/postgres_execute.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ static void PGExecuteFunction(ClientContext &context, TableFunctionInput &data_p
6666
PostgresExecuteFunction::PostgresExecuteFunction()
6767
: TableFunction("postgres_execute", {LogicalType::VARCHAR, LogicalType::VARCHAR}, PGExecuteFunction,
6868
PGExecuteBind) {
69-
named_parameters["use_transaction"] = LogicalType::BOOLEAN;
69+
named_parameters["use_transaction"] = LogicalType::BOOLEAN;
7070
}
7171

7272
} // namespace duckdb

src/postgres_extension.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,9 +173,8 @@ static void LoadInternal(DatabaseInstance &db) {
173173
LogicalType::BOOLEAN, Value::BOOLEAN(false), PostgresClearCacheFunction::ClearCacheOnSetting);
174174
config.AddExtensionOption("pg_connection_cache", "Whether or not to use the connection cache", LogicalType::BOOLEAN,
175175
Value::BOOLEAN(true), PostgresConnectionPool::PostgresSetConnectionCache);
176-
config.AddExtensionOption("pg_experimental_filter_pushdown",
177-
"Whether or not to use filter pushdown", LogicalType::BOOLEAN,
178-
Value::BOOLEAN(true));
176+
config.AddExtensionOption("pg_experimental_filter_pushdown", "Whether or not to use filter pushdown",
177+
LogicalType::BOOLEAN, Value::BOOLEAN(true));
179178
config.AddExtensionOption("pg_null_byte_replacement",
180179
"When writing NULL bytes to Postgres, replace them with the given character",
181180
LogicalType::VARCHAR, Value(), SetPostgresNullByteReplacement);

src/postgres_filter_pushdown.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ string TransformBlob(const string &val) {
4545
char const HEX_DIGITS[] = "0123456789ABCDEF";
4646

4747
string result = "'\\x";
48-
for(idx_t i = 0; i < val.size(); i++) {
48+
for (idx_t i = 0; i < val.size(); i++) {
4949
uint8_t byte_val = static_cast<uint8_t>(val[i]);
5050
result += HEX_DIGITS[(byte_val >> 4) & 0xf];
5151
result += HEX_DIGITS[byte_val & 0xf];
@@ -96,7 +96,7 @@ string PostgresFilterPushdown::TransformFilter(string &column_name, TableFilter
9696
case TableFilterType::IN_FILTER: {
9797
auto &in_filter = filter.Cast<InFilter>();
9898
string in_list;
99-
for(auto &val : in_filter.values) {
99+
for (auto &val : in_filter.values) {
100100
if (!in_list.empty()) {
101101
in_list += ", ";
102102
}

src/postgres_scanner.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -251,15 +251,15 @@ static void PostgresInitInternal(ClientContext &context, const PostgresBindData
251251
}
252252
if (bind_data->table_name.empty()) {
253253
D_ASSERT(!bind_data->sql.empty());
254-
lstate.sql = StringUtil::Format(
255-
R"(COPY (SELECT %s FROM (%s) AS __unnamed_subquery %s%s) TO STDOUT (FORMAT "binary");)",
256-
col_names, bind_data->sql, filter, bind_data->limit);
254+
lstate.sql =
255+
StringUtil::Format(R"(COPY (SELECT %s FROM (%s) AS __unnamed_subquery %s%s) TO STDOUT (FORMAT "binary");)",
256+
col_names, bind_data->sql, filter, bind_data->limit);
257257

258258
} else {
259-
lstate.sql = StringUtil::Format(
260-
R"(COPY (SELECT %s FROM %s.%s %s%s) TO STDOUT (FORMAT "binary");)",
261-
col_names, KeywordHelper::WriteQuoted(bind_data->schema_name, '"'),
262-
KeywordHelper::WriteQuoted(bind_data->table_name, '"'), filter, bind_data->limit);
259+
lstate.sql =
260+
StringUtil::Format(R"(COPY (SELECT %s FROM %s.%s %s%s) TO STDOUT (FORMAT "binary");)", col_names,
261+
KeywordHelper::WriteQuoted(bind_data->schema_name, '"'),
262+
KeywordHelper::WriteQuoted(bind_data->table_name, '"'), filter, bind_data->limit);
263263
}
264264
lstate.exec = false;
265265
lstate.done = false;

src/postgres_storage.cpp

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ static unique_ptr<Catalog> PostgresAttach(StorageExtensionInfo *storage_info, Cl
1818

1919
string secret_name;
2020
string schema_to_load;
21+
PostgresIsolationLevel isolation_level = PostgresIsolationLevel::REPEATABLE_READ;
2122
for (auto &entry : info.options) {
2223
auto lower_name = StringUtil::Lower(entry.first);
2324
if (lower_name == "type" || lower_name == "read_only") {
@@ -26,12 +27,27 @@ static unique_ptr<Catalog> PostgresAttach(StorageExtensionInfo *storage_info, Cl
2627
secret_name = entry.second.ToString();
2728
} else if (lower_name == "schema") {
2829
schema_to_load = entry.second.ToString();
30+
} else if (lower_name == "isolation_level") {
31+
auto param = entry.second.ToString();
32+
auto lparam = StringUtil::Lower(param);
33+
if (lparam == "read committed") {
34+
isolation_level = PostgresIsolationLevel::READ_COMMITTED;
35+
} else if (lparam == "repeatable read") {
36+
isolation_level = PostgresIsolationLevel::REPEATABLE_READ;
37+
} else if (lparam == "serializable") {
38+
isolation_level = PostgresIsolationLevel::SERIALIZABLE;
39+
} else {
40+
throw InvalidInputException("Invalid value \"%s\" for isolation_level, expected READ COMMITTED, "
41+
"REPEATABLE READ or SERIALIZABLE",
42+
param);
43+
}
2944
} else {
3045
throw BinderException("Unrecognized option for Postgres attach: %s", entry.first);
3146
}
3247
}
3348
auto connection_string = PostgresCatalog::GetConnectionString(context, attach_path, secret_name);
34-
return make_uniq<PostgresCatalog>(db, std::move(connection_string), std::move(attach_path), access_mode, std::move(schema_to_load));
49+
return make_uniq<PostgresCatalog>(db, std::move(connection_string), std::move(attach_path), access_mode,
50+
std::move(schema_to_load), isolation_level);
3551
}
3652

3753
static unique_ptr<TransactionManager> PostgresCreateTransactionManager(StorageExtensionInfo *storage_info,

src/storage/postgres_catalog.cpp

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,11 @@
1010

1111
namespace duckdb {
1212

13-
PostgresCatalog::PostgresCatalog(AttachedDatabase &db_p, string connection_string_p, string attach_path_p, AccessMode access_mode,
14-
string schema_to_load)
15-
: Catalog(db_p), connection_string(std::move(connection_string_p)), attach_path(std::move(attach_path_p)), access_mode(access_mode), schemas(*this, schema_to_load), connection_pool(*this),
16-
default_schema(schema_to_load) {
13+
PostgresCatalog::PostgresCatalog(AttachedDatabase &db_p, string connection_string_p, string attach_path_p,
14+
AccessMode access_mode, string schema_to_load, PostgresIsolationLevel isolation_level)
15+
: Catalog(db_p), connection_string(std::move(connection_string_p)), attach_path(std::move(attach_path_p)),
16+
access_mode(access_mode), isolation_level(isolation_level), schemas(*this, schema_to_load),
17+
connection_pool(*this), default_schema(schema_to_load) {
1718
if (default_schema.empty()) {
1819
default_schema = "public";
1920
}
@@ -100,7 +101,6 @@ string PostgresCatalog::GetConnectionString(ClientContext &context, const string
100101
return connection_string;
101102
}
102103

103-
104104
PostgresCatalog::~PostgresCatalog() = default;
105105

106106
void PostgresCatalog::Initialize(bool load_builtin) {
@@ -138,9 +138,10 @@ void PostgresCatalog::ScanSchemas(ClientContext &context, std::function<void(Sch
138138
schemas.Scan(context, [&](CatalogEntry &schema) { callback(schema.Cast<PostgresSchemaEntry>()); });
139139
}
140140

141-
optional_ptr<SchemaCatalogEntry> PostgresCatalog::LookupSchema(CatalogTransaction transaction, const EntryLookupInfo &schema_lookup,
142-
OnEntryNotFound if_not_found) {
143-
auto schema_name = schema_lookup.GetEntryName();
141+
optional_ptr<SchemaCatalogEntry> PostgresCatalog::LookupSchema(CatalogTransaction transaction,
142+
const EntryLookupInfo &schema_lookup,
143+
OnEntryNotFound if_not_found) {
144+
auto schema_name = schema_lookup.GetEntryName();
144145
auto &postgres_transaction = PostgresTransaction::Get(transaction.GetContext(), *this);
145146
if (schema_name == "pg_temp") {
146147
schema_name = postgres_transaction.GetTemporarySchema();

0 commit comments

Comments
 (0)