Skip to content

Commit 1dbbe2d

Browse files
committed
track connection state
1 parent d4477c7 commit 1dbbe2d

5 files changed

Lines changed: 76 additions & 3 deletions

File tree

src/binary.c

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,14 @@ struct ch_binary_state
316316

317317
/* Per-query cancel callback (no userdata). */
318318
bool (*check_cancel_fn) (void);
319+
320+
/*
321+
* Set when an unrecoverable protocol/IO error has happened (e.g. the server
322+
* raised an exception mid-INSERT and closed the socket, or a write
323+
* hit EPIPE). Callers must drop the cached connection rather than
324+
* reuse it; the cache layer checks this via ch_binary_is_broken.
325+
*/
326+
bool broken;
319327
};
320328

321329
/* ch_binary_connection_t is defined in internal.h with three fields; we
@@ -515,6 +523,16 @@ ch_binary_connect(ch_connection_details * details, char *errbuf, size_t errbuf_s
515523
return NULL;
516524
}
517525

526+
bool
527+
ch_binary_is_broken(const ch_binary_connection_t * conn)
528+
{
529+
if (!conn)
530+
return false;
531+
const struct ch_binary_state *s = (const struct ch_binary_state *) conn->client;
532+
533+
return s ? s->broken : false;
534+
}
535+
518536
void
519537
ch_binary_close(ch_binary_connection_t * conn)
520538
{
@@ -808,6 +826,7 @@ ch_binary_simple_query(ch_binary_connection_t * conn, const ch_query * query,
808826
if (rc != CHC_OK)
809827
{
810828
resp_set_error(resp, err.msg[0] ? err.msg : "send_query failed");
829+
s->broken = true;
811830
goto done;
812831
}
813832

@@ -823,6 +842,7 @@ ch_binary_simple_query(ch_binary_connection_t * conn, const ch_query * query,
823842
if (rc != CHC_OK)
824843
{
825844
resp_set_error(resp, err.msg[0] ? err.msg : "recv_packet failed");
845+
s->broken = true;
826846
break;
827847
}
828848
if (check_cancel && check_cancel() && !resp->error)
@@ -854,6 +874,14 @@ ch_binary_simple_query(ch_binary_connection_t * conn, const ch_query * query,
854874
case CHC_PKT_EXCEPTION:
855875
resp_set_exception(resp, pkt.exception);
856876
chc_packet_clear(s->client, &pkt);
877+
/*
878+
* Older servers (and some protocol states) close the socket
879+
* after raising an exception, so reusing this connection for
880+
* a follow-up query risks EPIPE. Match the legacy C++ driver
881+
* (which always called Client::ResetConnection) and treat
882+
* the connection as broken.
883+
*/
884+
s->broken = true;
857885
goto done;
858886
case CHC_PKT_END_OF_STREAM:
859887
chc_packet_clear(s->client, &pkt);
@@ -1025,6 +1053,8 @@ struct ch_binary_insert_handle
10251053
{
10261054
chc_client *client;
10271055
chc_alloc alloc;
1056+
struct ch_binary_state *state; /* parent connection; used to flag
1057+
* broken state on error */
10281058
chc_block *initial_block; /* schema source (server's empty Data) */
10291059
chc_query_opts opts;
10301060
size_t ncols;
@@ -1197,6 +1227,7 @@ recv_initial_block(struct ch_binary_state *s, ch_binary_insert_handle * h,
11971227
{
11981228
set_errbuf(errbuf, errbuf_size,
11991229
err.msg[0] ? err.msg : "recv_packet failed");
1230+
s->broken = true;
12001231
return -1;
12011232
}
12021233
if (pkt.kind == CHC_PKT_EXCEPTION)
@@ -1208,6 +1239,7 @@ recv_initial_block(struct ch_binary_state *s, ch_binary_insert_handle * h,
12081239
else if (pkt.exception && pkt.exception->name)
12091240
msg = pkt.exception->name;
12101241
set_errbuf(errbuf, errbuf_size, msg);
1242+
s->broken = true;
12111243
chc_packet_clear(s->client, &pkt);
12121244
return -1;
12131245
}
@@ -1238,6 +1270,7 @@ ch_binary_begin_insert(ch_binary_connection_t * conn, const ch_query * query,
12381270
}
12391271
h->client = s->client;
12401272
h->alloc = s->alloc;
1273+
h->state = s;
12411274
arena_init(&h->info_arena);
12421275

12431276
/* Append " VALUES" so the server enters insert mode. */
@@ -1284,6 +1317,7 @@ ch_binary_begin_insert(ch_binary_connection_t * conn, const ch_query * query,
12841317
if (rc != CHC_OK)
12851318
{
12861319
set_errbuf(errbuf, errbuf_size, err.msg[0] ? err.msg : "send_query failed");
1320+
s->broken = true;
12871321
ch_binary_end_insert(h, errbuf, errbuf_size);
12881322
return NULL;
12891323
}
@@ -2219,6 +2253,8 @@ ch_binary_flush_block(ch_binary_insert_handle * h, char *errbuf, size_t errbuf_s
22192253
if (rc != CHC_OK)
22202254
{
22212255
set_errbuf(errbuf, errbuf_size, err.msg[0] ? err.msg : "send_data");
2256+
if (h->state)
2257+
h->state->broken = true;
22222258
goto cleanup;
22232259
}
22242260
ret = 0;
@@ -2271,6 +2307,8 @@ ch_binary_end_insert(ch_binary_insert_handle * h, char *errbuf, size_t errbuf_si
22712307
{
22722308
set_errbuf(errbuf, errbuf_size,
22732309
err.msg[0] ? err.msg : "recv_packet failed");
2310+
if (h->state)
2311+
h->state->broken = true;
22742312
chc_packet_clear(h->client, &pkt);
22752313
break;
22762314
}
@@ -2283,6 +2321,13 @@ ch_binary_end_insert(ch_binary_insert_handle * h, char *errbuf, size_t errbuf_si
22832321
else if (pkt.exception && pkt.exception->name)
22842322
msg = pkt.exception->name;
22852323
set_errbuf(errbuf, errbuf_size, msg);
2324+
/*
2325+
* Server raised an exception mid-INSERT and typically
2326+
* closes the socket; the next op on this conn would
2327+
* fail with EPIPE. Mark it broken so the cache layer reconnects.
2328+
*/
2329+
if (h->state)
2330+
h->state->broken = true;
22862331
chc_packet_clear(h->client, &pkt);
22872332
break;
22882333
}
@@ -2294,6 +2339,8 @@ ch_binary_end_insert(ch_binary_insert_handle * h, char *errbuf, size_t errbuf_si
22942339
else
22952340
{
22962341
set_errbuf(errbuf, errbuf_size, err.msg[0] ? err.msg : "send_data terminator failed");
2342+
if (h->state)
2343+
h->state->broken = true;
22972344
}
22982345
}
22992346
if (h->initial_block)

src/connection.c

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,10 +123,19 @@ chfdw_get_connection(UserMapping * user)
123123
}
124124

125125
/*
126-
* We don't check the health of cached connection here, because it would
127-
* require some overhead. Broken connection will be detected when the
128-
* connection is actually used.
126+
* Drop connections that hit an unrecoverable protocol/IO error on the
127+
* previous statement (server raised mid-INSERT and closed the socket,
128+
* write hit EPIPE, etc). Without this, a subsequent statement would
129+
* write to the dead socket and surface a useless "Broken pipe" instead
130+
* of the real error from the next request.
129131
*/
132+
if (entry->gate.conn != NULL && entry->gate.methods->is_broken != NULL
133+
&& entry->gate.methods->is_broken(entry->gate.conn))
134+
{
135+
elog(LOG, "closing broken pg_clickhouse connection");
136+
entry->gate.methods->disconnect(entry->gate.conn);
137+
entry->gate.conn = NULL;
138+
}
130139

131140
/*
132141
* If cache entry doesn't have a connection, we have to establish a new

src/include/binary.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,13 @@ extern ch_binary_connection_t * ch_binary_connect(ch_connection_details * detail
4141
char *errbuf, size_t errbuf_size);
4242
extern void ch_binary_close(ch_binary_connection_t * conn);
4343

44+
/*
45+
* Returns true if the connection encountered an unrecoverable error
46+
* (server exception, IO failure, mid-protocol break). Callers should
47+
* drop the cached connection instead of reusing it.
48+
*/
49+
extern bool ch_binary_is_broken(const ch_binary_connection_t * conn);
50+
4451
/* SELECT. */
4552
extern ch_binary_response_t * ch_binary_simple_query(ch_binary_connection_t * conn,
4653
const ch_query * query,

src/include/fdw.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ typedef void (*insert_tuple_method) (void *state, TupleTableSlot * slot);
7878
typedef ch_cursor * (*streaming_query_method) (void *conn,
7979
const ch_query * query,
8080
int32 fetch_size);
81+
typedef bool (*is_broken_method) (const void *conn);
8182

8283
typedef struct
8384
{
@@ -88,6 +89,7 @@ typedef struct
8889
insert_tuple_method insert_tuple;
8990
streaming_query_method streaming_query; /* NULL if not supported */
9091
cursor_fetch_row_method streaming_fetch_row; /* NULL if not supported */
92+
is_broken_method is_broken; /* NULL means connection is never broken */
9193
} libclickhouse_methods;
9294

9395
typedef struct

src/pglink.c

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ static libclickhouse_methods http_methods =
5858
static void binary_disconnect(void *conn);
5959
static ch_cursor * binary_simple_query(void *conn, const ch_query * query);
6060
static void binary_cursor_free(void *cursor);
61+
static bool binary_is_broken(const void *conn);
6162

6263
/* static void binary_simple_insert(void *conn, const char *query); */
6364
static Datum * binary_fetch_row(ChFdwScanRowContext * ctx);
@@ -78,6 +79,7 @@ static libclickhouse_methods binary_methods =
7879
.insert_tuple = binary_insert_tuple,
7980
.streaming_query = NULL,
8081
.streaming_fetch_row = NULL,
82+
.is_broken = binary_is_broken,
8183
};
8284

8385
static int
@@ -841,6 +843,12 @@ binary_disconnect(void *conn)
841843
ch_binary_close((ch_binary_connection_t *) conn);
842844
}
843845

846+
static bool
847+
binary_is_broken(const void *conn)
848+
{
849+
return ch_binary_is_broken((const ch_binary_connection_t *) conn);
850+
}
851+
844852
static ch_cursor *
845853
binary_simple_query(void *conn, const ch_query * query)
846854
{

0 commit comments

Comments
 (0)