Skip to content

Commit d857039

Browse files
authored
Merge pull request #205 from Mytherin/binarycopy
Implement #204 - Add support for POSTGRES_BINARY copy method
2 parents c054096 + bde66bb commit d857039

6 files changed

+210
-1
lines changed

src/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ add_subdirectory(storage)
55
add_library(
66
postgres_ext_library OBJECT
77
postgres_attach.cpp
8+
postgres_binary_copy.cpp
89
postgres_connection.cpp
910
postgres_copy_from.cpp
1011
postgres_copy_to.cpp

src/include/postgres_binary_copy.hpp

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
//===----------------------------------------------------------------------===//
2+
// DuckDB
3+
//
4+
// postgres_binary_copy.hpp
5+
//
6+
//
7+
//===----------------------------------------------------------------------===//
8+
9+
#pragma once
10+
11+
#include "postgres_utils.hpp"
12+
#include "postgres_result.hpp"
13+
#include "duckdb/function/copy_function.hpp"
14+
15+
namespace duckdb {
16+
17+
class PostgresBinaryCopyFunction : public CopyFunction {
18+
public:
19+
PostgresBinaryCopyFunction();
20+
21+
static unique_ptr<FunctionData> PostgresBinaryWriteBind(ClientContext &context, CopyFunctionBindInput &input,
22+
const vector<string> &names, const vector<LogicalType> &sql_types);
23+
24+
static unique_ptr<GlobalFunctionData> PostgresBinaryWriteInitializeGlobal(ClientContext &context, FunctionData &bind_data,
25+
const string &file_path);
26+
static unique_ptr<LocalFunctionData> PostgresBinaryWriteInitializeLocal(ExecutionContext &context, FunctionData &bind_data_p);
27+
static void PostgresBinaryWriteSink(ExecutionContext &context, FunctionData &bind_data_p, GlobalFunctionData &gstate,
28+
LocalFunctionData &lstate, DataChunk &input);
29+
static void PostgresBinaryWriteCombine(ExecutionContext &context, FunctionData &bind_data, GlobalFunctionData &gstate,
30+
LocalFunctionData &lstate);
31+
static void PostgresBinaryWriteFinalize(ClientContext &context, FunctionData &bind_data, GlobalFunctionData &gstate);
32+
};
33+
34+
35+
36+
} // namespace duckdb

src/include/postgres_binary_writer.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,7 @@ class PostgresBinaryWriter {
399399
break;
400400
}
401401
default:
402-
throw InternalException("Unsupported type for Postgres insert");
402+
throw NotImplementedException("Type \"%s\" is not supported for Postgres binary copy", type);
403403
}
404404
}
405405

src/postgres_binary_copy.cpp

+96
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
#include "postgres_binary_copy.hpp"
2+
#include "postgres_binary_writer.hpp"
3+
#include "duckdb/common/serializer/buffered_file_writer.hpp"
4+
#include "duckdb/common/file_system.hpp"
5+
6+
namespace duckdb {
7+
8+
PostgresBinaryCopyFunction::PostgresBinaryCopyFunction() :
9+
CopyFunction("postgres_binary") {
10+
11+
copy_to_bind = PostgresBinaryWriteBind;
12+
copy_to_initialize_global = PostgresBinaryWriteInitializeGlobal;
13+
copy_to_initialize_local = PostgresBinaryWriteInitializeLocal;
14+
copy_to_sink = PostgresBinaryWriteSink;
15+
copy_to_combine = PostgresBinaryWriteCombine;
16+
copy_to_finalize = PostgresBinaryWriteFinalize;
17+
}
18+
19+
struct PostgresBinaryCopyGlobalState : public GlobalFunctionData {
20+
unique_ptr<BufferedFileWriter> file_writer;
21+
22+
void Flush(PostgresBinaryWriter &writer) {
23+
file_writer->WriteData(writer.stream.GetData(), writer.stream.GetPosition());
24+
}
25+
26+
void WriteHeader() {
27+
PostgresBinaryWriter writer;
28+
writer.WriteHeader();
29+
Flush(writer);
30+
}
31+
32+
void WriteChunk(DataChunk &chunk) {
33+
chunk.Flatten();
34+
PostgresBinaryWriter writer;
35+
for (idx_t r = 0; r < chunk.size(); r++) {
36+
writer.BeginRow(chunk.ColumnCount());
37+
for (idx_t c = 0; c < chunk.ColumnCount(); c++) {
38+
auto &col = chunk.data[c];
39+
writer.WriteValue(col, r);
40+
}
41+
writer.FinishRow();
42+
}
43+
Flush(writer);
44+
}
45+
46+
void Flush() {
47+
// write the footer
48+
PostgresBinaryWriter writer;
49+
writer.WriteFooter();
50+
Flush(writer);
51+
// flush and close the file
52+
file_writer->Flush();
53+
file_writer.reset();
54+
}
55+
};
56+
57+
struct PostgresBinaryWriteBindData : public TableFunctionData {
58+
};
59+
60+
unique_ptr<FunctionData> PostgresBinaryCopyFunction::PostgresBinaryWriteBind(ClientContext &context, CopyFunctionBindInput &input,
61+
const vector<string> &names, const vector<LogicalType> &sql_types) {
62+
return make_uniq<PostgresBinaryWriteBindData>();
63+
}
64+
65+
unique_ptr<GlobalFunctionData> PostgresBinaryCopyFunction::PostgresBinaryWriteInitializeGlobal(ClientContext &context, FunctionData &bind_data,
66+
const string &file_path) {
67+
auto result = make_uniq<PostgresBinaryCopyGlobalState>();
68+
auto &fs = FileSystem::GetFileSystem(context);
69+
result->file_writer = make_uniq<BufferedFileWriter>(fs, file_path);
70+
// write the header
71+
result->WriteHeader();
72+
return std::move(result);
73+
}
74+
75+
unique_ptr<LocalFunctionData> PostgresBinaryCopyFunction::PostgresBinaryWriteInitializeLocal(ExecutionContext &context, FunctionData &bind_data_p) {
76+
return make_uniq<LocalFunctionData>();
77+
}
78+
79+
void PostgresBinaryCopyFunction::PostgresBinaryWriteSink(ExecutionContext &context, FunctionData &bind_data_p, GlobalFunctionData &gstate_p,
80+
LocalFunctionData &lstate, DataChunk &input) {
81+
auto &gstate = gstate_p.Cast<PostgresBinaryCopyGlobalState>();
82+
gstate.WriteChunk(input);
83+
}
84+
85+
void PostgresBinaryCopyFunction::PostgresBinaryWriteCombine(ExecutionContext &context, FunctionData &bind_data, GlobalFunctionData &gstate,
86+
LocalFunctionData &lstate) {
87+
}
88+
89+
void PostgresBinaryCopyFunction::PostgresBinaryWriteFinalize(ClientContext &context, FunctionData &bind_data, GlobalFunctionData &gstate_p) {
90+
auto &gstate = gstate_p.Cast<PostgresBinaryCopyGlobalState>();
91+
// write the footer and close the file
92+
gstate.Flush();
93+
}
94+
95+
96+
}

