From 448ec1af15209022d9236fdacc4ca268c891f4fc Mon Sep 17 00:00:00 2001 From: Mark Raasveldt Date: Wed, 3 Apr 2024 19:52:09 +0200 Subject: [PATCH 1/2] Add support for POSTGRES_BINARY copy method --- src/CMakeLists.txt | 1 + src/include/postgres_binary_copy.hpp | 36 ++++++++++ src/include/postgres_binary_writer.hpp | 2 +- src/postgres_binary_copy.cpp | 96 ++++++++++++++++++++++++++ src/postgres_extension.cpp | 4 ++ test/sql/misc/postgres_binary.test | 63 +++++++++++++++++ 6 files changed, 201 insertions(+), 1 deletion(-) create mode 100644 src/include/postgres_binary_copy.hpp create mode 100644 src/postgres_binary_copy.cpp create mode 100644 test/sql/misc/postgres_binary.test diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 41c49057..4891e09b 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -5,6 +5,7 @@ add_subdirectory(storage) add_library( postgres_ext_library OBJECT postgres_attach.cpp + postgres_binary_copy.cpp postgres_connection.cpp postgres_copy_from.cpp postgres_copy_to.cpp diff --git a/src/include/postgres_binary_copy.hpp b/src/include/postgres_binary_copy.hpp new file mode 100644 index 00000000..4d49b008 --- /dev/null +++ b/src/include/postgres_binary_copy.hpp @@ -0,0 +1,36 @@ +//===----------------------------------------------------------------------===// +// DuckDB +// +// postgres_binary_copy.hpp +// +// +//===----------------------------------------------------------------------===// + +#pragma once + +#include "postgres_utils.hpp" +#include "postgres_result.hpp" +#include "duckdb/function/copy_function.hpp" + +namespace duckdb { + +class PostgresBinaryCopyFunction : public CopyFunction { +public: + PostgresBinaryCopyFunction(); + + static unique_ptr PostgresBinaryWriteBind(ClientContext &context, CopyFunctionBindInput &input, + const vector &names, const vector &sql_types); + + static unique_ptr PostgresBinaryWriteInitializeGlobal(ClientContext &context, FunctionData &bind_data, + const string &file_path); + static unique_ptr PostgresBinaryWriteInitializeLocal(ExecutionContext &context, FunctionData &bind_data_p); + static void PostgresBinaryWriteSink(ExecutionContext &context, FunctionData &bind_data_p, GlobalFunctionData &gstate, + LocalFunctionData &lstate, DataChunk &input); + static void PostgresBinaryWriteCombine(ExecutionContext &context, FunctionData &bind_data, GlobalFunctionData &gstate, + LocalFunctionData &lstate); + static void PostgresBinaryWriteFinalize(ClientContext &context, FunctionData &bind_data, GlobalFunctionData &gstate); +}; + + + +} // namespace duckdb diff --git a/src/include/postgres_binary_writer.hpp b/src/include/postgres_binary_writer.hpp index 990bba9c..5e3a1f86 100644 --- a/src/include/postgres_binary_writer.hpp +++ b/src/include/postgres_binary_writer.hpp @@ -399,7 +399,7 @@ class PostgresBinaryWriter { break; } default: - throw InternalException("Unsupported type for Postgres insert"); + throw NotImplementedException("Type \"%s\" is not supported for Postgres binary copy", type); } } diff --git a/src/postgres_binary_copy.cpp b/src/postgres_binary_copy.cpp new file mode 100644 index 00000000..f0d86a30 --- /dev/null +++ b/src/postgres_binary_copy.cpp @@ -0,0 +1,96 @@ +#include "postgres_binary_copy.hpp" +#include "postgres_binary_writer.hpp" +#include "duckdb/common/serializer/buffered_file_writer.hpp" +#include "duckdb/common/file_system.hpp" + +namespace duckdb { + +PostgresBinaryCopyFunction::PostgresBinaryCopyFunction() : + CopyFunction("postgres_binary") { + + copy_to_bind = PostgresBinaryWriteBind; + copy_to_initialize_global = PostgresBinaryWriteInitializeGlobal; + copy_to_initialize_local = PostgresBinaryWriteInitializeLocal; + copy_to_sink = PostgresBinaryWriteSink; + copy_to_combine = PostgresBinaryWriteCombine; + copy_to_finalize = PostgresBinaryWriteFinalize; +} + +struct PostgresBinaryCopyGlobalState : public GlobalFunctionData { + unique_ptr file_writer; + + void Flush(PostgresBinaryWriter &writer) { + file_writer->WriteData(writer.stream.GetData(), writer.stream.GetPosition()); + } + + void WriteHeader() { + PostgresBinaryWriter writer; + writer.WriteHeader(); + Flush(writer); + } + + void WriteChunk(DataChunk &chunk) { + chunk.Flatten(); + PostgresBinaryWriter writer; + for (idx_t r = 0; r < chunk.size(); r++) { + writer.BeginRow(chunk.ColumnCount()); + for (idx_t c = 0; c < chunk.ColumnCount(); c++) { + auto &col = chunk.data[c]; + writer.WriteValue(col, r); + } + writer.FinishRow(); + } + Flush(writer); + } + + void Flush() { + // write the footer + PostgresBinaryWriter writer; + writer.WriteFooter(); + Flush(writer); + // flush and close the file + file_writer->Flush(); + file_writer.reset(); + } +}; + +struct PostgresBinaryWriteBindData : public TableFunctionData { +}; + +unique_ptr PostgresBinaryCopyFunction::PostgresBinaryWriteBind(ClientContext &context, CopyFunctionBindInput &input, + const vector &names, const vector &sql_types) { + return make_uniq(); +} + +unique_ptr PostgresBinaryCopyFunction::PostgresBinaryWriteInitializeGlobal(ClientContext &context, FunctionData &bind_data, + const string &file_path) { + auto result = make_uniq(); + auto &fs = FileSystem::GetFileSystem(context); + result->file_writer = make_uniq(fs, file_path); + // write the header + result->WriteHeader(); + return std::move(result); +} + +unique_ptr PostgresBinaryCopyFunction::PostgresBinaryWriteInitializeLocal(ExecutionContext &context, FunctionData &bind_data_p) { + return make_uniq(); +} + +void PostgresBinaryCopyFunction::PostgresBinaryWriteSink(ExecutionContext &context, FunctionData &bind_data_p, GlobalFunctionData &gstate_p, + LocalFunctionData &lstate, DataChunk &input) { + auto &gstate = gstate_p.Cast(); + gstate.WriteChunk(input); +} + +void PostgresBinaryCopyFunction::PostgresBinaryWriteCombine(ExecutionContext &context, FunctionData &bind_data, GlobalFunctionData &gstate, + LocalFunctionData &lstate) { +} + +void PostgresBinaryCopyFunction::PostgresBinaryWriteFinalize(ClientContext &context, FunctionData &bind_data, GlobalFunctionData &gstate_p) { + auto &gstate = gstate_p.Cast(); + // write the footer and close the file + gstate.Flush(); +} + + +} \ No newline at end of file diff --git a/src/postgres_extension.cpp b/src/postgres_extension.cpp index aebeeed3..34d46d04 100644 --- a/src/postgres_extension.cpp +++ b/src/postgres_extension.cpp @@ -4,6 +4,7 @@ #include "postgres_scanner.hpp" #include "postgres_storage.hpp" #include "postgres_scanner_extension.hpp" +#include "postgres_binary_copy.hpp" #include "duckdb/catalog/catalog.hpp" #include "duckdb/parser/parsed_data/create_table_function_info.hpp" @@ -90,6 +91,9 @@ static void LoadInternal(DatabaseInstance &db) { PostgresExecuteFunction execute_func; ExtensionUtil::RegisterFunction(db, execute_func); + PostgresBinaryCopyFunction binary_copy; + ExtensionUtil::RegisterFunction(db, binary_copy); + auto &config = DBConfig::GetConfig(db); config.storage_extensions["postgres_scanner"] = make_uniq(); diff --git a/test/sql/misc/postgres_binary.test b/test/sql/misc/postgres_binary.test new file mode 100644 index 00000000..43957fd5 --- /dev/null +++ b/test/sql/misc/postgres_binary.test @@ -0,0 +1,63 @@ +# name: test/sql/misc/postgres_binary.test +# description: Test postgres binary copy through a file +# group: [scanner] + +require postgres_scanner + +require-env POSTGRES_TEST_DATABASE_AVAILABLE + +statement ok +CALL postgres_attach('dbname=postgresscanner'); + +statement ok +ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES) + +# straightforward integer copy +statement ok +COPY (SELECT i::INT AS i FROM range(100) t(i)) TO '__TEST_DIR__/pg_binary.bin' (FORMAT postgres_binary); + +statement ok +CREATE OR REPLACE TABLE s.binary_copy_test(i INTEGER); + +statement ok +CALL postgres_execute('s', 'COPY binary_copy_test FROM ''__WORKING_DIRECTORY__/__TEST_DIR__/pg_binary.bin'' (FORMAT binary)') + +query IIII +SELECT COUNT(*), MIN(i), MAX(i), SUM(i) FROM s.binary_copy_test +---- +100 0 99 4950 + +# test all supported types +statement ok +CREATE TABLE all_types_tbl AS +SELECT bool, smallint, int, bigint, float, double, dec_4_1, dec_9_4, dec_18_6, dec38_10, + case when date < '1992-01-01' then '4712-01-01 (BC)' else '5874896-01-01' end as date, -- postgres has more constrained date ranges + time, + timestamp, interval, uuid, blob, + replace(varchar, chr(0), '') as varchar, -- postgres does not support null bytes in varchar columns + blob, int_array, varchar_array +FROM test_all_types() + +# create an empty table on the postgres side +statement ok +CREATE OR REPLACE TABLE s.binary_copy_test AS FROM all_types_tbl LIMIT 0; + +statement ok +COPY all_types_tbl TO '__TEST_DIR__/pg_binary.bin' (FORMAT postgres_binary); + +statement ok +CALL postgres_execute('s', 'COPY binary_copy_test FROM ''__WORKING_DIRECTORY__/__TEST_DIR__/pg_binary.bin'' (FORMAT binary)') + +query I nosort all_types +FROM all_types_tbl +---- + +query I nosort all_types +SELECT * FROM s.binary_copy_test +---- + +# test an unsupported type +statement error +COPY (SELECT 42::UINT32) TO '__TEST_DIR__/pg_binary.bin' (FORMAT postgres_binary); +---- +not supported From bde66bbe76c8912c9d32db2b809b303b7474029d Mon Sep 17 00:00:00 2001 From: Mark Raasveldt Date: Wed, 3 Apr 2024 19:56:59 +0200 Subject: [PATCH 2/2] Add test for reading not being supported --- test/sql/misc/postgres_binary.test | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/test/sql/misc/postgres_binary.test b/test/sql/misc/postgres_binary.test index 43957fd5..ddebfb39 100644 --- a/test/sql/misc/postgres_binary.test +++ b/test/sql/misc/postgres_binary.test @@ -61,3 +61,12 @@ statement error COPY (SELECT 42::UINT32) TO '__TEST_DIR__/pg_binary.bin' (FORMAT postgres_binary); ---- not supported + +# reading not yet supported +statement ok +CREATE TABLE read_tbl(i int); + +statement error +COPY read_tbl FROM '__TEST_DIR__/pg_binary.bin' (FORMAT postgres_binary); +---- +not supported