Skip to content

Implement #204 - Add support for POSTGRES_BINARY copy method #205

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 3, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ add_subdirectory(storage)
add_library(
postgres_ext_library OBJECT
postgres_attach.cpp
postgres_binary_copy.cpp
postgres_connection.cpp
postgres_copy_from.cpp
postgres_copy_to.cpp
36 changes: 36 additions & 0 deletions src/include/postgres_binary_copy.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
//===----------------------------------------------------------------------===//
// DuckDB
//
// postgres_binary_copy.hpp
//
//
//===----------------------------------------------------------------------===//

#pragma once

#include "postgres_utils.hpp"
#include "postgres_result.hpp"
#include "duckdb/function/copy_function.hpp"

namespace duckdb {

//! Copy function registered under the name "postgres_binary".
//! Only the COPY ... TO (write) callbacks are assigned by the constructor.
class PostgresBinaryCopyFunction : public CopyFunction {
public:
//! Names the function "postgres_binary" and assigns the copy_to_* callbacks declared below
PostgresBinaryCopyFunction();

//! Bind callback: returns an empty bind data object; the arguments are not inspected
static unique_ptr<FunctionData> PostgresBinaryWriteBind(ClientContext &context, CopyFunctionBindInput &input,
const vector<string> &names, const vector<LogicalType> &sql_types);

//! Global init callback: opens a buffered writer on file_path and writes the header to it
static unique_ptr<GlobalFunctionData> PostgresBinaryWriteInitializeGlobal(ClientContext &context, FunctionData &bind_data,
const string &file_path);
//! Local init callback: returns a plain LocalFunctionData
static unique_ptr<LocalFunctionData> PostgresBinaryWriteInitializeLocal(ExecutionContext &context, FunctionData &bind_data_p);
//! Sink callback: serializes the input chunk and appends it to the file held by the global state
static void PostgresBinaryWriteSink(ExecutionContext &context, FunctionData &bind_data_p, GlobalFunctionData &gstate,
LocalFunctionData &lstate, DataChunk &input);
//! Combine callback: has an empty body
static void PostgresBinaryWriteCombine(ExecutionContext &context, FunctionData &bind_data, GlobalFunctionData &gstate,
LocalFunctionData &lstate);
//! Finalize callback: writes the footer, flushes the file writer and releases it
static void PostgresBinaryWriteFinalize(ClientContext &context, FunctionData &bind_data, GlobalFunctionData &gstate);
};



} // namespace duckdb
2 changes: 1 addition & 1 deletion src/include/postgres_binary_writer.hpp
Original file line number Diff line number Diff line change
@@ -399,7 +399,7 @@ class PostgresBinaryWriter {
break;
}
default:
throw InternalException("Unsupported type for Postgres insert");
throw NotImplementedException("Type \"%s\" is not supported for Postgres binary copy", type);
}
}

96 changes: 96 additions & 0 deletions src/postgres_binary_copy.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#include "postgres_binary_copy.hpp"
#include "postgres_binary_writer.hpp"
#include "duckdb/common/serializer/buffered_file_writer.hpp"
#include "duckdb/common/file_system.hpp"

namespace duckdb {

//! Registers the copy function under the name "postgres_binary" and hooks up the
//! write-side (COPY ... TO) callbacks; no other callbacks are assigned here.
PostgresBinaryCopyFunction::PostgresBinaryCopyFunction() : CopyFunction("postgres_binary") {
	// binding and state construction
	copy_to_bind = &PostgresBinaryCopyFunction::PostgresBinaryWriteBind;
	copy_to_initialize_global = &PostgresBinaryCopyFunction::PostgresBinaryWriteInitializeGlobal;
	copy_to_initialize_local = &PostgresBinaryCopyFunction::PostgresBinaryWriteInitializeLocal;
	// data flow: sink -> combine -> finalize
	copy_to_sink = &PostgresBinaryCopyFunction::PostgresBinaryWriteSink;
	copy_to_combine = &PostgresBinaryCopyFunction::PostgresBinaryWriteCombine;
	copy_to_finalize = &PostgresBinaryCopyFunction::PostgresBinaryWriteFinalize;
}

struct PostgresBinaryCopyGlobalState : public GlobalFunctionData {
unique_ptr<BufferedFileWriter> file_writer;

void Flush(PostgresBinaryWriter &writer) {
file_writer->WriteData(writer.stream.GetData(), writer.stream.GetPosition());
}

void WriteHeader() {
PostgresBinaryWriter writer;
writer.WriteHeader();
Flush(writer);
}

void WriteChunk(DataChunk &chunk) {
chunk.Flatten();
PostgresBinaryWriter writer;
for (idx_t r = 0; r < chunk.size(); r++) {
writer.BeginRow(chunk.ColumnCount());
for (idx_t c = 0; c < chunk.ColumnCount(); c++) {
auto &col = chunk.data[c];
writer.WriteValue(col, r);
}
writer.FinishRow();
}
Flush(writer);
}

void Flush() {
// write the footer
PostgresBinaryWriter writer;
writer.WriteFooter();
Flush(writer);
// flush and close the file
file_writer->Flush();
file_writer.reset();
}
};

//! Bind data for the postgres_binary copy function; declares no members of its own
struct PostgresBinaryWriteBindData : public TableFunctionData {
};

//! Bind callback for COPY ... TO (FORMAT postgres_binary).
//! None of the arguments are inspected; an empty bind data object is returned.
unique_ptr<FunctionData> PostgresBinaryCopyFunction::PostgresBinaryWriteBind(ClientContext &context,
                                                                             CopyFunctionBindInput &input,
                                                                             const vector<string> &names,
                                                                             const vector<LogicalType> &sql_types) {
	auto bind_data = make_uniq<PostgresBinaryWriteBindData>();
	return std::move(bind_data);
}

//! Global init callback: opens a buffered writer on file_path and writes the header
//! to it before any chunk is sunk.
unique_ptr<GlobalFunctionData>
PostgresBinaryCopyFunction::PostgresBinaryWriteInitializeGlobal(ClientContext &context, FunctionData &bind_data,
                                                                const string &file_path) {
	auto &file_system = FileSystem::GetFileSystem(context);

	auto global_state = make_uniq<PostgresBinaryCopyGlobalState>();
	global_state->file_writer = make_uniq<BufferedFileWriter>(file_system, file_path);
	// the header has to be in the file before the first chunk is appended
	global_state->WriteHeader();
	return std::move(global_state);
}

//! Local init callback: returns a plain LocalFunctionData, since the sink callback
//! does its work through the global state only.
unique_ptr<LocalFunctionData>
PostgresBinaryCopyFunction::PostgresBinaryWriteInitializeLocal(ExecutionContext &context,
                                                               FunctionData &bind_data_p) {
	auto local_state = make_uniq<LocalFunctionData>();
	return local_state;
}

//! Sink callback: hands the incoming chunk to the global state, which serializes it
//! and appends it to the output file. The bind data and local state are not used.
void PostgresBinaryCopyFunction::PostgresBinaryWriteSink(ExecutionContext &context, FunctionData &bind_data_p,
                                                         GlobalFunctionData &gstate_p, LocalFunctionData &lstate,
                                                         DataChunk &input) {
	gstate_p.Cast<PostgresBinaryCopyGlobalState>().WriteChunk(input);
}

//! Combine callback for the postgres_binary copy. Has an empty body: the sink callback
//! writes every chunk through the global state, and the local state is a plain LocalFunctionData.
void PostgresBinaryCopyFunction::PostgresBinaryWriteCombine(ExecutionContext &context, FunctionData &bind_data, GlobalFunctionData &gstate,
LocalFunctionData &lstate) {
}

//! Finalize callback: has the global state write the footer, flush the file writer
//! and release it.
void PostgresBinaryCopyFunction::PostgresBinaryWriteFinalize(ClientContext &context, FunctionData &bind_data,
                                                             GlobalFunctionData &gstate_p) {
	// the argument-less Flush() completes and closes the output file
	gstate_p.Cast<PostgresBinaryCopyGlobalState>().Flush();
}


}
4 changes: 4 additions & 0 deletions src/postgres_extension.cpp
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@
#include "postgres_scanner.hpp"
#include "postgres_storage.hpp"
#include "postgres_scanner_extension.hpp"
#include "postgres_binary_copy.hpp"

