Skip to content

Commit 004b867

Browse files
authored
Merge pull request #63 from pkafma-aon/master
fix: Duplicated messages issue in dq.NewConsumer's Consume method
2 parents 7016060 + 7063d28 commit 004b867

File tree

3 files changed

+43
-14
lines changed

3 files changed

+43
-14
lines changed

dq/producer.go

+18-2
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@ const (
1919

2020
type (
2121
Producer interface {
22+
atWithWrapper(body []byte, at time.Time) (string, error)
2223
At(body []byte, at time.Time) (string, error)
2324
Close() error
25+
delayWithWrapper(body []byte, delay time.Duration) (string, error)
2426
Delay(body []byte, delay time.Duration) (string, error)
2527
Revoke(ids string) error
2628
}
@@ -54,8 +56,9 @@ func NewProducer(beanstalks []Beanstalk) Producer {
5456
}
5557

5658
func (p *producerCluster) At(body []byte, at time.Time) (string, error) {
59+
wrapped := wrap(body, at)
5760
return p.insert(func(node Producer) (string, error) {
58-
return node.At(body, at)
61+
return node.atWithWrapper(wrapped, at)
5962
})
6063
}
6164

@@ -70,8 +73,9 @@ func (p *producerCluster) Close() error {
7073
}
7174

7275
func (p *producerCluster) Delay(body []byte, delay time.Duration) (string, error) {
76+
wrapped := wrap(body, time.Now().Add(delay))
7377
return p.insert(func(node Producer) (string, error) {
74-
return node.Delay(body, delay)
78+
return node.delayWithWrapper(wrapped, delay)
7579
})
7680
}
7781

@@ -152,3 +156,15 @@ func (p *producerCluster) insert(fn func(node Producer) (string, error)) (string
152156

153157
return "", be.Err()
154158
}
159+
160+
func (p *producerCluster) atWithWrapper(body []byte, at time.Time) (string, error) {
161+
return p.insert(func(node Producer) (string, error) {
162+
return node.atWithWrapper(body, at)
163+
})
164+
}
165+
166+
func (p *producerCluster) delayWithWrapper(body []byte, delay time.Duration) (string, error) {
167+
return p.insert(func(node Producer) (string, error) {
168+
return node.delayWithWrapper(body, delay)
169+
})
170+
}

dq/producernode.go

+10-12
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package dq
22

33
import (
4-
"bytes"
54
"errors"
65
"fmt"
76
"strconv"
@@ -29,27 +28,34 @@ func NewProducerNode(endpoint, tube string) Producer {
2928
}
3029

3130
func (p *producerNode) At(body []byte, at time.Time) (string, error) {
31+
return p.atWithWrapper(wrap(body, at), at)
32+
}
33+
34+
func (p *producerNode) atWithWrapper(body []byte, at time.Time) (string, error) {
3235
now := time.Now()
3336
if at.Before(now) {
3437
return "", ErrTimeBeforeNow
3538
}
3639

3740
duration := at.Sub(now)
38-
return p.Delay(body, duration)
41+
return p.delayWithWrapper(body, duration)
3942
}
4043

4144
func (p *producerNode) Close() error {
4245
return p.conn.Close()
4346
}
4447

4548
func (p *producerNode) Delay(body []byte, delay time.Duration) (string, error) {
49+
return p.delayWithWrapper(wrap(body, time.Now().Add(delay)), delay)
50+
}
51+
52+
func (p *producerNode) delayWithWrapper(body []byte, delay time.Duration) (string, error) {
4653
conn, err := p.conn.get()
4754
if err != nil {
4855
return "", err
4956
}
5057

51-
wrapped := p.wrap(body, time.Now().Add(delay))
52-
id, err := conn.Put(wrapped, PriNormal, delay, defaultTimeToRun)
58+
id, err := conn.Put(body, PriNormal, delay, defaultTimeToRun)
5359
if err == nil {
5460
return fmt.Sprintf("%s/%s/%d", p.endpoint, p.tube, id), nil
5561
}
@@ -112,11 +118,3 @@ func (p *producerNode) Revoke(jointId string) error {
112118
// if not in this beanstalk, ignore
113119
return nil
114120
}
115-
116-
func (p *producerNode) wrap(body []byte, at time.Time) []byte {
117-
var builder bytes.Buffer
118-
builder.WriteString(strconv.FormatInt(at.UnixNano(), 10))
119-
builder.WriteByte(timeSep)
120-
builder.Write(body)
121-
return builder.Bytes()
122-
}

dq/wrapper.go

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package dq
2+
3+
import (
4+
"bytes"
5+
"strconv"
6+
"time"
7+
)
8+
9+
func wrap(body []byte, at time.Time) []byte {
10+
var builder bytes.Buffer
11+
builder.WriteString(strconv.FormatInt(at.UnixNano(), 10))
12+
builder.WriteByte(timeSep)
13+
builder.Write(body)
14+
return builder.Bytes()
15+
}

0 commit comments

Comments
 (0)