Skip to content

Commit ce1549c

Browse files
authored
Merge branch 'master' into master
2 parents e86d965 + f2ee5c0 commit ce1549c

25 files changed

+1128
-1464
lines changed

dq/consumer.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212

1313
const (
1414
expiration = 3600 // seconds
15-
guardValue = "1"
1615
tolerance = time.Minute * 30
1716
)
1817

@@ -45,17 +44,19 @@ func NewConsumer(c DqConf) Consumer {
4544
func (c *consumerCluster) Consume(consume Consume) {
4645
guardedConsume := func(body []byte) {
4746
key := hash.Md5Hex(body)
48-
body, ok := c.unwrap(body)
47+
taskBody, ok := c.unwrap(body)
4948
if !ok {
5049
logx.Errorf("discarded: %q", string(body))
5150
return
5251
}
5352

54-
ok, err := c.red.SetnxEx(key, guardValue, expiration)
53+
redisLock := redis.NewRedisLock(c.red, key)
54+
redisLock.SetExpire(expiration)
55+
ok, err := redisLock.Acquire()
5556
if err != nil {
5657
logx.Error(err)
5758
} else if ok {
58-
consume(body)
59+
consume(taskBody)
5960
}
6061
}
6162

dq/consumernode.go

+19-7
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package dq
22

33
import (
4+
"errors"
45
"time"
56

67
"github.com/beanstalkd/go-beanstalk"
@@ -59,14 +60,25 @@ func (c *consumerNode) consumeEvents(consume Consume) {
5960
}
6061

6162
// the error can only be beanstalk.NameError or beanstalk.ConnError
62-
switch cerr := err.(type) {
63-
case beanstalk.ConnError:
64-
switch cerr.Err {
65-
case beanstalk.ErrTimeout:
63+
var cerr beanstalk.ConnError
64+
switch {
65+
case errors.As(err, &cerr):
66+
switch {
67+
case errors.Is(cerr.Err, beanstalk.ErrTimeout):
6668
// timeout error on timeout, just continue the loop
67-
case beanstalk.ErrBadChar, beanstalk.ErrBadFormat, beanstalk.ErrBuried, beanstalk.ErrDeadline,
68-
beanstalk.ErrDraining, beanstalk.ErrEmpty, beanstalk.ErrInternal, beanstalk.ErrJobTooBig,
69-
beanstalk.ErrNoCRLF, beanstalk.ErrNotFound, beanstalk.ErrNotIgnored, beanstalk.ErrTooLong:
69+
case
70+
errors.Is(cerr.Err, beanstalk.ErrBadChar),
71+
errors.Is(cerr.Err, beanstalk.ErrBadFormat),
72+
errors.Is(cerr.Err, beanstalk.ErrBuried),
73+
errors.Is(cerr.Err, beanstalk.ErrDeadline),
74+
errors.Is(cerr.Err, beanstalk.ErrDraining),
75+
errors.Is(cerr.Err, beanstalk.ErrEmpty),
76+
errors.Is(cerr.Err, beanstalk.ErrInternal),
77+
errors.Is(cerr.Err, beanstalk.ErrJobTooBig),
78+
errors.Is(cerr.Err, beanstalk.ErrNoCRLF),
79+
errors.Is(cerr.Err, beanstalk.ErrNotFound),
80+
errors.Is(cerr.Err, beanstalk.ErrNotIgnored),
81+
errors.Is(cerr.Err, beanstalk.ErrTooLong):
7082
// won't reset
7183
logx.Error(err)
7284
default:

dq/producer.go

+24-20
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
package dq
22

33
import (
4-
"bytes"
54
"log"
65
"math/rand"
7-
"strconv"
86
"strings"
97
"time"
108

@@ -25,15 +23,21 @@ type (
2523
Close() error
2624
Delay(body []byte, delay time.Duration) (string, error)
2725
Revoke(ids string) error
26+
27+
at(body []byte, at time.Time) (string, error)
28+
delay(body []byte, delay time.Duration) (string, error)
2829
}
2930

3031
producerCluster struct {
3132
nodes []Producer
3233
}
3334
)
3435

36+
var rng *rand.Rand
37+
3538
func init() {
36-
rand.Seed(time.Now().UnixNano())
39+
source := rand.NewSource(time.Now().UnixNano())
40+
rng = rand.New(source)
3741
}
3842

3943
func NewProducer(beanstalks []Beanstalk) Producer {
@@ -56,10 +60,8 @@ func NewProducer(beanstalks []Beanstalk) Producer {
5660
}
5761

5862
func (p *producerCluster) At(body []byte, at time.Time) (string, error) {
59-
wrapped := p.wrap(body, at)
60-
return p.insert(func(node Producer) (string, error) {
61-
return node.At(wrapped, at)
62-
})
63+
wrapped := wrap(body, at)
64+
return p.at(wrapped, at)
6365
}
6466

6567
func (p *producerCluster) Close() error {
@@ -73,10 +75,8 @@ func (p *producerCluster) Close() error {
7375
}
7476

7577
func (p *producerCluster) Delay(body []byte, delay time.Duration) (string, error) {
76-
wrapped := p.wrap(body, time.Now().Add(delay))
77-
return p.insert(func(node Producer) (string, error) {
78-
return node.Delay(wrapped, delay)
79-
})
78+
wrapped := wrap(body, time.Now().Add(delay))
79+
return p.delay(wrapped, delay)
8080
}
8181

8282
func (p *producerCluster) Revoke(ids string) error {
@@ -98,17 +98,29 @@ func (p *producerCluster) Revoke(ids string) error {
9898
return be.Err()
9999
}
100100

101+
func (p *producerCluster) at(body []byte, at time.Time) (string, error) {
102+
return p.insert(func(node Producer) (string, error) {
103+
return node.at(body, at)
104+
})
105+
}
106+
101107
func (p *producerCluster) cloneNodes() []Producer {
102108
return append([]Producer(nil), p.nodes...)
103109
}
104110

111+
func (p *producerCluster) delay(body []byte, delay time.Duration) (string, error) {
112+
return p.insert(func(node Producer) (string, error) {
113+
return node.delay(body, delay)
114+
})
115+
}
116+
105117
func (p *producerCluster) getWriteNodes() []Producer {
106118
if len(p.nodes) <= replicaNodes {
107119
return p.nodes
108120
}
109121

110122
nodes := p.cloneNodes()
111-
rand.Shuffle(len(nodes), func(i, j int) {
123+
rng.Shuffle(len(nodes), func(i, j int) {
112124
nodes[i], nodes[j] = nodes[j], nodes[i]
113125
})
114126
return nodes[:replicaNodes]
@@ -156,11 +168,3 @@ func (p *producerCluster) insert(fn func(node Producer) (string, error)) (string
156168

157169
return "", be.Err()
158170
}
159-
160-
func (p *producerCluster) wrap(body []byte, at time.Time) []byte {
161-
var builder bytes.Buffer
162-
builder.WriteString(strconv.FormatInt(at.UnixNano(), 10))
163-
builder.WriteByte(timeSep)
164-
builder.Write(body)
165-
return builder.Bytes()
166-
}

dq/producernode.go

+55-33
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"time"
99

1010
"github.com/beanstalkd/go-beanstalk"
11+
"github.com/zeromicro/go-zero/core/logx"
1112
)
1213

1314
var ErrTimeBeforeNow = errors.New("can't schedule task to past time")
@@ -27,46 +28,15 @@ func NewProducerNode(endpoint, tube string) Producer {
2728
}
2829

2930
func (p *producerNode) At(body []byte, at time.Time) (string, error) {
30-
now := time.Now()
31-
if at.Before(now) {
32-
return "", ErrTimeBeforeNow
33-
}
34-
35-
duration := at.Sub(now)
36-
return p.Delay(body, duration)
31+
return p.at(wrap(body, at), at)
3732
}
3833

3934
func (p *producerNode) Close() error {
4035
return p.conn.Close()
4136
}
4237

4338
func (p *producerNode) Delay(body []byte, delay time.Duration) (string, error) {
44-
conn, err := p.conn.get()
45-
if err != nil {
46-
return "", err
47-
}
48-
49-
id, err := conn.Put(body, PriNormal, delay, defaultTimeToRun)
50-
if err == nil {
51-
return fmt.Sprintf("%s/%s/%d", p.endpoint, p.tube, id), nil
52-
}
53-
54-
// the error can only be beanstalk.NameError or beanstalk.ConnError
55-
// just return when the error is beanstalk.NameError, don't reset
56-
switch cerr := err.(type) {
57-
case beanstalk.ConnError:
58-
switch cerr.Err {
59-
case beanstalk.ErrBadChar, beanstalk.ErrBadFormat, beanstalk.ErrBuried, beanstalk.ErrDeadline,
60-
beanstalk.ErrDraining, beanstalk.ErrEmpty, beanstalk.ErrInternal, beanstalk.ErrJobTooBig,
61-
beanstalk.ErrNoCRLF, beanstalk.ErrNotFound, beanstalk.ErrNotIgnored, beanstalk.ErrTooLong:
62-
// won't reset
63-
default:
64-
// beanstalk.ErrOOM, beanstalk.ErrTimeout, beanstalk.ErrUnknown and other errors
65-
p.conn.reset()
66-
}
67-
}
68-
69-
return "", err
39+
return p.delay(wrap(body, time.Now().Add(delay)), delay)
7040
}
7141

7242
func (p *producerNode) Revoke(jointId string) error {
@@ -96,3 +66,55 @@ func (p *producerNode) Revoke(jointId string) error {
9666
// if not in this beanstalk, ignore
9767
return nil
9868
}
69+
70+
func (p *producerNode) at(body []byte, at time.Time) (string, error) {
71+
now := time.Now()
72+
if at.Before(now) {
73+
return "", ErrTimeBeforeNow
74+
}
75+
76+
duration := at.Sub(now)
77+
return p.delay(body, duration)
78+
}
79+
80+
func (p *producerNode) delay(body []byte, delay time.Duration) (string, error) {
81+
conn, err := p.conn.get()
82+
if err != nil {
83+
return "", err
84+
}
85+
86+
id, err := conn.Put(body, PriNormal, delay, defaultTimeToRun)
87+
if err == nil {
88+
return fmt.Sprintf("%s/%s/%d", p.endpoint, p.tube, id), nil
89+
}
90+
91+
// the error can only be beanstalk.NameError or beanstalk.ConnError
92+
// just return when the error is beanstalk.NameError, don't reset
93+
var cerr beanstalk.ConnError
94+
switch {
95+
case errors.As(err, &cerr):
96+
switch {
97+
case
98+
errors.Is(cerr.Err, beanstalk.ErrBadChar),
99+
errors.Is(cerr.Err, beanstalk.ErrBadFormat),
100+
errors.Is(cerr.Err, beanstalk.ErrBuried),
101+
errors.Is(cerr.Err, beanstalk.ErrDeadline),
102+
errors.Is(cerr.Err, beanstalk.ErrDraining),
103+
errors.Is(cerr.Err, beanstalk.ErrEmpty),
104+
errors.Is(cerr.Err, beanstalk.ErrInternal),
105+
errors.Is(cerr.Err, beanstalk.ErrJobTooBig),
106+
errors.Is(cerr.Err, beanstalk.ErrNoCRLF),
107+
errors.Is(cerr.Err, beanstalk.ErrNotFound),
108+
errors.Is(cerr.Err, beanstalk.ErrNotIgnored),
109+
errors.Is(cerr.Err, beanstalk.ErrTooLong):
110+
// won't reset
111+
default:
112+
// beanstalk.ErrOOM, beanstalk.ErrTimeout, beanstalk.ErrUnknown and other errors
113+
p.conn.reset()
114+
}
115+
default:
116+
logx.Error(err)
117+
}
118+
119+
return "", err
120+
}

dq/wrapper.go

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package dq
2+
3+
import (
4+
"bytes"
5+
"strconv"
6+
"time"
7+
)
8+
9+
func wrap(body []byte, at time.Time) []byte {
10+
var builder bytes.Buffer
11+
builder.WriteString(strconv.FormatInt(at.UnixNano(), 10))
12+
builder.WriteByte(timeSep)
13+
builder.Write(body)
14+
return builder.Bytes()
15+
}
File renamed without changes.

example/dq/producer/node/producer.go

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"strconv"
6+
"time"
7+
8+
"github.com/zeromicro/go-queue/dq"
9+
)
10+
11+
func main() {
12+
producer := dq.NewProducerNode("localhost:11300", "tube")
13+
14+
for i := 1000; i < 1005; i++ {
15+
_, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second*5)
16+
if err != nil {
17+
fmt.Println(err)
18+
}
19+
}
20+
}

example/kq/consumer/queue.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"context"
45
"fmt"
56

67
"github.com/zeromicro/go-queue/kq"
@@ -11,7 +12,7 @@ func main() {
1112
var c kq.KqConf
1213
conf.MustLoad("config.yaml", &c)
1314

14-
q := kq.MustNewQueue(c, kq.WithHandle(func(k, v string) error {
15+
q := kq.MustNewQueue(c, kq.WithHandle(func(ctx context.Context, k, v string) error {
1516
fmt.Printf("=> %s\n", v)
1617
return nil
1718
}))

example/kq/producer/produce.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"context"
45
"encoding/json"
56
"fmt"
67
"log"
@@ -40,8 +41,11 @@ func main() {
4041
log.Fatal(err)
4142
}
4243

43-
fmt.Println(string(body))
44-
if err := pusher.Push(string(body)); err != nil {
44+
if err := pusher.Push(context.Background(), string(body)); err != nil {
45+
log.Fatal(err)
46+
}
47+
48+
if err := pusher.KPush(context.Background(), "test", string(body)); err != nil {
4549
log.Fatal(err)
4650
}
4751
}

0 commit comments

Comments
 (0)