#include "duckdb/catalog/catalog.hpp"
#include "duckdb/parser/parsed_data/create_table_function_info.hpp"
@@ -90,6 +91,9 @@ static void LoadInternal(DatabaseInstance &db) {
PostgresExecuteFunction execute_func;
ExtensionUtil::RegisterFunction(db, execute_func);

PostgresBinaryCopyFunction binary_copy;
ExtensionUtil::RegisterFunction(db, binary_copy);

auto &config = DBConfig::GetConfig(db);
config.storage_extensions["postgres_scanner"] = make_uniq<PostgresStorageExtension>();

72 changes: 72 additions & 0 deletions test/sql/misc/postgres_binary.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# name: test/sql/misc/postgres_binary.test
# description: Test postgres binary copy through a file
# group: [misc]

require postgres_scanner

require-env POSTGRES_TEST_DATABASE_AVAILABLE

statement ok
CALL postgres_attach('dbname=postgresscanner');

statement ok
ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES)

# straightforward integer copy
statement ok
COPY (SELECT i::INT AS i FROM range(100) t(i)) TO '__TEST_DIR__/pg_binary.bin' (FORMAT postgres_binary);

statement ok
CREATE OR REPLACE TABLE s.binary_copy_test(i INTEGER);

statement ok
CALL postgres_execute('s', 'COPY binary_copy_test FROM ''__WORKING_DIRECTORY__/__TEST_DIR__/pg_binary.bin'' (FORMAT binary)')

query IIII
SELECT COUNT(*), MIN(i), MAX(i), SUM(i) FROM s.binary_copy_test
----
100 0 99 4950

# test all supported types
statement ok
CREATE TABLE all_types_tbl AS
SELECT bool, smallint, int, bigint, float, double, dec_4_1, dec_9_4, dec_18_6, dec38_10,
case when date < '1992-01-01' then '4712-01-01 (BC)' else '5874896-01-01' end as date, -- postgres has more constrained date ranges
time,
timestamp, interval, uuid, blob,
replace(varchar, chr(0), '') as varchar, -- postgres does not support null bytes in varchar columns
blob, int_array, varchar_array
FROM test_all_types()

# create an empty table on the postgres side
statement ok
CREATE OR REPLACE TABLE s.binary_copy_test AS FROM all_types_tbl LIMIT 0;

statement ok
COPY all_types_tbl TO '__TEST_DIR__/pg_binary.bin' (FORMAT postgres_binary);

statement ok
CALL postgres_execute('s', 'COPY binary_copy_test FROM ''__WORKING_DIRECTORY__/__TEST_DIR__/pg_binary.bin'' (FORMAT binary)')

query I nosort all_types
FROM all_types_tbl
----

query I nosort all_types
SELECT * FROM s.binary_copy_test
----

# test an unsupported type
statement error
COPY (SELECT 42::UINT32) TO '__TEST_DIR__/pg_binary.bin' (FORMAT postgres_binary);
----
not supported

# reading not yet supported
statement ok
CREATE TABLE read_tbl(i int);

statement error
COPY read_tbl FROM '__TEST_DIR__/pg_binary.bin' (FORMAT postgres_binary);
----
not supported
Loading