@@ -6,37 +6,27 @@ import (
6
6
7
7
ds "github.com/ipfs/go-datastore"
8
8
dsq "github.com/ipfs/go-datastore/query"
9
- "github.com/jackc/pgconn"
10
9
"github.com/jackc/pgx/v4"
11
10
"github.com/jackc/pgx/v4/pgxpool"
12
11
)
13
12
14
13
// Datastore is a PostgreSQL backed datastore.
15
14
type Datastore struct {
16
- table string
17
- connConf * pgx.ConnConfig
18
- pool * pgxpool.Pool
19
- }
20
-
21
- type rowQuerier interface {
22
- QueryRow (ctx context.Context , sql string , args ... interface {}) pgx.Row
23
- }
24
-
25
- type execQuerier interface {
26
- Exec (ctx context.Context , sql string , arguments ... interface {}) (pgconn.CommandTag , error )
15
+ table string
16
+ pool * pgxpool.Pool
27
17
}
28
18
29
19
// NewDatastore creates a new PostgreSQL datastore
30
- func NewDatastore (connString string , options ... Option ) (* Datastore , error ) {
20
+ func NewDatastore (ctx context. Context , connString string , options ... Option ) (* Datastore , error ) {
31
21
cfg := Options {}
32
22
cfg .Apply (append ([]Option {OptionDefaults }, options ... )... )
33
23
34
- connConf , err := pgx . ParseConfig ( connString )
24
+ pool , err := pgxpool . Connect ( ctx , connString )
35
25
if err != nil {
36
26
return nil , err
37
27
}
38
28
39
- return & Datastore {table : cfg .Table , connConf : connConf , pool : cfg . Pool }, nil
29
+ return & Datastore {table : cfg .Table , pool : pool }, nil
40
30
}
41
31
42
32
// Close closes the underying PostgreSQL database.
@@ -55,24 +45,10 @@ func (d *Datastore) Delete(key ds.Key) error {
55
45
// DeleteContext removes a row from the PostgreSQL database by the given key.
56
46
func (d * Datastore ) DeleteContext (ctx context.Context , key ds.Key ) error {
57
47
sql := fmt .Sprintf ("DELETE FROM %s WHERE key = $1" , d .table )
58
-
59
- var eq execQuerier
60
- if d .pool != nil {
61
- eq = d .pool
62
- } else {
63
- c , err := pgx .ConnectConfig (ctx , d .connConf )
64
- if err != nil {
65
- return err
66
- }
67
- defer c .Close (ctx )
68
- eq = c
69
- }
70
-
71
- _ , err := eq .Exec (ctx , sql , key .String ())
48
+ _ , err := d .pool .Exec (ctx , sql , key .String ())
72
49
if err != nil {
73
50
return err
74
51
}
75
-
76
52
return nil
77
53
}
78
54
@@ -84,20 +60,7 @@ func (d *Datastore) Get(key ds.Key) (value []byte, err error) {
84
60
// GetContext retrieves a value from the PostgreSQL database by the given key.
85
61
func (d * Datastore ) GetContext (ctx context.Context , key ds.Key ) (value []byte , err error ) {
86
62
sql := fmt .Sprintf ("SELECT data FROM %s WHERE key = $1" , d .table )
87
-
88
- var rq rowQuerier
89
- if d .pool != nil {
90
- rq = d .pool
91
- } else {
92
- c , err := pgx .ConnectConfig (ctx , d .connConf )
93
- if err != nil {
94
- return nil , err
95
- }
96
- defer c .Close (ctx )
97
- rq = c
98
- }
99
-
100
- row := rq .QueryRow (ctx , sql , key .String ())
63
+ row := d .pool .QueryRow (ctx , sql , key .String ())
101
64
var out []byte
102
65
switch err := row .Scan (& out ); err {
103
66
case pgx .ErrNoRows :
@@ -117,20 +80,7 @@ func (d *Datastore) Has(key ds.Key) (bool, error) {
117
80
// HasContext determines if a value for the given key exists in the PostgreSQL database.
118
81
func (d * Datastore ) HasContext (ctx context.Context , key ds.Key ) (bool , error ) {
119
82
sql := fmt .Sprintf ("SELECT exists(SELECT 1 FROM %s WHERE key = $1)" , d .table )
120
-
121
- var rq rowQuerier
122
- if d .pool != nil {
123
- rq = d .pool
124
- } else {
125
- c , err := pgx .ConnectConfig (ctx , d .connConf )
126
- if err != nil {
127
- return false , err
128
- }
129
- defer c .Close (ctx )
130
- rq = c
131
- }
132
-
133
- row := rq .QueryRow (ctx , sql , key .String ())
83
+ row := d .pool .QueryRow (ctx , sql , key .String ())
134
84
var exists bool
135
85
switch err := row .Scan (& exists ); err {
136
86
case pgx .ErrNoRows :
@@ -150,24 +100,10 @@ func (d *Datastore) Put(key ds.Key, value []byte) error {
150
100
// PutContext "upserts" a row into the PostgreSQL database.
151
101
func (d * Datastore ) PutContext (ctx context.Context , key ds.Key , value []byte ) error {
152
102
sql := fmt .Sprintf ("INSERT INTO %s (key, data) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET data = $2" , d .table )
153
-
154
- var eq execQuerier
155
- if d .pool != nil {
156
- eq = d .pool
157
- } else {
158
- c , err := pgx .ConnectConfig (ctx , d .connConf )
159
- if err != nil {
160
- return err
161
- }
162
- defer c .Close (ctx )
163
- eq = c
164
- }
165
-
166
- _ , err := eq .Exec (ctx , sql , key .String (), value )
103
+ _ , err := d .pool .Exec (ctx , sql , key .String (), value )
167
104
if err != nil {
168
105
return err
169
106
}
170
-
171
107
return nil
172
108
}
173
109
@@ -205,25 +141,9 @@ func (d *Datastore) QueryContext(ctx context.Context, q dsq.Query) (dsq.Results,
205
141
}
206
142
}
207
143
208
- var conn * pgx.Conn
209
- var rows pgx.Rows
210
- var err error
211
-
212
- if d .pool != nil {
213
- rows , err = d .pool .Query (ctx , sql )
214
- if err != nil {
215
- return nil , err
216
- }
217
- } else {
218
- conn , err = pgx .ConnectConfig (ctx , d .connConf )
219
- if err != nil {
220
- return nil , err
221
- }
222
-
223
- rows , err = conn .Query (ctx , sql )
224
- if err != nil {
225
- return nil , err
226
- }
144
+ rows , err := d .pool .Query (ctx , sql )
145
+ if err != nil {
146
+ return nil , err
227
147
}
228
148
229
149
it := dsq.Iterator {
@@ -265,14 +185,6 @@ func (d *Datastore) QueryContext(ctx context.Context, q dsq.Query) (dsq.Results,
265
185
},
266
186
Close : func () error {
267
187
rows .Close ()
268
-
269
- if d .pool == nil {
270
- err := conn .Close (ctx )
271
- if err != nil {
272
- return err
273
- }
274
- }
275
-
276
188
return nil
277
189
},
278
190
}
@@ -312,20 +224,7 @@ func (d *Datastore) GetSize(key ds.Key) (int, error) {
312
224
// Returns -1 if not found or other error occurs.
313
225
func (d * Datastore ) GetSizeContext (ctx context.Context , key ds.Key ) (int , error ) {
314
226
sql := fmt .Sprintf ("SELECT octet_length(data) FROM %s WHERE key = $1" , d .table )
315
-
316
- var rq rowQuerier
317
- if d .pool != nil {
318
- rq = d .pool
319
- } else {
320
- c , err := pgx .ConnectConfig (ctx , d .connConf )
321
- if err != nil {
322
- return - 1 , err
323
- }
324
- defer c .Close (ctx )
325
- rq = c
326
- }
327
-
328
- row := rq .QueryRow (ctx , sql , key .String ())
227
+ row := d .pool .QueryRow (ctx , sql , key .String ())
329
228
var size int
330
229
switch err := row .Scan (& size ); err {
331
230
case pgx .ErrNoRows :
0 commit comments