Skip to content

Commit c5abb2c

Browse files
committed
Implement binary insert with PreparedInsert
Replace the old (and commented-out) code with the new `PreparedInsert` pattern in `clickhouse-cpp`. This greatly simplifies the code, as the most of the necessary logic for creating the insert Block object and tracking the state of the insert is now handled by the `PreparedInsert` object. Copy the test from the old FDW, but comment out the bit working with arrays, as it has not yet been updated.
1 parent 419fbd0 commit c5abb2c

6 files changed

Lines changed: 355 additions & 92 deletions

File tree

.gitmodules

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
[submodule "clickhouse-cpp"]
22
path = vendor/clickhouse-cpp
3-
url = https://github.com/ClickHouse/clickhouse-cpp.git
3+
url = https://github.com/theory/clickhouse-cpp.git

src/binary.cpp

Lines changed: 39 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ using namespace clickhouse;
4747
#define HOST_TO_BIG_ENDIAN_64(x) htobe64(x)
4848
#endif
4949

50+
#define GET_INSERTER(state) (Client::PreparedInsert *)state->inserter
51+
5052
/* palloc which will throw exceptions */
5153
static void * exc_palloc(Size size)
5254
{
@@ -285,98 +287,54 @@ static Oid get_corr_postgres_type(const TypeRef & type)
285287
void ch_binary_insert_state_free(void * c)
286288
{
287289
auto * state = (ch_binary_insert_state *)c;
288-
if (state->columns)
290+
if (state->inserter)
289291
{
290-
/* try to send empty block that sets proper ClickHouse state */
292+
/* Finish the insert to set the proper ClickHouse state */
291293
if (!state->success)
292294
{
295+
auto inserter = GET_INSERTER(state);
293296
try
294297
{
295-
Client * client = (Client *)state->conn->client;
296-
client->Insert(state->table_name, Block());
298+
inserter->Finish();
297299
}
298300
catch (const std::exception & e)
299301
{
300302
// just ignore, next query will fail
301-
elog(NOTICE, "clickhouse_fdw: could not send empty packet");
303+
elog(NOTICE, "clickhouse_fdw: could not finish INSERT: - %s", e.what());
302304
}
303305
}
304-
305-
delete (std::vector<clickhouse::ColumnRef> *)state->columns;
306+
delete (Client::PreparedInsert *)state->inserter;
306307
}
307308
}
308309

309310
void ch_binary_prepare_insert(void * conn, char * query, ch_binary_insert_state * state)
310311
{
311-
throw std::runtime_error("clickhouse_fdw: XXX ch_binary_prepare_insert not implemented");
312-
313-
// std::vector<clickhouse::ColumnRef> * vec = nullptr;
314-
// Client * client = (Client *)((ch_binary_connection_t *)conn)->client;
315-
316-
// try
317-
// {
318-
// client->PrepareInsert(
319-
// std::string(query) + " VALUES", [&state, &vec](const Block & sample_block) {
320-
// if (sample_block.GetColumnCount() == 0)
321-
// return true;
322-
323-
// vec = new std::vector<clickhouse::ColumnRef>();
324-
325-
// state->len = sample_block.GetColumnCount();
326-
327-
// #if PG_VERSION_NUM < 120000
328-
// state->outdesc = CreateTemplateTupleDesc(state->len, false);
329-
// #else
330-
// state->outdesc = CreateTemplateTupleDesc(state->len);
331-
// #endif
332-
333-
// for (size_t i = 0; i < state->len; i++)
334-
// {
335-
// bool error = false;
336-
// clickhouse::ColumnRef col = sample_block[i];
337-
338-
// auto chtype = col->Type();
339-
// if (chtype->GetCode() == Type::LowCardinality)
340-
// {
341-
// chtype = col->As<ColumnLowCardinality>()->GetNestedType();
342-
// }
343-
344-
// Oid pg_type = get_corr_postgres_type(chtype);
345-
346-
// vec->push_back(clickhouse::CreateColumnByType(col->Type()->GetName()));
347-
// const char * colname = sample_block.GetColumnName(i).c_str();
348-
349-
// /* we can't afford long jumps outside of this function */
350-
// PG_TRY();
351-
// {
352-
// TupleDescInitEntry(
353-
// state->outdesc, (AttrNumber)i + 1, colname, pg_type, -1, 0);
354-
// }
355-
// PG_CATCH();
356-
// {
357-
// error = true;
358-
// }
359-
// PG_END_TRY();
360-
361-
// if (error)
362-
// throw std::runtime_error("could not init tuple descriptor");
363-
// }
364-
365-
// return true;
366-
// });
367-
// }
368-
// catch (const std::exception & e)
369-
// {
370-
// client->ResetConnection();
371-
372-
// if (vec != nullptr)
373-
// delete vec;
374-
375-
// elog(ERROR, "clickhouse_fdw: error while insert preparation - %s", e.what());
376-
// }
377-
378-
// if (vec != nullptr)
379-
// state->columns = (void *)vec;
312+
Client * client = (Client *)((ch_binary_connection_t *)conn)->client;
313+
auto inserter = client->PrepareInsert(std::string(query) + " VALUES");
314+
auto block = inserter->GetBlock();
315+
state->len = block->GetColumnCount();
316+
if (state->len == 0) return;
317+
state->outdesc = CreateTemplateTupleDesc(state->len);
318+
319+
AttrNumber i = 0;
320+
for (Block::Iterator bi(*block); bi.IsValid(); bi.Next())
321+
{
322+
Oid pg_type = get_corr_postgres_type(bi.Type());
323+
const char * colname = bi.Name().c_str();
324+
325+
PG_TRY();
326+
{
327+
TupleDescInitEntry(state->outdesc, ++i, colname, pg_type, -1, 0);
328+
}
329+
PG_CATCH();
330+
{
331+
client->ResetConnection();
332+
delete inserter;
333+
}
334+
PG_END_TRY();
335+
}
336+
337+
state->inserter = (ch_prepared_insert_h *) inserter;
380338
}
381339

382340
static void column_append(clickhouse::ColumnRef col, Datum val, Oid valtype, bool isnull)
@@ -573,7 +531,7 @@ static void column_append(clickhouse::ColumnRef col, Datum val, Oid valtype, boo
573531
{
574532
case Type::Array: {
575533
// XXX Figure out proper value to create and pass to
576-
// Append.
534+
// Append. Then uncomment the test in binary_inserts.sql.
577535
throw std::runtime_error(
578536
"clickhouse_fdw: XXX unsupported column type "
579537
+ col->Type()->GetName()
@@ -607,8 +565,8 @@ void ch_binary_column_append_data(ch_binary_insert_state * state, size_t colidx)
607565
{
608566
try
609567
{
610-
auto columns = *(std::vector<clickhouse::ColumnRef> *)state->columns;
611-
auto col = columns[colidx];
568+
auto inserter = GET_INSERTER(state);
569+
auto col = (*inserter->GetBlock())[colidx];
612570

613571
Datum val = state->values[colidx];
614572
Oid valtype = TupleDescAttr(state->outdesc, colidx)->atttypid;
@@ -626,16 +584,8 @@ void ch_binary_insert_columns(ch_binary_insert_state * state)
626584
{
627585
try
628586
{
629-
Block block;
630-
auto columns = *(std::vector<clickhouse::ColumnRef> *)state->columns;
631-
for (int i = 0; i < state->outdesc->natts; ++i)
632-
{
633-
Form_pg_attribute att = TupleDescAttr(state->outdesc, i);
634-
block.AppendColumn(NameStr(att->attname), columns[i]);
635-
}
636-
637-
Client * client = (Client *)state->conn->client;
638-
client->Insert(state->table_name, block);
587+
auto inserter = GET_INSERTER(state);
588+
inserter->Finish();
639589
}
640590
catch (const std::exception & e)
641591
{

src/include/binary.hh

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ extern "C" {
66
#endif
77

88
typedef struct ch_binary_connection_t ch_binary_connection_t;
9+
typedef struct ch_prepared_insert_h ch_prepared_insert_h;
910
typedef struct ch_binary_response_t
1011
{
1112
void *values;
@@ -48,7 +49,7 @@ typedef struct {
4849
MemoryContextCallback callback;
4950

5051
TupleDesc outdesc;
51-
void *columns; /* std::vector */
52+
ch_prepared_insert_h *inserter; /* Client::PreparedInsert */
5253
size_t len;
5354
void *conversion_states;
5455
char *table_name;

0 commit comments

Comments
 (0)