Skip to content

Commit 39448d5

Browse files
Sasha Krassovskytristan957
Sasha Krassovsky
authored andcommitted
Add custom xlogreader callbacks to walsender (v14) (#404)
* Add hooks for custom xlogreader in walsender * Make WalSndWaitForWal not static * Switch space to tab
1 parent f4d9954 commit 39448d5

File tree

2 files changed

+14
-6
lines changed

2 files changed

+14
-6
lines changed

src/backend/replication/walsender.c

+11-6
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ int wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
124124
* data message */
125125
bool log_replication_commands = false;
126126

127+
void (*WalSender_Custom_XLogReaderRoutines)(XLogReaderRoutine *xlr);
127128
/*
128129
* State for WalSndWakeupRequest
129130
*/
@@ -255,7 +256,7 @@ static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
255256
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
256257
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
257258
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
258-
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
259+
XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
259260
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
260261

261262
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
@@ -1136,6 +1137,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
11361137
{
11371138
StringInfoData buf;
11381139
QueryCompletion qc;
1140+
XLogReaderRoutine xlr;
11391141

11401142
/* make sure that our requirements are still fulfilled */
11411143
CheckLogicalDecodingRequirements();
@@ -1163,6 +1165,12 @@ StartLogicalReplication(StartReplicationCmd *cmd)
11631165
got_STOPPING = true;
11641166
}
11651167

1168+
xlr.page_read = logical_read_xlog_page;
1169+
xlr.segment_open = WalSndSegmentOpen;
1170+
xlr.segment_close = wal_segment_close;
1171+
if (WalSender_Custom_XLogReaderRoutines != NULL)
1172+
WalSender_Custom_XLogReaderRoutines(&xlr);
1173+
11661174
/*
11671175
* Create our decoding context, making it start at the previously ack'ed
11681176
* position.
@@ -1171,10 +1179,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
11711179
* are reported early.
11721180
*/
11731181
logical_decoding_ctx =
1174-
CreateDecodingContext(cmd->startpoint, cmd->options, false,
1175-
XL_ROUTINE(.page_read = logical_read_xlog_page,
1176-
.segment_open = WalSndSegmentOpen,
1177-
.segment_close = wal_segment_close),
1182+
CreateDecodingContext(cmd->startpoint, cmd->options, false, &xlr,
11781183
WalSndPrepareWrite, WalSndWriteData,
11791184
WalSndUpdateProgress);
11801185
xlogreader = logical_decoding_ctx->reader;
@@ -1395,7 +1400,7 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
13951400
* if we detect a shutdown request (either from postmaster or client)
13961401
* we will return early, so caller must always check.
13971402
*/
1398-
static XLogRecPtr
1403+
XLogRecPtr
13991404
WalSndWaitForWal(XLogRecPtr loc)
14001405
{
14011406
int wakeEvents;

src/include/replication/walsender.h

+3
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ extern int max_wal_senders;
3636
extern int wal_sender_timeout;
3737
extern bool log_replication_commands;
3838

39+
struct XLogReaderRoutine;
40+
extern void (*WalSender_Custom_XLogReaderRoutines)(struct XLogReaderRoutine *xlr);
41+
3942
extern void InitWalSender(void);
4043
extern bool exec_replication_command(const char *query_string);
4144
extern void WalSndErrorCleanup(void);

0 commit comments

Comments
 (0)