-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathsegment.go
More file actions
122 lines (105 loc) · 2.95 KB
/
segment.go
File metadata and controls
122 lines (105 loc) · 2.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package segmenter
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"github.com/hextechpal/segmenter/internal/segmenter/locker"
"github.com/rs/zerolog"
"time"
)
// lockDuration is the TTL of the distributed partition lock; refreshLock
// renews it well before expiry (every second).
const lockDuration = 10 * time.Second

// segment owns a single partition of a stream on behalf of a Consumer. It
// holds a distributed lock for the partition and renews it in the background
// until stop() is called.
type segment struct {
	c         *Consumer       // owning consumer
	partition partition       // partition index this segment is responsible for
	lock      locker.Lock     // distributed lock guarding the partition
	shutDown  chan bool       // signals refreshLock to release the lock and exit
	logger    *zerolog.Logger // consumer logger enriched with the partition number
}
// newSegment acquires the distributed lock for the given partition and, on
// success, starts a background goroutine that keeps the lock refreshed until
// stop() is called. Returns an error if the lock cannot be acquired.
func newSegment(ctx context.Context, c *Consumer, partition partition) (*segment, error) {
	logger := c.logger.With().Int("partition", int(partition)).Logger()
	sg := &segment{partition: partition, c: c, shutDown: make(chan bool), logger: &logger}

	// Only one consumer in the group may own a partition at a time.
	lock, err := c.s.locker.Acquire(ctx, sg.partitionLockKey(), lockDuration, c.id)
	if err != nil {
		// Fix: the message claims to log the lock key, so log the actual key
		// (previously this printed the stream name), and attach the error via
		// the structured Err field instead of interpolating it with %v.
		logger.Error().Err(err).Msgf("Failed to Acquire lock with key %s", sg.partitionLockKey())
		return nil, err
	}
	sg.lock = lock
	go sg.refreshLock()
	return sg, nil
}
// refreshLock periodically renews the partition lock (every second, well
// inside the 10s lockDuration) until a shutdown signal arrives, at which
// point it releases the lock and exits.
//
// Fix: the original select had a default branch followed by time.Sleep, so a
// stop() call was only observed between sleeps (blocking the caller for up to
// a second) and the loop spun through default on every iteration. A ticker
// with a two-case select reacts to shutdown immediately and is stopped on
// exit so its resources are reclaimed promptly.
func (sg *segment) refreshLock() {
	ctx := context.Background()
	ticker := time.NewTicker(1000 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-sg.shutDown:
			if err := sg.lock.Release(ctx); err != nil {
				sg.logger.Error().Err(err).Msgf("Releasing lock with key stream %s, partition %d", sg.c.GetStreamName(), sg.partition)
			}
			return
		case <-ticker.C:
			if err := sg.lock.Refresh(ctx, lockDuration); err != nil {
				sg.logger.Error().Err(err).Msgf("Error happened while refreshing lock %s", sg.lock.Key())
			}
		}
	}
}
// pendingEntries looks up messages in this partition's stream that have been
// pending (delivered but unacknowledged) longer than the consumer's max
// processing time and sends their IDs — or the lookup error — to ch as a
// pendingResponse.
func (sg *segment) pendingEntries(ctx context.Context, ch chan *pendingResponse) {
	args := &redis.XPendingExtArgs{
		Stream: partitionedStream(sg.c.GetNameSpace(), sg.c.GetStreamName(), sg.partition),
		Group:  sg.c.group,
		Idle:   sg.c.maxProcessingTime,
		Start:  "-",
		End:    "+",
		Count:  sg.c.batchSize,
	}
	entries, err := sg.c.s.rdb.XPendingExt(ctx, args).Result()
	if err != nil {
		ch <- &pendingResponse{err: err, partition: sg.partition, messageIds: nil}
		return
	}
	//TODO handle retry count and move to dead letter queue
	ids := make([]string, 0, len(entries))
	for _, entry := range entries {
		ids = append(ids, entry.ID)
	}
	sg.logger.Debug().Msgf("Length of pending entries : %d", len(ids))
	ch <- &pendingResponse{err: nil, partition: sg.partition, messageIds: ids}
}
// claimEntries transfers ownership of the given message IDs in this
// partition's stream to this consumer via XCLAIM and sends the claimed
// messages — or the claim error — to ch as a claimResponse.
func (sg *segment) claimEntries(ctx context.Context, ch chan *claimResponse, ids []string) {
	args := &redis.XClaimArgs{
		Stream:   partitionedStream(sg.c.GetNameSpace(), sg.c.GetStreamName(), sg.partition),
		Group:    sg.c.group,
		Consumer: sg.c.id,
		Messages: ids,
	}
	claimed, err := sg.c.s.rdb.XClaim(ctx, args).Result()
	if err != nil {
		ch <- &claimResponse{err: err, partition: sg.partition, messages: nil}
		return
	}
	sg.logger.Info().Msgf("Claimed %d messages", len(claimed))
	ch <- &claimResponse{err: nil, partition: sg.partition, messages: mapXMessageToCMessage(claimed)}
}
// partitionLockKey builds the Redis key used to lock this partition for the
// consumer group: __<ns>:__<stream>_<group>:strm_<partition>.
func (sg *segment) partitionLockKey() string {
	s := sg.c.s
	return fmt.Sprintf("__%s:__%s_%s:strm_%d", s.ns, s.name, sg.c.group, sg.partition)
}
// stop signals refreshLock to release the partition lock and exit. The send
// blocks until the refreshLock goroutine receives it (unbuffered channel),
// so by the time the goroutine observes it, shutdown is underway.
func (sg *segment) stop() {
	sg.shutDown <- true
}