Skip to content

Commit 24a059a

Browse files
committed
Add examples and a new help function
1 parent 48b7858 commit 24a059a

File tree

3 files changed

+185
-24
lines changed

3 files changed

+185
-24
lines changed

README.md

Lines changed: 41 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,52 +5,69 @@ package main
55

66
import (
77
"context"
8+
"fmt"
89
"log"
10+
"time"
911

10-
"github.com/kyleconroy/pgoutput"
1112
"github.com/jackc/pgx"
13+
"github.com/kyleconroy/pgoutput"
1214
)
1315

1416
func main() {
1517
ctx := context.Background()
16-
config := pgx.ConnConfig{Database: "db", User: "replicant"}
18+
config := pgx.ConnConfig{Database: "opsdash", User: "replicant"}
1719
conn, err := pgx.ReplicationConnect(config)
1820
if err != nil {
1921
log.Fatal(err)
2022
}
2123

22-
err = conn.StartReplication("sub1", 0, -1, `("proto_version" '1', "publication_names" 'pub1')`)
23-
if err != nil {
24-
log.Fatalf("Failed to start replication: %v", err)
25-
}
24+
// Create a slot if it doesn't already exist
25+
// if err := conn.CreateReplicationSlot("sub2", "pgoutput"); err != nil {
26+
// log.Fatalf("Failed to create replication slot: %v", err)
27+
// }
2628

2729
set := pgoutput.NewRelationSet()
28-
for {
29-
var message *pgx.ReplicationMessage
30-
message, err = conn.WaitForReplicationMessage(ctx)
30+
31+
dump := func(relation uint32, row []pgoutput.Tuple) error {
32+
values, err := set.Values(relation, row)
3133
if err != nil {
32-
log.Fatalf("Replication failed: %v %s", err)
33-
}
34-
if message.WalMessage == nil {
35-
continue
34+
return fmt.Errorf("error parsing values: %s", err)
3635
}
37-
m, err := pgoutput.Parse(message.WalMessage.WalData)
38-
if err != nil {
39-
log.Fatalf("error parsing waldata: %s", err)
36+
for name, value := range values {
37+
val := value.Get()
38+
log.Printf("%s (%T): %#v", name, val, val)
4039
}
40+
return nil
41+
}
42+
43+
handler := func(m pgoutput.Message) error {
4144
switch v := m.(type) {
4245
case pgoutput.Relation:
46+
log.Printf("RELATION")
4347
set.Add(v)
4448
case pgoutput.Insert:
45-
values, err := set.Values(v.RelationID, v.Row)
46-
if err != nil {
47-
log.Fatalf("error parsing values: %s", err)
48-
}
49-
for name, value := range values {
50-
val := value.Get()
51-
log.Printf("%s (%T): %#v", name, val, val)
52-
}
49+
log.Printf("INSERT")
50+
return dump(v.RelationID, v.Row)
51+
case pgoutput.Update:
52+
log.Printf("UPDATE")
53+
return dump(v.RelationID, v.Row)
54+
case pgoutput.Delete:
55+
log.Printf("DELETE")
56+
return dump(v.RelationID, v.Row)
5357
}
58+
return nil
59+
}
60+
61+
replication := pgoutput.LogicalReplication{
62+
Subscription: "sub2",
63+
Publication: "pub2",
64+
WaitTimeout: time.Second * 10,
65+
StatusTimeout: time.Second * 10,
66+
Handler: handler,
67+
}
68+
69+
if err := replication.Start(ctx, conn); err != nil {
70+
log.Fatal(err)
5471
}
5572
}
5673
```

examples/replicate.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"time"
8+
9+
"github.com/jackc/pgx"
10+
"github.com/kyleconroy/pgoutput"
11+
)
12+
13+
func main() {
14+
ctx := context.Background()
15+
config := pgx.ConnConfig{Database: "opsdash", User: "replicant"}
16+
conn, err := pgx.ReplicationConnect(config)
17+
if err != nil {
18+
log.Fatal(err)
19+
}
20+
21+
// if err := conn.CreateReplicationSlot("sub2", "pgoutput"); err != nil {
22+
// log.Fatalf("Failed to create replication slot: %v", err)
23+
// }
24+
25+
set := pgoutput.NewRelationSet()
26+
27+
dump := func(relation uint32, row []pgoutput.Tuple) error {
28+
values, err := set.Values(relation, row)
29+
if err != nil {
30+
return fmt.Errorf("error parsing values: %s", err)
31+
}
32+
for name, value := range values {
33+
val := value.Get()
34+
log.Printf("%s (%T): %#v", name, val, val)
35+
}
36+
return nil
37+
}
38+
39+
handler := func(m pgoutput.Message) error {
40+
switch v := m.(type) {
41+
case pgoutput.Relation:
42+
log.Printf("RELATION")
43+
set.Add(v)
44+
case pgoutput.Insert:
45+
log.Printf("INSERT")
46+
return dump(v.RelationID, v.Row)
47+
case pgoutput.Update:
48+
log.Printf("UPDATE")
49+
return dump(v.RelationID, v.Row)
50+
case pgoutput.Delete:
51+
log.Printf("DELETE")
52+
return dump(v.RelationID, v.Row)
53+
}
54+
return nil
55+
}
56+
57+
replication := pgoutput.LogicalReplication{
58+
Subscription: "sub2",
59+
Publication: "pub2",
60+
WaitTimeout: time.Second * 10,
61+
StatusTimeout: time.Second * 10,
62+
Handler: handler,
63+
}
64+
65+
// Create slot
66+
67+
if err := replication.Start(ctx, conn); err != nil {
68+
log.Fatal(err)
69+
}
70+
}

replicate.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package pgoutput
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"time"
8+
9+
"github.com/jackc/pgx"
10+
)
11+
12+
type LogicalReplication struct {
13+
Subscription string
14+
Publication string
15+
WaitTimeout time.Duration
16+
StatusTimeout time.Duration
17+
Handler func(Message) error
18+
}
19+
20+
func pluginArgs(version, publication string) string {
21+
return fmt.Sprintf(`("proto_version" '%s', "publication_names" '%s')`, version, publication)
22+
}
23+
24+
func (lr *LogicalReplication) Start(ctx context.Context, conn *pgx.ReplicationConn) error {
25+
// TODO: Struct Validation here
26+
err := conn.StartReplication(lr.Subscription, 0, -1, pluginArgs("1", lr.Publication))
27+
if err != nil {
28+
return fmt.Errorf("Failed to start replication: %v", err)
29+
}
30+
31+
var maxWal uint64
32+
tick := time.NewTicker(lr.StatusTimeout).C
33+
for {
34+
select {
35+
case <-tick:
36+
log.Println("pub status")
37+
if maxWal == 0 {
38+
continue
39+
}
40+
k, err := pgx.NewStandbyStatus(maxWal)
41+
if err != nil {
42+
return fmt.Errorf("Create new status failed: %s", err)
43+
}
44+
if err := conn.SendStandbyStatus(k); err != nil {
45+
return fmt.Errorf("Sending standy status failed: %s", err)
46+
}
47+
default:
48+
var message *pgx.ReplicationMessage
49+
wctx, cancel := context.WithTimeout(ctx, lr.WaitTimeout)
50+
message, err = conn.WaitForReplicationMessage(wctx)
51+
cancel()
52+
if err == context.DeadlineExceeded {
53+
continue
54+
}
55+
if err != nil {
56+
return fmt.Errorf("Replication failed: %s", err)
57+
}
58+
if message.WalMessage == nil {
59+
log.Println("nil wal message")
60+
continue
61+
}
62+
if message.WalMessage.WalStart > maxWal {
63+
maxWal = message.WalMessage.WalStart
64+
}
65+
logmsg, err := Parse(message.WalMessage.WalData)
66+
if err != nil {
67+
return fmt.Errorf("invalid pgoutput message: %s", err)
68+
}
69+
if err := lr.Handler(logmsg); err != nil {
70+
return fmt.Errorf("error handling waldata: %s", err)
71+
}
72+
}
73+
}
74+
}

0 commit comments

Comments
 (0)