Skip to content

Commit 30a6a87

Browse files
committed
Implement text reader protocol
1 parent 959a869 commit 30a6a87

File tree

3 files changed

+138
-3
lines changed

3 files changed

+138
-3
lines changed

src/include/postgres_text_reader.hpp

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
//===----------------------------------------------------------------------===//
2+
// DuckDB
3+
//
4+
// postgres_text_reader.hpp
5+
//
6+
//
7+
//===----------------------------------------------------------------------===//
8+
9+
#pragma once
10+
11+
#include "duckdb.hpp"
12+
#include "duckdb/common/types.hpp"
13+
#include "duckdb/common/types/vector.hpp"
14+
#include "libpq-fe.h"
15+
#include "postgres_conversion.hpp"
16+
#include "postgres_connection.hpp"
17+
#include <cstring>
18+
19+
namespace duckdb {
20+
21+
struct PostgresTextReader {
22+
explicit PostgresTextReader(PostgresConnection &con_p) : con(con_p), result(nullptr), col_vec(LogicalType::VARCHAR) {
23+
}
24+
25+
~PostgresTextReader() {
26+
Reset();
27+
}
28+
29+
PostgresConnection &GetConn() {
30+
return con;
31+
}
32+
33+
void ReadTextFrom(const string &query) {
34+
Reset();
35+
result = PQexec(con.GetConn(), query.c_str());
36+
if (!result || PQresultStatus(result) != PGRES_TUPLES_OK ) {
37+
throw IOException("Failed to execute query: %s", string(PQerrorMessage(con.GetConn())));
38+
}
39+
}
40+
41+
void Reset() {
42+
if (result) {
43+
PQclear(result);
44+
result = nullptr;
45+
}
46+
}
47+
48+
void ReadColumn(idx_t col_idx) {
49+
col_vec.Resize(0, RowCount());
50+
for (idx_t row_idx = 0; row_idx < RowCount(); row_idx++) {
51+
if (PQgetisnull(result, row_idx, col_idx)) {
52+
FlatVector::SetNull(col_vec, row_idx, true);
53+
continue;
54+
}
55+
char *value = PQgetvalue(result, row_idx, col_idx);
56+
int value_len = PQgetlength(result, row_idx, col_idx);
57+
FlatVector::GetData<string_t>(col_vec)[row_idx] = StringVector::AddStringOrBlob(col_vec, value, value_len);
58+
}
59+
}
60+
61+
void LoadResultTo (idx_t &col_idx, Vector &out_vec) {
62+
ReadColumn(col_idx);
63+
VectorOperations::DefaultCast(col_vec, out_vec, RowCount());
64+
}
65+
66+
// Get result metadata
67+
idx_t ColumnCount() {
68+
return PQnfields(result);
69+
}
70+
71+
idx_t RowCount() {
72+
return PQntuples(result);
73+
}
74+
75+
private:
76+
PostgresConnection &con;
77+
PGresult *result;
78+
Vector col_vec;
79+
};
80+
81+
} // namespace duckdb

src/postgres_extension.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,8 @@ static void LoadInternal(DatabaseInstance &db) {
181181
LogicalType::VARCHAR, Value(), SetPostgresNullByteReplacement);
182182
config.AddExtensionOption("pg_debug_show_queries", "DEBUG SETTING: print all queries sent to Postgres to stdout",
183183
LogicalType::BOOLEAN, Value::BOOLEAN(false), SetPostgresDebugQueryPrint);
184+
config.AddExtensionOption("pg_use_legacy_text_protocol", "Whether or not to use legacy TEXT protocol to read data. Set this to yes will ignore pg_use_binary_copy",
185+
LogicalType::BOOLEAN, Value::BOOLEAN(true));
184186

185187
OptimizerExtension postgres_optimizer;
186188
postgres_optimizer.optimize_function = PostgresOptimizer::Optimize;

src/postgres_scanner.cpp

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <libpq-fe.h>
44

5+
#include "duckdb/common/unique_ptr.hpp"
56
#include "duckdb/main/extension_util.hpp"
67
#include "duckdb/common/shared_ptr.hpp"
78
#include "duckdb/common/helper.hpp"
@@ -10,6 +11,7 @@
1011
#include "postgres_scanner.hpp"
1112
#include "postgres_result.hpp"
1213
#include "postgres_binary_reader.hpp"
14+
#include "postgres_text_reader.hpp"
1315
#include "storage/postgres_catalog.hpp"
1416
#include "storage/postgres_transaction.hpp"
1517
#include "storage/postgres_table_set.hpp"
@@ -34,6 +36,10 @@ struct PostgresLocalState : public LocalTableFunctionState {
3436

3537
void ScanChunk(ClientContext &context, const PostgresBindData &bind_data, PostgresGlobalState &gstate,
3638
DataChunk &output);
39+
void ScanChunkWithBinaryReader(ClientContext &context, const PostgresBindData &bind_data, PostgresGlobalState &gstate,
40+
DataChunk &output);
41+
void ScanChunkWithTextReader(ClientContext &context, const PostgresBindData &bind_data, PostgresGlobalState &gstate,
42+
DataChunk &output);
3743
};
3844

