Skip to content

Commit 8e6d63a

Browse files
committed
Do not commit on each update when batching, and allow for setting the max batch size
1 parent ca335a9 commit 8e6d63a

File tree

3 files changed

+39
-10
lines changed

3 files changed

+39
-10
lines changed

README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,16 @@ func main() {
5454
}
5555
```
5656

57+
## Running the tests
58+
59+
For running the tests you will need a Postgres database listening on localhost. The easiest way of getting one is by using Docker:
60+
61+
``` sh
62+
docker run --rm -p 5432:5432 -e POSTGRES_PASSWORD=postgres postgres:11.14-alpine &
63+
```
64+
65+
`
66+
5767
## API
5868

5969
[GoDoc Reference](https://godoc.org/github.com/alanshaw/ipfs-ds-postgres)

batching.go

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,27 +9,45 @@ import (
99
)
1010

1111
type batch struct {
12-
ds *Datastore
13-
batch *pgx.Batch
12+
ds *Datastore
13+
batch *pgx.Batch
14+
maxBatchSize uint16
1415
}
1516

1617
// Batch creates a set of deferred updates to the database.
1718
func (d *Datastore) Batch() (ds.Batch, error) {
18-
return &batch{ds: d, batch: &pgx.Batch{}}, nil
19+
b := &batch{ds: d, batch: &pgx.Batch{}, maxBatchSize: 0}
20+
b.batch.Queue("BEGIN")
21+
return b, nil
22+
}
23+
24+
// Set the max batch size (0 or default means unlimited - the batch is only cleared when calling Commit)
25+
func (b *batch) SetMaxBatchSize(size uint16) {
26+
b.maxBatchSize = size
27+
}
28+
29+
func (b *batch) checkMaxBatchSize() error {
30+
var err error
31+
32+
if b.maxBatchSize != 0 && b.batch.Len() >= int(b.maxBatchSize) {
33+
err = b.CommitContext(context.Background())
34+
}
35+
36+
if err != nil {
37+
b.batch = &pgx.Batch{}
38+
}
39+
40+
return err
1941
}
2042

2143
func (b *batch) Put(key ds.Key, value []byte) error {
22-
b.batch.Queue("BEGIN")
2344
sql := fmt.Sprintf("INSERT INTO %s (key, data) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET data = $2", b.ds.table)
2445
b.batch.Queue(sql, key.String(), value)
25-
b.batch.Queue("COMMIT")
26-
return nil
46+
return b.checkMaxBatchSize()
2747
}
2848

2949
func (b *batch) Delete(key ds.Key) error {
30-
b.batch.Queue("BEGIN")
3150
b.batch.Queue(fmt.Sprintf("DELETE FROM %s WHERE key = $1", b.ds.table), key.String())
32-
b.batch.Queue("COMMIT")
3351
return nil
3452
}
3553

@@ -38,6 +56,7 @@ func (b *batch) Commit() error {
3856
}
3957

4058
func (b *batch) CommitContext(ctx context.Context) error {
59+
b.batch.Queue("COMMIT")
4160
res := b.ds.pool.SendBatch(ctx, b.batch)
4261
defer res.Close()
4362

datastore_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func initPG(t *testing.T) {
2727
connConf, err := pgx.ParseConfig(fmt.Sprintf(
2828
"postgres://%s:%s@%s/%s?sslmode=disable",
2929
envString(t, "PG_USER", "postgres"),
30-
envString(t, "PG_PASS", ""),
30+
envString(t, "PG_PASS", "postgres"),
3131
envString(t, "PG_HOST", "127.0.0.1"),
3232
envString(t, "PG_DB", envString(t, "PG_USER", "postgres")),
3333
))
@@ -62,7 +62,7 @@ func newDS(t *testing.T) (*Datastore, func()) {
6262
connString := fmt.Sprintf(
6363
"postgres://%s:%s@%s/%s?sslmode=disable",
6464
envString(t, "PG_USER", "postgres"),
65-
envString(t, "PG_PASS", ""),
65+
envString(t, "PG_PASS", "postgres"),
6666
envString(t, "PG_HOST", "127.0.0.1"),
6767
"test_datastore",
6868
)

0 commit comments

Comments
 (0)