@@ -3,7 +3,8 @@ package pgoutput
33import (
44 "context"
55 "fmt"
6- "log"
6+ "sync"
7+ "sync/atomic"
78 "time"
89
910 "github.com/jackc/pgx"
@@ -14,120 +15,190 @@ type Subscription struct {
1415 Publication string
1516 WaitTimeout time.Duration
1617 StatusTimeout time.Duration
17- CopyData bool
18+
19+ conn * pgx.ReplicationConn
20+ maxWal uint64
21+ walRetain uint64
22+ walFlushed uint64
23+
24+ failOnHandler bool
25+
26+ // Mutex is used to prevent reading and writing to a connection at the same time
27+ sync.Mutex
1828}
1929
20- type Handler func (Message ) error
// Handler is the callback invoked by Subscription.Start for every parsed
// replication message. The second argument is the WalStart position of the
// WAL message the Message was parsed from.
type Handler func(Message, uint64) error
2131
22- func NewSubscription (name , publication string ) * Subscription {
32+ func NewSubscription (conn * pgx. ReplicationConn , name , publication string , walRetain uint64 , failOnHandler bool ) * Subscription {
2333 return & Subscription {
2434 Name : name ,
2535 Publication : publication ,
26- WaitTimeout : time .Second * 10 ,
27- StatusTimeout : time .Second * 10 ,
28- CopyData : true ,
36+ WaitTimeout : 1 * time .Second ,
37+ StatusTimeout : 10 * time .Second ,
38+
39+ conn : conn ,
40+ walRetain : walRetain ,
41+ failOnHandler : failOnHandler ,
2942 }
3043}
3144
// pluginArgs renders the option list handed to the pgoutput plugin when
// replication is started: the protocol version and the publication name,
// each as a single-quoted value. The values are inserted verbatim; no
// quoting or escaping is applied.
func pluginArgs(version, publication string) string {
	const layout = `"proto_version" '%s', "publication_names" '%s'`

	args := fmt.Sprintf(layout, version, publication)
	return args
}
3548
36- func (s * Subscription ) Start (ctx context.Context , conn * pgx.ReplicationConn , h Handler ) error {
37- // TODO: Struct Validation here
38- _ = conn .DropReplicationSlot (s .Name )
39-
49+ // CreateSlot creates a replication slot if it doesn't exist
50+ func (s * Subscription ) CreateSlot () (err error ) {
4051 // If creating the replication slot fails with code 42710, this means
4152 // the replication slot already exists.
42- err := conn .CreateReplicationSlot (s .Name , "pgoutput" )
43- if err != nil {
53+ if err = s .conn .CreateReplicationSlot (s .Name , "pgoutput" ); err != nil {
4454 pgerr , ok := err .(pgx.PgError )
45- if ! ok {
46- return fmt .Errorf ("failed to create replication slot: %s" , err )
47- }
48- if pgerr .Code != "42710" {
49- return fmt .Errorf ("failed to create replication slot: %s" , err )
55+ if ! ok || pgerr .Code != "42710" {
56+ return
5057 }
58+
59+ err = nil
60+ }
61+
62+ return
63+ }
64+
65+ func (s * Subscription ) sendStatus (walWrite , walFlush uint64 ) error {
66+ if walFlush > walWrite {
67+ return fmt .Errorf ("walWrite should be >= walFlush" )
5168 }
5269
53- // rows, err := conn.IdentifySystem()
54- // if err != nil {
55- // return err
56- // }
70+ s .Lock ()
71+ defer s .Unlock ()
5772
58- // var slotName, consitentPoint, snapshotName, outputPlugin string
59- // if err := row.Scan(&slotName, &consitentPoint, &snapshotName, &outputPlugin); err != nil {
60- // return err
61- // }
73+ k , err := pgx . NewStandbyStatus ( walFlush , walFlush , walWrite )
74+ if err != nil {
75+ return fmt . Errorf ( "error creating status: %s" , err )
76+ }
6277
63- // log.Printf("slotName: %s\n", slotName)
64- // log.Printf("consitentPoint: %s\n", consitentPoint)
65- // log.Printf("snapshotName: %s\n", snapshotName)
66- // log.Printf("outputPlugin: %s\n", outputPlugin)
78+ if err = s .conn .SendStandbyStatus (k ); err != nil {
79+ return err
80+ }
6781
68- // Open a transaction on the server
69- // SET TRANSACTION SNAPSHOT id
70- // read all the data from the tables
82+ return nil
83+ }
7184
72- err = conn .StartReplication (s .Name , 0 , - 1 , pluginArgs ("1" , s .Publication ))
85+ // Flush sends the status message to server indicating that we've fully applied all of the events until maxWal.
86+ // This allows PostgreSQL to purge it's WAL logs
87+ func (s * Subscription ) Flush () error {
88+ wp := atomic .LoadUint64 (& s .maxWal )
89+ err := s .sendStatus (wp , wp )
90+ if err == nil {
91+ atomic .StoreUint64 (& s .walFlushed , wp )
92+ }
93+
94+ return err
95+ }
96+
97+ // Start replication and block until error or ctx is canceled
98+ func (s * Subscription ) Start (ctx context.Context , startLSN uint64 , h Handler ) (err error ) {
99+ err = s .conn .StartReplication (s .Name , startLSN , - 1 , pluginArgs ("1" , s .Publication ))
73100 if err != nil {
74101 return fmt .Errorf ("failed to start replication: %s" , err )
75102 }
76103
77- var maxWal uint64
104+ s . maxWal = startLSN
78105
79106 sendStatus := func () error {
80- k , err := pgx .NewStandbyStatus (maxWal )
81- if err != nil {
82- return fmt .Errorf ("error creating standby status: %s" , err )
107+ walPos := atomic .LoadUint64 (& s .maxWal )
108+ walLastFlushed := atomic .LoadUint64 (& s .walFlushed )
109+
110+ // Confirm only walRetain bytes in past
111+ // If walRetain is zero - will confirm current walPos as flushed
112+ walFlush := walPos - s .walRetain
113+
114+ if walLastFlushed > walFlush {
115+ // If there was a manual flush - report it's position until we're past it
116+ walFlush = walLastFlushed
117+ } else if walFlush < 0 {
118+ // If we have less than walRetain bytes - just report zero
119+ walFlush = 0
83120 }
84- if err := conn .SendStandbyStatus (k ); err != nil {
85- return fmt .Errorf ("failed to send standy status: %s" , err )
86- }
87- return nil
121+
122+ return s .sendStatus (walPos , walFlush )
88123 }
89124
90- tick := time .NewTicker (s .StatusTimeout ).C
125+ go func () {
126+ tick := time .NewTicker (s .StatusTimeout )
127+ defer tick .Stop ()
128+
129+ for {
130+ select {
131+ case <- tick .C :
132+ if err = sendStatus (); err != nil {
133+ return
134+ }
135+
136+ case <- ctx .Done ():
137+ return
138+ }
139+ }
140+ }()
141+
91142 for {
92143 select {
93- case <- tick :
94- log .Println ("pub status" )
95- if maxWal == 0 {
96- continue
97- }
98- if err := sendStatus (); err != nil {
99- return err
144+ case <- ctx .Done ():
145+ // Send final status and exit
146+ if err = sendStatus (); err != nil {
147+ return fmt .Errorf ("Unable to send final status: %s" , err )
100148 }
149+
150+ return
151+
101152 default :
102153 var message * pgx.ReplicationMessage
103154 wctx , cancel := context .WithTimeout (ctx , s .WaitTimeout )
104- message , err = conn .WaitForReplicationMessage (wctx )
155+ s .Lock ()
156+ message , err = s .conn .WaitForReplicationMessage (wctx )
157+ s .Unlock ()
105158 cancel ()
159+
106160 if err == context .DeadlineExceeded {
107161 continue
108- }
109- if err != nil {
162+ } else if err == context .Canceled {
163+ return
164+ } else if err != nil {
110165 return fmt .Errorf ("replication failed: %s" , err )
111166 }
167+
168+ if message == nil {
169+ return fmt .Errorf ("replication failed: nil message received, should not happen" )
170+ }
171+
112172 if message .WalMessage != nil {
113- if message .WalMessage .WalStart > maxWal {
114- maxWal = message .WalMessage .WalStart
173+ var logmsg Message
174+ walStart := message .WalMessage .WalStart
175+
176+ // Skip stuff that's in the past
177+ if walStart > 0 && walStart <= startLSN {
178+ continue
115179 }
116- logmsg , err := Parse (message .WalMessage .WalData )
180+
181+ if walStart > atomic .LoadUint64 (& s .maxWal ) {
182+ atomic .StoreUint64 (& s .maxWal , walStart )
183+ }
184+
185+ logmsg , err = Parse (message .WalMessage .WalData )
117186 if err != nil {
118187 return fmt .Errorf ("invalid pgoutput message: %s" , err )
119188 }
120- if err := h (logmsg ); err != nil {
121- return fmt .Errorf ("error handling waldata: %s" , err )
189+
190+ // Ignore the error from handler for now
191+ if err = h (logmsg , walStart ); err != nil && s .failOnHandler {
192+ return
122193 }
123- }
124- if message .ServerHeartbeat != nil {
194+ } else if message .ServerHeartbeat != nil {
125195 if message .ServerHeartbeat .ReplyRequested == 1 {
126- log .Println ("server wants a reply" )
127- if err := sendStatus (); err != nil {
128- return err
196+ if err = sendStatus (); err != nil {
197+ return
129198 }
130199 }
200+ } else {
201+ return fmt .Errorf ("No WalMessage/ServerHeartbeat defined in packet, should not happen" )
131202 }
132203 }
133204 }
0 commit comments