Skip to content

Commit b6b8cd3

Browse files
fix panic: unlock of unlocked mutex
1 parent 36771d0 commit b6b8cd3

File tree

3 files changed

+41
-39
lines changed

3 files changed

+41
-39
lines changed

lib/adaptivequemgr.go

+8-11
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ import (
2121
"errors"
2222
"fmt"
2323
"math/rand"
24-
"sync/atomic"
2524
"strings"
25+
"sync/atomic"
2626
"time"
2727

2828
"github.com/paypal/hera/cal"
@@ -138,11 +138,12 @@ func bindEvictNameOk(bindName string) (bool) {
138138
bind name and values */
139139
func (mgr *adaptiveQueueManager) doBindEviction() (int) {
140140
throttleCount := 0
141-
GetBindEvict().lock.Lock()
142-
for _,keyValues := range GetBindEvict().BindThrottle {
141+
bindEvict := GetBindEvict()
142+
bindEvict.lock.Lock()
143+
defer bindEvict.lock.Unlock()
144+
for _,keyValues := range bindEvict.BindThrottle {
143145
throttleCount += len(keyValues)
144146
}
145-
GetBindEvict().lock.Unlock()
146147
if throttleCount > GetConfig().BindEvictionMaxThrottle {
147148
if logger.GetLogger().V(logger.Info) {
148149
logger.GetLogger().Log(logger.Info, "already too many bind throttles, skipping bind eviction and throttle")
@@ -159,9 +160,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {
159160
}
160161
usqlhash := uint32(worker.sqlHash)
161162
sqlhash := atomic.LoadUint32(&(usqlhash))
162-
GetBindEvict().lock.Lock()
163-
_, ok := GetBindEvict().BindThrottle[sqlhash]
164-
GetBindEvict().lock.Unlock()
163+
_, ok := bindEvict.BindThrottle[sqlhash]
165164
if ok {
166165
continue // don't repeatedly bind evict something already evicted
167166
}
@@ -261,13 +260,11 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {
261260
}
262261

263262
// setup allow-every-x
264-
GetBindEvict().lock.Lock()
265-
sqlBind, ok := GetBindEvict().BindThrottle[sqlhash]
263+
sqlBind, ok := bindEvict.BindThrottle[sqlhash]
266264
if !ok {
267265
sqlBind = make(map[string]*BindThrottle)
268-
GetBindEvict().BindThrottle[sqlhash] = sqlBind
266+
bindEvict.BindThrottle[sqlhash] = sqlBind
269267
}
270-
GetBindEvict().lock.Unlock()
271268
concatKey := fmt.Sprintf("%s|%s", bindName, bindValue)
272269
throttle, ok := sqlBind[concatKey]
273270
if ok {

lib/bindevict.go

+28-24
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,9 @@ func GetBindEvict() *BindEvict {
5454
}
5555
return cfg.(*BindEvict)
5656
}
57-
func (this *BindEvict) Copy() *BindEvict {
57+
func (bindEvict *BindEvict) Copy() *BindEvict {
5858
out := BindEvict{BindThrottle:make(map[uint32]map[string]*BindThrottle)}
59-
for k,v := range this.BindThrottle {
59+
for k,v := range bindEvict.BindThrottle {
6060
out.BindThrottle[k] = v
6161
}
6262
return &out
@@ -85,41 +85,44 @@ func (entry *BindThrottle) decrAllowEveryX(y int) {
8585
return
8686
}
8787
entry.AllowEveryX = 0
88-
GetBindEvict().lock.Lock()
89-
defer GetBindEvict().lock.Unlock()
88+
}
89+
func (entry *BindThrottle) incrAllowEveryX() {
90+
if logger.GetLogger().V(logger.Warning) {
91+
info := fmt.Sprintf("hash:%d bindName:%s val:%s prev:%d",entry.Sqlhash, entry.Name, entry.Value, entry.AllowEveryX)
92+
logger.GetLogger().Log(logger.Warning, "bind throttle incr", info)
93+
}
94+
entry.AllowEveryX = 3*entry.AllowEveryX + 1
95+
if entry.AllowEveryX > 10000 {
96+
entry.AllowEveryX = 10000
97+
}
98+
}
99+
100+
func (bindEvict *BindEvict) updateThrottle(entry *BindThrottle) {
90101
// delete entry
91-
if len(GetBindEvict().BindThrottle[entry.Sqlhash]) == 1 {
92-
updateCopy := GetBindEvict().Copy()
102+
if len(bindEvict.BindThrottle[entry.Sqlhash]) == 1 {
103+
updateCopy := bindEvict.Copy()
93104
delete(updateCopy.BindThrottle, entry.Sqlhash)
94105
gBindEvict.Store(updateCopy)
95106
} else {
96107
// copy everything except bindKV (skipping it is deleting it)
97108
bindKV := fmt.Sprintf("%s|%s", entry.Name, entry.Value)
98-
updateCopy := make(map[string]*BindThrottle)
99-
for k,v := range GetBindEvict().BindThrottle[entry.Sqlhash] {
109+
updateCopy := bindEvict.Copy()
110+
updateBindThrottleCopy := make(map[string]*BindThrottle)
111+
for k,v := range bindEvict.BindThrottle[entry.Sqlhash] {
100112
if k == bindKV {
101113
continue
102114
}
103-
updateCopy[k] = v
115+
updateBindThrottleCopy[k] = v
104116
}
105-
GetBindEvict().BindThrottle[entry.Sqlhash] = updateCopy
106-
}
107-
}
108-
func (entry *BindThrottle) incrAllowEveryX() {
109-
if logger.GetLogger().V(logger.Warning) {
110-
info := fmt.Sprintf("hash:%d bindName:%s val:%s prev:%d",entry.Sqlhash, entry.Name, entry.Value, entry.AllowEveryX)
111-
logger.GetLogger().Log(logger.Warning, "bind throttle incr", info)
112-
}
113-
entry.AllowEveryX = 3*entry.AllowEveryX + 1
114-
if entry.AllowEveryX > 10000 {
115-
entry.AllowEveryX = 10000
117+
updateCopy.BindThrottle[entry.Sqlhash] = updateBindThrottleCopy
118+
gBindEvict.Store(updateCopy)
116119
}
117120
}
118121

119-
func (be *BindEvict) ShouldBlock(sqlhash uint32, bindKV map[string]string, heavyUsage bool) (bool, *BindThrottle) {
120-
GetBindEvict().lock.Lock()
121-
sqlBinds := GetBindEvict().BindThrottle[sqlhash]
122-
GetBindEvict().lock.Unlock()
122+
func (bindEvict *BindEvict) ShouldBlock(sqlhash uint32, bindKV map[string]string, heavyUsage bool) (bool, *BindThrottle) {
123+
bindEvict.lock.Lock()
124+
defer bindEvict.lock.Unlock()
125+
sqlBinds := bindEvict.BindThrottle[sqlhash]
123126
for k0, v := range bindKV /*parseBinds(request)*/ {
124127
k := NormalizeBindName(k0)
125128
concatKey := fmt.Sprintf("%s|%s", k, v)
@@ -143,6 +146,7 @@ func (be *BindEvict) ShouldBlock(sqlhash uint32, bindKV map[string]string, heavy
143146
gap := now.Sub(*recent).Seconds() * GetConfig().BindEvictionDecrPerSec
144147
entry.decrAllowEveryX(int(gap))
145148
if entry.AllowEveryX == 0 {
149+
bindEvict.updateThrottle(entry)
146150
return false, nil
147151
}
148152

lib/coordinator.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -652,9 +652,10 @@ func (crd *Coordinator) dispatchRequest(request *netstring.Netstring) error {
652652
xShardRead := false
653653

654654
// check bind throttle
655-
GetBindEvict().lock.Lock()
656-
_, ok := GetBindEvict().BindThrottle[uint32(crd.sqlhash)]
657-
GetBindEvict().lock.Unlock()
655+
bindEvict := GetBindEvict()
656+
bindEvict.lock.Lock()
657+
_, ok := bindEvict.BindThrottle[uint32(crd.sqlhash)]
658+
bindEvict.lock.Unlock()
658659
if ok {
659660
wType := wtypeRW
660661
cfg := GetNumWorkers(crd.shard.shardID)
@@ -686,7 +687,7 @@ func (crd *Coordinator) dispatchRequest(request *netstring.Netstring) error {
686687
logger.GetLogger().Log(logger.Debug, msg)
687688
}
688689
}
689-
needBlock, throttleEntry := GetBindEvict().ShouldBlock(uint32(crd.sqlhash), bindkv, heavyUsage)
690+
needBlock, throttleEntry := bindEvict.ShouldBlock(uint32(crd.sqlhash), bindkv, heavyUsage)
690691
if needBlock {
691692
msg := fmt.Sprintf("k=%s&v=%s&allowEveryX=%d&allowFrac=%.5f&raddr=%s",
692693
throttleEntry.Name,

0 commit comments

Comments
 (0)