Skip to content

Commit d429399

Browse files
authored
Merge pull request #83 from ahmczsy/master
add insert limiter
2 parents 7ac386d + 6e321e9 commit d429399

File tree

6 files changed

+68
-17
lines changed

6 files changed

+68
-17
lines changed

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ require (
3636
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
3737
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 // indirect
3838
golang.org/x/text v0.3.4 // indirect
39+
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
3940
golang.org/x/tools v0.0.0-20201125231158-b5590deeca9b // indirect
4041
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect
4142
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22

pkg/ckgroup/exec.go

+9-12
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package ckgroup
22

33
import (
44
"errors"
5+
"math/rand"
56
"sort"
67
"strings"
78
"sync"
@@ -154,25 +155,21 @@ func (g *dbGroup) ExecAll(query string, args [][]interface{}) error {
154155
}
155156

156157
func (g *dbGroup) exec(idx int, query string, rows []rowValue) error {
158+
shardConns := g.GetAllShard()[idx].GetAllConn()
159+
execOrder := rand.Perm(len(shardConns))
157160
var err error
158161
for attempt := 1; attempt <= g.opt.RetryNum; attempt++ {
159-
err = saveData(g.ShardNodes[idx].GetShardConn().GetRawConn(), query, rows)
160-
if err != nil {
161-
logx.Infof("[attempt %d/%d] Node[%d] primary node execute error:%v, will switch to replica node", attempt, g.opt.RetryNum, idx, err)
162-
} else {
163-
return nil
164-
}
165-
for i, replicaNode := range g.ShardNodes[idx].GetReplicaConn() {
166-
err = saveData(replicaNode.GetRawConn(), query, rows)
167-
if err != nil {
168-
logx.Infof("[attempt %d/%d] Node[%d] replica[%d] execute error:%v, will switch to next replica node", attempt, g.opt.RetryNum, idx, i, err)
169-
} else {
162+
for _, order := range execOrder {
163+
err = saveData(shardConns[order].GetRawConn(), query, rows)
164+
if err == nil {
170165
return nil
171166
}
167+
logx.Errorf("[attempt %d/%d] shard[%d] node[%d] insert error:%s, will switch to next node", attempt, g.opt.RetryNum, idx+1, order+1, err.Error())
168+
continue
172169
}
173170
}
174171
if err != nil {
175-
logx.Errorf("All node exec failed. Retry num:%d. Last fail reason: %v, query: %s", g.opt.RetryNum, err, query)
172+
logx.Errorf("shard[%d] all node exec failed. Retry num:%d. Last fail reason: %v, query: %s", idx+1, g.opt.RetryNum, err, query)
176173
}
177174
return err
178175
}

pkg/ckgroup/insert.go

+12
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package ckgroup
22

33
import (
4+
"context"
45
"errors"
56
"fmt"
67
"reflect"
@@ -20,6 +21,11 @@ func (g *dbGroup) InsertAuto(query string, hashTag string, sliceData interface{}
2021
return err
2122
}
2223

24+
err = g.opt.GroupInsertLimiter.Wait(context.Background())
25+
if err != nil {
26+
logx.Error(err)
27+
}
28+
2329
var eg errgroup.Group
2430
for i, shardConn := range g.ShardNodes {
2531

@@ -54,6 +60,12 @@ func (g *dbGroup) InsertAutoDetail(query string, hashTag string, sliceData inter
5460
if err != nil {
5561
return nil, err
5662
}
63+
64+
err = g.opt.GroupInsertLimiter.Wait(context.Background())
65+
if err != nil {
66+
logx.Error(err)
67+
}
68+
5769
waitGroup := sync.WaitGroup{}
5870
waitGroup.Add(len(g.ShardNodes))
5971
ch := make(chan InsertErrDetail, len(g.GetAllShard()))

pkg/ckgroup/insert_err_detail_test.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,15 @@ import (
66
"testing"
77

88
"github.com/tal-tech/cds/pkg/ckgroup/dbtesttool/dbtool"
9+
"golang.org/x/time/rate"
910
)
1011

1112
func Test_dbGroup_InsertAutoDetail(t *testing.T) {
1213
dataSet := dbtool.GenerateDataSet(10000)
1314

1415
c1 := dbGroup{
1516
ShardNodes: []ShardConn{&fakeShardConn{true}, &fakeShardConn{true}, &fakeShardConn{true}},
16-
opt: option{RetryNum: 3},
17+
opt: option{RetryNum: 3, GroupInsertLimiter: rate.NewLimiter(rate.Inf, 0)},
1718
}
1819
errDetail1, err := c1.InsertAutoDetail(``, "pk", dataSet)
1920
if err != nil {
@@ -29,7 +30,7 @@ func Test_dbGroup_InsertAutoDetail(t *testing.T) {
2930
}
3031
c2 := dbGroup{
3132
ShardNodes: []ShardConn{&fakeShardConn{false}, &fakeShardConn{false}, &fakeShardConn{false}},
32-
opt: option{RetryNum: 3},
33+
opt: option{RetryNum: 3, GroupInsertLimiter: rate.NewLimiter(rate.Inf, 0)},
3334
}
3435
errDetail2, err := c2.InsertAutoDetail(``, "pk", dataSet)
3536
if err != nil {
@@ -41,7 +42,7 @@ func Test_dbGroup_InsertAutoDetail(t *testing.T) {
4142

4243
c3 := dbGroup{
4344
ShardNodes: []ShardConn{&fakeShardConn{false}, &fakeShardConn{true}, &fakeShardConn{false}},
44-
opt: option{RetryNum: 3},
45+
opt: option{RetryNum: 3, GroupInsertLimiter: rate.NewLimiter(rate.Inf, 0)},
4546
}
4647
errDetail3, err := c3.InsertAutoDetail(``, "pk", dataSet)
4748
if err != nil {

pkg/ckgroup/option.go

+14-2
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
11
package ckgroup
22

3+
import (
4+
"golang.org/x/time/rate"
5+
)
6+
37
type option struct {
4-
RetryNum int
8+
RetryNum int
9+
GroupInsertLimiter *rate.Limiter
510
}
611
type OptionFunc func(*option)
712

813
func newOptions(opts ...OptionFunc) option {
914
opt := option{
10-
RetryNum: 1,
15+
RetryNum: 1,
16+
GroupInsertLimiter: rate.NewLimiter(rate.Inf, 0),
1117
}
1218

1319
for _, o := range opts {
@@ -22,3 +28,9 @@ func WithRetryNum(retryNum int) OptionFunc {
2228
o.RetryNum = retryNum
2329
}
2430
}
31+
32+
func WithGroupInsertLimiter(limit rate.Limit, burst int) OptionFunc {
33+
return func(o *option) {
34+
o.GroupInsertLimiter = rate.NewLimiter(limit, burst)
35+
}
36+
}

pkg/ckgroup/option_test.go

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package ckgroup
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/tal-tech/cds/pkg/ckgroup/dbtesttool/dbtool"
8+
"golang.org/x/time/rate"
9+
)
10+
11+
func TestWithGroupInsertLimiter(t *testing.T) {
12+
db := dbGroup{
13+
ShardNodes: []ShardConn{&fakeShardConn{true}},
14+
opt: option{GroupInsertLimiter: rate.NewLimiter(rate.Every(time.Millisecond*500), 1)},
15+
}
16+
dataSet := dbtool.GenerateDataSet(10000)
17+
start := time.Now()
18+
for i := 1; i <= 3; i++ {
19+
err := db.InsertAuto(``, `pk`, dataSet)
20+
if err != nil {
21+
t.Fatal(err)
22+
}
23+
}
24+
25+
if time.Since(start) <= time.Millisecond*900 {
26+
t.Fatal(`insert limiter did not meet expectations`)
27+
}
28+
}

0 commit comments

Comments
 (0)