Skip to content

Commit 8cf5f46

Browse files
Sasha Krassovskytristan957
Sasha Krassovsky
authored andcommitted
Add custom xlogreader callbacks to walsender (v16) (#406)
* Add hooks for custom xlogreader in walsender * Make WalSndWaitForWal not static * Switch to tab
1 parent 082d3b9 commit 8cf5f46

File tree

2 files changed

+14
-6
lines changed

2 files changed

+14
-6
lines changed

src/backend/replication/walsender.c

+11-6
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ int wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
126126
* data message */
127127
bool log_replication_commands = false;
128128

129+
void (*WalSender_Custom_XLogReaderRoutines)(XLogReaderRoutine *xlr);
129130
/*
130131
* State for WalSndWakeupRequest
131132
*/
@@ -258,7 +259,7 @@ static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, Tran
258259
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
259260
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
260261
bool skipped_xact);
261-
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
262+
XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
262263
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
263264

264265
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
@@ -1260,6 +1261,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
12601261
{
12611262
StringInfoData buf;
12621263
QueryCompletion qc;
1264+
XLogReaderRoutine xlr;
12631265

12641266
/* make sure that our requirements are still fulfilled */
12651267
CheckLogicalDecodingRequirements();
@@ -1280,6 +1282,12 @@ StartLogicalReplication(StartReplicationCmd *cmd)
12801282
got_STOPPING = true;
12811283
}
12821284

1285+
xlr.page_read = logical_read_xlog_page;
1286+
xlr.segment_open = WalSndSegmentOpen;
1287+
xlr.segment_close = wal_segment_close;
1288+
if (WalSender_Custom_XLogReaderRoutines != NULL)
1289+
WalSender_Custom_XLogReaderRoutines(&xlr);
1290+
12831291
/*
12841292
* Create our decoding context, making it start at the previously ack'ed
12851293
* position.
@@ -1288,10 +1296,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
12881296
* are reported early.
12891297
*/
12901298
logical_decoding_ctx =
1291-
CreateDecodingContext(cmd->startpoint, cmd->options, false,
1292-
XL_ROUTINE(.page_read = logical_read_xlog_page,
1293-
.segment_open = WalSndSegmentOpen,
1294-
.segment_close = wal_segment_close),
1299+
CreateDecodingContext(cmd->startpoint, cmd->options, false, &xlr,
12951300
WalSndPrepareWrite, WalSndWriteData,
12961301
WalSndUpdateProgress);
12971302
xlogreader = logical_decoding_ctx->reader;
@@ -1539,7 +1544,7 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
15391544
* if we detect a shutdown request (either from postmaster or client)
15401545
* we will return early, so caller must always check.
15411546
*/
1542-
static XLogRecPtr
1547+
XLogRecPtr
15431548
WalSndWaitForWal(XLogRecPtr loc)
15441549
{
15451550
int wakeEvents;

src/include/replication/walsender.h

+3
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ extern PGDLLIMPORT int max_wal_senders;
3636
extern PGDLLIMPORT int wal_sender_timeout;
3737
extern PGDLLIMPORT bool log_replication_commands;
3838

39+
struct XLogReaderRoutine;
40+
extern PGDLLIMPORT void (*WalSender_Custom_XLogReaderRoutines)(struct XLogReaderRoutine *xlr);
41+
3942
extern void InitWalSender(void);
4043
extern bool exec_replication_command(const char *cmd_string);
4144
extern void WalSndErrorCleanup(void);

0 commit comments

Comments
 (0)