src/postgres_extension.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "postgres_scanner.hpp"
55
#include "postgres_storage.hpp"
66
#include "postgres_scanner_extension.hpp"
7+
#include "postgres_binary_copy.hpp"
78

89
#include "duckdb/catalog/catalog.hpp"
910
#include "duckdb/parser/parsed_data/create_table_function_info.hpp"
@@ -90,6 +91,9 @@ static void LoadInternal(DatabaseInstance &db) {
9091
PostgresExecuteFunction execute_func;
9192
ExtensionUtil::RegisterFunction(db, execute_func);
9293

94+
PostgresBinaryCopyFunction binary_copy;
95+
ExtensionUtil::RegisterFunction(db, binary_copy);
96+
9397
auto &config = DBConfig::GetConfig(db);
9498
config.storage_extensions["postgres_scanner"] = make_uniq<PostgresStorageExtension>();
9599

test/sql/misc/postgres_binary.test

+72
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
# name: test/sql/misc/postgres_binary.test
2+
# description: Test postgres binary copy through a file
3+
# group: [scanner]
4+
5+
require postgres_scanner
6+
7+
require-env POSTGRES_TEST_DATABASE_AVAILABLE
8+
9+
statement ok
10+
CALL postgres_attach('dbname=postgresscanner');
11+
12+
statement ok
13+
ATTACH 'dbname=postgresscanner' AS s (TYPE POSTGRES)
14+
15+
# straightforward integer copy
16+
statement ok
17+
COPY (SELECT i::INT AS i FROM range(100) t(i)) TO '__TEST_DIR__/pg_binary.bin' (FORMAT postgres_binary);
18+
19+
statement ok
20+
CREATE OR REPLACE TABLE s.binary_copy_test(i INTEGER);
21+
22+
statement ok
23+
CALL postgres_execute('s', 'COPY binary_copy_test FROM ''__WORKING_DIRECTORY__/__TEST_DIR__/pg_binary.bin'' (FORMAT binary)')
24+
25+
query IIII
26+
SELECT COUNT(*), MIN(i), MAX(i), SUM(i) FROM s.binary_copy_test
27+
----
28+
100 0 99 4950
29+
30+
# test all supported types
31+
statement ok
32+
CREATE TABLE all_types_tbl AS
33+
SELECT bool, smallint, int, bigint, float, double, dec_4_1, dec_9_4, dec_18_6, dec38_10,
34+
case when date < '1992-01-01' then '4712-01-01 (BC)' else '5874896-01-01' end as date, -- postgres has more constrained date ranges
35+
time,
36+
timestamp, interval, uuid, blob,
37+
replace(varchar, chr(0), '') as varchar, -- postgres does not support null bytes in varchar columns
38+
blob, int_array, varchar_array
39+
FROM test_all_types()
40+
41+
# create an empty table on the postgres side
42+
statement ok
43+
CREATE OR REPLACE TABLE s.binary_copy_test AS FROM all_types_tbl LIMIT 0;
44+
45+
statement ok
46+
COPY all_types_tbl TO '__TEST_DIR__/pg_binary.bin' (FORMAT postgres_binary);
47+
48+
statement ok
49+
CALL postgres_execute('s', 'COPY binary_copy_test FROM ''__WORKING_DIRECTORY__/__TEST_DIR__/pg_binary.bin'' (FORMAT binary)')
50+
51+
query I nosort all_types
52+
FROM all_types_tbl
53+
----
54+
55+
query I nosort all_types
56+
SELECT * FROM s.binary_copy_test
57+
----
58+
59+
# test an unsupported type
60+
statement error
61+
COPY (SELECT 42::UINT32) TO '__TEST_DIR__/pg_binary.bin' (FORMAT postgres_binary);
62+
----
63+
not supported
64+
65+
# reading not yet supported
66+
statement ok
67+
CREATE TABLE read_tbl(i int);
68+
69+
statement error
70+
COPY read_tbl FROM '__TEST_DIR__/pg_binary.bin' (FORMAT postgres_binary);
71+
----
72+
not supported

0 commit comments

Comments
 (0)