Skip to content

Commit 3857b7c

Browse files
committed
Merge branch 'main' into issue319
2 parents f652682 + 6647911 commit 3857b7c

22 files changed

+191
-105
lines changed

.github/workflows/MainDistributionPipeline.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ jobs:
1616
name: Build extension binaries
1717
uses: duckdb/extension-ci-tools/.github/workflows/[email protected]
1818
with:
19-
duckdb_version: main
20-
ci_tools_version: main
19+
duckdb_version: v1.3.0
20+
ci_tools_version: v1.3.0
2121
extension_name: postgres_scanner
2222
exclude_archs: 'wasm_mvp;wasm_eh;wasm_threads;windows_amd64_mingw'
2323

@@ -27,8 +27,8 @@ jobs:
2727
uses: duckdb/extension-ci-tools/.github/workflows/[email protected]
2828
secrets: inherit
2929
with:
30-
duckdb_version: main
31-
ci_tools_version: main
30+
duckdb_version: v1.3.0
31+
ci_tools_version: v1.3.0
3232
extension_name: postgres_scanner
3333
exclude_archs: 'wasm_mvp;wasm_eh;wasm_threads;windows_amd64_mingw'
3434
deploy_latest: ${{ startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main' }}

src/include/postgres_utils.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ struct PostgresCopyState {
5454
void Initialize(ClientContext &context);
5555
};
5656

57+
enum class PostgresIsolationLevel { READ_COMMITTED, REPEATABLE_READ, SERIALIZABLE };
58+
5759
class PostgresUtils {
5860
public:
5961
static PGconn *PGConnect(const string &dsn);

src/include/storage/postgres_catalog.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,13 @@ class PostgresSchemaEntry;
2121
class PostgresCatalog : public Catalog {
2222
public:
2323
explicit PostgresCatalog(AttachedDatabase &db_p, string connection_string, string attach_path,
24-
AccessMode access_mode, string schema_to_load);
24+
AccessMode access_mode, string schema_to_load, PostgresIsolationLevel isolation_level);
2525
~PostgresCatalog();
2626

2727
string connection_string;
2828
string attach_path;
2929
AccessMode access_mode;
30+
PostgresIsolationLevel isolation_level;
3031

3132
public:
3233
void Initialize(bool load_builtin) override;

src/include/storage/postgres_transaction.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,14 @@ class PostgresTransaction : public Transaction {
4343
PostgresPoolConnection connection;
4444
PostgresTransactionState transaction_state;
4545
AccessMode access_mode;
46+
PostgresIsolationLevel isolation_level;
4647
string temporary_schema;
4748

4849
private:
4950
//! Retrieves the connection **without** starting a transaction if none is active
5051
PostgresConnection &GetConnectionRaw();
52+
53+
string GetBeginTransactionQuery();
5154
};
5255

5356
} // namespace duckdb

src/postgres_binary_copy.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,4 +101,4 @@ void PostgresBinaryCopyFunction::PostgresBinaryWriteFinalize(ClientContext &cont
101101
gstate.Flush();
102102
}
103103

104-
} // namespace duckdb
104+
} // namespace duckdb

src/postgres_execute.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ static void PGExecuteFunction(ClientContext &context, TableFunctionInput &data_p
6666
PostgresExecuteFunction::PostgresExecuteFunction()
6767
: TableFunction("postgres_execute", {LogicalType::VARCHAR, LogicalType::VARCHAR}, PGExecuteFunction,
6868
PGExecuteBind) {
69-
named_parameters["use_transaction"] = LogicalType::BOOLEAN;
69+
named_parameters["use_transaction"] = LogicalType::BOOLEAN;
7070
}
7171

7272
} // namespace duckdb

src/postgres_extension.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,9 +173,8 @@ static void LoadInternal(DatabaseInstance &db) {
173173
LogicalType::BOOLEAN, Value::BOOLEAN(false), PostgresClearCacheFunction::ClearCacheOnSetting);
174174
config.AddExtensionOption("pg_connection_cache", "Whether or not to use the connection cache", LogicalType::BOOLEAN,
175175
Value::BOOLEAN(true), PostgresConnectionPool::PostgresSetConnectionCache);
176-
config.AddExtensionOption("pg_experimental_filter_pushdown",
177-
"Whether or not to use filter pushdown", LogicalType::BOOLEAN,
178-
Value::BOOLEAN(true));
176+
config.AddExtensionOption("pg_experimental_filter_pushdown", "Whether or not to use filter pushdown",
177+
LogicalType::BOOLEAN, Value::BOOLEAN(true));
179178
config.AddExtensionOption("pg_null_byte_replacement",
180179
"When writing NULL bytes to Postgres, replace them with the given character",
181180
LogicalType::VARCHAR, Value(), SetPostgresNullByteReplacement);

