forked from Surfline/badgerutils
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwriter.go
More file actions
161 lines (134 loc) · 3.43 KB
/
writer.go
File metadata and controls
161 lines (134 loc) · 3.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
// Package badgerutils provides functions for interacting with the underlying database.
package badgerutils
import (
"bufio"
"bytes"
"encoding/gob"
"errors"
"flag"
"fmt"
"github.com/dgraph-io/badger"
"io"
"log"
"os"
"strings"
"sync"
"sync/atomic"
"time"
)
// Keyed is implemented by any record that can report, through Key, the string
// key it should be stored under. In this file the record returned by a
// lineToKeyed function must satisfy Keyed; stringToKeyValue converts the result
// of Key to bytes and uses it as the database key.
type Keyed interface {
Key() string
}
// keyValue is a single key/value pair in the byte form passed to txn.Set.
// stringToKeyValue fills Key from the record's Key method and Value with the
// gob encoding of the record.
type keyValue struct {
Key []byte
Value []byte
}
// count32 is an int32 counter whose methods access the value through
// sync/atomic.
type count32 int32

// increment atomically adds a to the counter and returns the updated total.
func (c *count32) increment(a int32) int32 {
	raw := (*int32)(c)
	return atomic.AddInt32(raw, a)
}

// get atomically loads and returns the current total.
func (c *count32) get() int32 {
	raw := (*int32)(c)
	return atomic.LoadInt32(raw)
}
// stringToKeyValue turns one input line into a key/value pair. The line is
// handed to lineToKeyed; the record it returns is gob-encoded to form the value
// and the record's Key method supplies the key. An error from lineToKeyed or
// from the gob encoder is returned unchanged, with a nil pair.
func stringToKeyValue(str string, lineToKeyed func(string) (Keyed, error)) (*keyValue, error) {
	keyed, parseErr := lineToKeyed(str)
	if parseErr != nil {
		return nil, parseErr
	}

	var encoded bytes.Buffer
	encoder := gob.NewEncoder(&encoded)
	if encodeErr := encoder.Encode(keyed); encodeErr != nil {
		return nil, encodeErr
	}

	pair := new(keyValue)
	pair.Key = []byte(keyed.Key())
	pair.Value = encoded.Bytes()
	return pair, nil
}
// writeBatch stores every pair in kvs inside one transaction on db and commits
// it with a callback, so the function may return before the commit has
// finished.
//
// Outcome reporting:
//   - On success done is called once with len(kvs).
//   - On any failure the error is sent on cherr and then done is called once
//     with 0, because nothing from the batch was committed.
//   - done is always called exactly once, which lets a caller use it to release
//     a sync.WaitGroup slot.
//
// The send on cherr happens before done, so a caller that closes cherr after
// all done calls have fired never races with a sender. The send blocks until
// the caller receives from cherr (or the channel has buffer space), so the
// caller must drain cherr while batches are running.
func writeBatch(kvs []keyValue, db *badger.DB, cherr chan error, done func(int32)) {
	// Nothing to write: signal completion directly instead of committing an
	// empty transaction.
	if len(kvs) == 0 {
		done(0)
		return
	}

	txn := db.NewTransaction(true)
	defer txn.Discard()

	// fail reports err and then marks the batch as finished with no records
	// written.
	fail := func(err error) {
		cherr <- err
		done(0)
	}

	for _, kv := range kvs {
		if err := txn.Set(kv.Key, kv.Value); err != nil {
			// Abort the whole batch; the deferred Discard drops the writes made
			// so far, so a partially filled transaction is never committed.
			fail(err)
			return
		}
	}

	written := int32(len(kvs))
	err := txn.Commit(func(err error) {
		if err != nil {
			fail(err)
			return
		}
		done(written)
	})
	if err != nil {
		// NOTE(review): relies on badger v1 behavior where the callback is only
		// invoked when Commit itself returns nil; confirm against the vendored
		// badger version. Without this branch a synchronous Commit failure
		// would never call done.
		fail(err)
	}
}
// writeInput loads line-oriented records from reader into the Badger database
// stored in dir.
//
// Each line is converted with stringToKeyValue (using lineToKeyed) and
// appended to the current batch. Once a batch holds batchSize pairs it is
// handed to writeBatch on its own goroutine; pairs left over when the input
// ends are written by a final synchronous writeBatch call. If batchSize is
// zero or negative the size check never matches and everything is written in
// that final batch.
//
// The function always waits for every started batch before it returns, so the
// deferred db.Close never runs while batches are still in flight.
//
// Returned errors, in order of precedence:
//   - the error from badger.Open;
//   - the first line-conversion error (reading stops there and the pending
//     partial batch is not written);
//   - the scanner's read error;
//   - a combined error listing every transaction error received from
//     writeBatch.
func writeInput(reader io.Reader, dir string, batchSize int, lineToKeyed func(string) (Keyed, error)) error {
	log.Printf("Directory: %v", dir)
	log.Printf("Batch Size: %v", batchSize)

	// Open Badger database from directory
	opts := badger.DefaultOptions
	opts.Dir = dir
	opts.ValueDir = dir
	db, err := badger.Open(opts)
	if err != nil {
		return err
	}
	defer db.Close()

	start := time.Now()

	// wg counts batches whose done callback has not fired yet.
	var wg sync.WaitGroup
	var kvCount count32
	done := func(processedCount int32) {
		// Log the value returned by the atomic add; reading kvCount directly
		// would race with batches finishing concurrently.
		total := kvCount.increment(processedCount)
		log.Printf("Records: %v\n", total)
		wg.Done()
	}

	// Collect transaction errors while batches are running. cherr is
	// unbuffered and writeBatch sends on it before calling done, so without a
	// concurrent receiver a failing batch would block forever and wg.Wait
	// would never return.
	cherr := make(chan error)
	var errs []string
	collected := make(chan struct{})
	go func() {
		defer close(collected)
		for err := range cherr {
			errs = append(errs, fmt.Sprintf("%v", err))
		}
	}()
	// finish waits for all started batches, then stops the collector. After it
	// returns errs is complete and safe to read from this goroutine.
	finish := func() {
		wg.Wait()
		close(cherr)
		<-collected
	}

	// Read lines and write key/values in batches.
	var kvBatch []keyValue
	var parseErr error
	scanner := bufio.NewScanner(reader)
	for scanner.Scan() {
		kv, err := stringToKeyValue(scanner.Text(), lineToKeyed)
		if err != nil {
			parseErr = err
			break
		}
		kvBatch = append(kvBatch, *kv)
		if len(kvBatch) == batchSize {
			wg.Add(1)
			go writeBatch(kvBatch, db, cherr, done)
			// Start a fresh slice: the goroutine above still owns the old
			// backing array.
			kvBatch = nil
		}
	}

	// Write remaining key/values (skipped when a line failed to convert).
	if parseErr == nil && len(kvBatch) > 0 {
		wg.Add(1)
		writeBatch(kvBatch, db, cherr, done)
	}

	finish()

	if parseErr != nil {
		return parseErr
	}
	// Read and handle errors from scanning the input.
	if err = scanner.Err(); err != nil {
		return err
	}
	// Report transaction errors.
	if len(errs) > 0 {
		return fmt.Errorf("Errors inserting records:\n%v", strings.Join(errs, "\n"))
	}

	elapsed := time.Now().Sub(start)
	log.Printf("Inserted %v records in %v", kvCount.get(), elapsed)
	return nil
}
// WriteStdin registers and parses the -dir and -batch-size command-line flags
// and then loads records from standard input into the Badger database located
// at -dir. Every input line is passed to lineToKeyed, which returns the value
// to store; that value's Key method supplies the key it is stored under.
// An error is returned when -dir is empty or when the load itself fails.
func WriteStdin(lineToKeyed func(string) (Keyed, error)) error {
	var (
		dbDir  string
		perTxn int
	)
	flag.StringVar(&dbDir, "dir", "", "Directory to save DB files")
	flag.IntVar(&perTxn, "batch-size", 1000, "Number of records to write per transaction")
	flag.Parse()

	if len(dbDir) == 0 {
		return errors.New("dir flag is required")
	}
	return writeInput(os.Stdin, dbDir, perTxn, lineToKeyed)
}