3945
struct PostgresGlobalState : public GlobalTableFunctionState {
@@ -248,6 +254,18 @@ static void PostgresInitInternal(ClientContext &context, const PostgresBindData
248254
filter += " AND ";
249255
}
250256
filter += filter_string;
257+
};
258+
lstate.exec = false;
259+
lstate.done = false;
260+
Value use_legacy_text_protocol;
261+
if (context.TryGetCurrentSetting("pg_use_legacy_text_protocol", use_legacy_text_protocol)) {
262+
if (BooleanValue::Get(use_legacy_text_protocol)) {
263+
lstate.sql = StringUtil::Format(
264+
R"(SELECT %s FROM %s.%s %s%s;)",
265+
col_names, KeywordHelper::WriteQuoted(bind_data->schema_name, '"'),
266+
KeywordHelper::WriteQuoted(bind_data->table_name, '"'), filter, bind_data->limit);
267+
return;
268+
}
251269
}
252270
if (bind_data->table_name.empty()) {
253271
D_ASSERT(!bind_data->sql.empty());
@@ -261,8 +279,7 @@ static void PostgresInitInternal(ClientContext &context, const PostgresBindData
261279
col_names, KeywordHelper::WriteQuoted(bind_data->schema_name, '"'),
262280
KeywordHelper::WriteQuoted(bind_data->table_name, '"'), filter, bind_data->limit);
263281
}
264-
lstate.exec = false;
265-
lstate.done = false;
282+
266283
}
267284

268285
static idx_t PostgresMaxThreads(ClientContext &context, const FunctionData *bind_data_p) {
@@ -417,7 +434,7 @@ static unique_ptr<LocalTableFunctionState> PostgresInitLocalState(ExecutionConte
417434
return GetLocalState(context.client, input, gstate);
418435
}
419436

420-
void PostgresLocalState::ScanChunk(ClientContext &context, const PostgresBindData &bind_data,
437+
void PostgresLocalState::ScanChunkWithBinaryReader(ClientContext &context, const PostgresBindData &bind_data,
421438
PostgresGlobalState &gstate, DataChunk &output) {
422439
idx_t output_offset = 0;
423440
PostgresBinaryReader reader(connection);
@@ -472,6 +489,41 @@ void PostgresLocalState::ScanChunk(ClientContext &context, const PostgresBindDat
472489
}
473490
}
474491

492+
void PostgresLocalState::ScanChunkWithTextReader(ClientContext &context, const PostgresBindData &bind_data,
493+
PostgresGlobalState &gstate, DataChunk &output) {
494+
PostgresTextReader reader(connection);
495+
if (done && !PostgresParallelStateNext(context, &bind_data, *this, gstate)) {
496+
return;
497+
}
498+
499+
if (!exec) {
500+
reader.ReadTextFrom(sql);
501+
exec = true;
502+
}
503+
504+
output.SetCardinality(reader.RowCount());
505+
for (idx_t output_idx = 0; output_idx < output.ColumnCount(); output_idx++) {
506+
auto col_idx = column_ids[output_idx];
507+
auto &out_vec = output.data[output_idx];
508+
reader.LoadResultTo(col_idx, out_vec);
509+
}
510+
reader.Reset();
511+
done = true;
512+
return;
513+
}
514+
515+
void PostgresLocalState::ScanChunk(ClientContext &context, const PostgresBindData &bind_data,
516+
PostgresGlobalState &gstate, DataChunk &output) {
517+
Value use_legacy_text_protocol;
518+
if (context.TryGetCurrentSetting("pg_use_legacy_text_protocol", use_legacy_text_protocol)) {
519+
if (BooleanValue::Get(use_legacy_text_protocol)) {
520+
ScanChunkWithTextReader(context, bind_data, gstate, output);
521+
return;
522+
}
523+
}
524+
ScanChunkWithBinaryReader(context, bind_data, gstate, output);
525+
}
526+
475527
static void PostgresScan(ClientContext &context, TableFunctionInput &data, DataChunk &output) {
476528
auto &bind_data = data.bind_data->Cast<PostgresBindData>();
477529
auto &gstate = data.global_state->Cast<PostgresGlobalState>();

0 commit comments

Comments
 (0)