src/postgres_filter_pushdown.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ string TransformBlob(const string &val) {
4545
char const HEX_DIGITS[] = "0123456789ABCDEF";
4646

4747
string result = "'\\x";
48-
for(idx_t i = 0; i < val.size(); i++) {
48+
for (idx_t i = 0; i < val.size(); i++) {
4949
uint8_t byte_val = static_cast<uint8_t>(val[i]);
5050
result += HEX_DIGITS[(byte_val >> 4) & 0xf];
5151
result += HEX_DIGITS[byte_val & 0xf];
@@ -96,7 +96,7 @@ string PostgresFilterPushdown::TransformFilter(string &column_name, TableFilter
9696
case TableFilterType::IN_FILTER: {
9797
auto &in_filter = filter.Cast<InFilter>();
9898
string in_list;
99-
for(auto &val : in_filter.values) {
99+
for (auto &val : in_filter.values) {
100100
if (!in_list.empty()) {
101101
in_list += ", ";
102102
}

src/postgres_scanner.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -251,15 +251,15 @@ static void PostgresInitInternal(ClientContext &context, const PostgresBindData
251251
}
252252
if (bind_data->table_name.empty()) {
253253
D_ASSERT(!bind_data->sql.empty());
254-
lstate.sql = StringUtil::Format(
255-
R"(COPY (SELECT %s FROM (%s) AS __unnamed_subquery %s%s) TO STDOUT (FORMAT "binary");)",
256-
col_names, bind_data->sql, filter, bind_data->limit);
254+
lstate.sql =
255+
StringUtil::Format(R"(COPY (SELECT %s FROM (%s) AS __unnamed_subquery %s%s) TO STDOUT (FORMAT "binary");)",
256+
col_names, bind_data->sql, filter, bind_data->limit);
257257

258258
} else {
259-
lstate.sql = StringUtil::Format(
260-
R"(COPY (SELECT %s FROM %s.%s %s%s) TO STDOUT (FORMAT "binary");)",
261-
col_names, KeywordHelper::WriteQuoted(bind_data->schema_name, '"'),
262-
KeywordHelper::WriteQuoted(bind_data->table_name, '"'), filter, bind_data->limit);
259+
lstate.sql =
260+
StringUtil::Format(R"(COPY (SELECT %s FROM %s.%s %s%s) TO STDOUT (FORMAT "binary");)", col_names,
261+
KeywordHelper::WriteQuoted(bind_data->schema_name, '"'),
262+
KeywordHelper::WriteQuoted(bind_data->table_name, '"'), filter, bind_data->limit);
263263
}
264264
lstate.exec = false;
265265
lstate.done = false;

src/postgres_storage.cpp

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ static unique_ptr<Catalog> PostgresAttach(StorageExtensionInfo *storage_info, Cl
1818

1919
string secret_name;
2020
string schema_to_load;
21+
PostgresIsolationLevel isolation_level = PostgresIsolationLevel::REPEATABLE_READ;
2122
for (auto &entry : info.options) {
2223
auto lower_name = StringUtil::Lower(entry.first);
2324
if (lower_name == "type" || lower_name == "read_only") {
@@ -26,12 +27,27 @@ static unique_ptr<Catalog> PostgresAttach(StorageExtensionInfo *storage_info, Cl
2627
secret_name = entry.second.ToString();
2728
} else if (lower_name == "schema") {
2829
schema_to_load = entry.second.ToString();
30+
} else if (lower_name == "isolation_level") {
31+
auto param = entry.second.ToString();
32+
auto lparam = StringUtil::Lower(param);
33+
if (lparam == "read committed") {
34+
isolation_level = PostgresIsolationLevel::READ_COMMITTED;
35+
} else if (lparam == "repeatable read") {
36+
isolation_level = PostgresIsolationLevel::REPEATABLE_READ;
37+
} else if (lparam == "serializable") {
38+
isolation_level = PostgresIsolationLevel::SERIALIZABLE;
39+
} else {
40+
throw InvalidInputException("Invalid value \"%s\" for isolation_level, expected READ COMMITTED, "
41+
"REPEATABLE READ or SERIALIZABLE",
42+
param);
43+
}
2944
} else {
3045
throw BinderException("Unrecognized option for Postgres attach: %s", entry.first);
3146
}
3247
}
3348
auto connection_string = PostgresCatalog::GetConnectionString(context, attach_path, secret_name);
34-
return make_uniq<PostgresCatalog>(db, std::move(connection_string), std::move(attach_path), access_mode, std::move(schema_to_load));
49+
return make_uniq<PostgresCatalog>(db, std::move(connection_string), std::move(attach_path), access_mode,
50+
std::move(schema_to_load), isolation_level);
3551
}
3652

3753
static unique_ptr<TransactionManager> PostgresCreateTransactionManager(StorageExtensionInfo *storage_info,

0 commit comments

Comments
 (0)