Skip to content

Commit 9080ba4

Browse files
authored
Merge pull request #25 from changsongl/develop
Fix fetch interval bug and add pprof
2 parents ed0b1d2 + 6301488 commit 9080ba4

File tree

9 files changed

+153
-9
lines changed

9 files changed

+153
-9
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,5 @@ go.sum
1818

1919
# IDE and OS
2020
.idea
21+
2122
.DS_Store

cmd/delayqueue/main.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,10 @@ func run() int {
133133

134134
p := pool.New(s, l)
135135
q := queue.New(s, conf.DelayQueue.QueueName)
136-
t := timer.New(l, time.Duration(conf.DelayQueue.TimerFetchInterval)*time.Millisecond)
136+
t := timer.New(
137+
l, time.Duration(conf.DelayQueue.TimerFetchInterval)*time.Millisecond,
138+
time.Duration(conf.DelayQueue.TimerFetchDelay)*time.Millisecond,
139+
)
137140
return b, p, q, t
138141
},
139142
)

config/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ type DelayQueue struct {
5050
BucketMaxFetchNum uint64 `yaml:"bucket_max_fetch_num,omitempty" json:"bucket_max_fetch_num,omitempty"`
5151
QueueName string `yaml:"queue_name,omitempty" json:"queue_name,omitempty"`
5252
TimerFetchInterval int `yaml:"timer_fetch_interval,omitempty" json:"timer_fetch_interval,omitempty"`
53+
TimerFetchDelay int `yaml:"timer_fetch_delay,omitempty" json:"timer_fetch_delay,omitempty"`
5354
}
5455

5556
// Redis redis configuration

config/config.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ delay_queue:
77
queue_name: "dq_queue" # queue redis key name
88
bucket_max_fetch_num: 200 # max fetch number of jobs in the bucket
99
timer_fetch_interval: 1000 # fetching job interval(ms), decrease interval may get better throughout.
10+
timer_fetch_delay: 0 # fetch delay(ms), if there are still job in the bucket after the fetch,
11+
# it will delay timer_fetch_delay ms for next fetch. Default is not wait.
1012

1113
# redis config
1214
redis:

config/config.yaml.example

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ delay_queue:
77
queue_name: "dqqueue" # queue redis key name
88
bucket_max_fetch_num: 250 # max fetch number of jobs in the bucket
99
timer_fetch_interval: 2000 # fetching job interval(ms), decrease interval may get better throughout.
10+
timer_fetch_delay: 0 # fetch delay(ms), if there are still job in the bucket after the fetch,
11+
# it will delay timer_fetch_delay ms for next fetch. Default is not wait.
1012

1113
# redis config
1214
redis:

server/pprof.go

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package server
2+
3+
import (
4+
"net/http/pprof"
5+
"strings"
6+
7+
"github.com/gin-gonic/gin"
8+
)
9+
10+
// WrapPProf Wrap adds several routes from package `net/http/pprof` to *gin.Engine object
11+
func WrapPProf(router *gin.Engine) {
12+
WrapGroup(&router.RouterGroup)
13+
}
14+
15+
// WrapGroup adds several routes from package `net/http/pprof` to *gin.RouterGroup object
16+
func WrapGroup(router *gin.RouterGroup) {
17+
routers := []struct {
18+
Method string
19+
Path string
20+
Handler gin.HandlerFunc
21+
}{
22+
{"GET", "/debug/pprof/", IndexHandler()},
23+
{"GET", "/debug/pprof/heap", HeapHandler()},
24+
{"GET", "/debug/pprof/goroutine", GoroutineHandler()},
25+
{"GET", "/debug/pprof/allocs", AllocsHandler()},
26+
{"GET", "/debug/pprof/block", BlockHandler()},
27+
{"GET", "/debug/pprof/threadcreate", ThreadCreateHandler()},
28+
{"GET", "/debug/pprof/cmdline", CmdlineHandler()},
29+
{"GET", "/debug/pprof/profile", ProfileHandler()},
30+
{"GET", "/debug/pprof/symbol", SymbolHandler()},
31+
{"POST", "/debug/pprof/symbol", SymbolHandler()},
32+
{"GET", "/debug/pprof/trace", TraceHandler()},
33+
{"GET", "/debug/pprof/mutex", MutexHandler()},
34+
}
35+
36+
basePath := strings.TrimSuffix(router.BasePath(), "/")
37+
var prefix string
38+
39+
switch {
40+
case basePath == "":
41+
prefix = ""
42+
case strings.HasSuffix(basePath, "/debug"):
43+
prefix = "/debug"
44+
case strings.HasSuffix(basePath, "/debug/pprof"):
45+
prefix = "/debug/pprof"
46+
}
47+
48+
for _, r := range routers {
49+
router.Handle(r.Method, strings.TrimPrefix(r.Path, prefix), r.Handler)
50+
}
51+
}
52+
53+
// IndexHandler will pass the call from /debug/pprof to pprof
54+
func IndexHandler() gin.HandlerFunc {
55+
return func(ctx *gin.Context) {
56+
pprof.Index(ctx.Writer, ctx.Request)
57+
}
58+
}
59+
60+
// HeapHandler will pass the call from /debug/pprof/heap to pprof
61+
func HeapHandler() gin.HandlerFunc {
62+
return func(ctx *gin.Context) {
63+
pprof.Handler("heap").ServeHTTP(ctx.Writer, ctx.Request)
64+
}
65+
}
66+
67+
// GoroutineHandler will pass the call from /debug/pprof/goroutine to pprof
68+
func GoroutineHandler() gin.HandlerFunc {
69+
return func(ctx *gin.Context) {
70+
pprof.Handler("goroutine").ServeHTTP(ctx.Writer, ctx.Request)
71+
}
72+
}
73+
74+
// AllocsHandler will pass the call from /debug/pprof/allocs to pprof
75+
func AllocsHandler() gin.HandlerFunc {
76+
return func(ctx *gin.Context) {
77+
pprof.Handler("allocs").ServeHTTP(ctx.Writer, ctx.Request)
78+
}
79+
}
80+
81+
// BlockHandler will pass the call from /debug/pprof/block to pprof
82+
func BlockHandler() gin.HandlerFunc {
83+
return func(ctx *gin.Context) {
84+
pprof.Handler("block").ServeHTTP(ctx.Writer, ctx.Request)
85+
}
86+
}
87+
88+
// ThreadCreateHandler will pass the call from /debug/pprof/threadcreate to pprof
89+
func ThreadCreateHandler() gin.HandlerFunc {
90+
return func(ctx *gin.Context) {
91+
pprof.Handler("threadcreate").ServeHTTP(ctx.Writer, ctx.Request)
92+
}
93+
}
94+
95+
// CmdlineHandler will pass the call from /debug/pprof/cmdline to pprof
96+
func CmdlineHandler() gin.HandlerFunc {
97+
return func(ctx *gin.Context) {
98+
pprof.Cmdline(ctx.Writer, ctx.Request)
99+
}
100+
}
101+
102+
// ProfileHandler will pass the call from /debug/pprof/profile to pprof
103+
func ProfileHandler() gin.HandlerFunc {
104+
return func(ctx *gin.Context) {
105+
pprof.Profile(ctx.Writer, ctx.Request)
106+
}
107+
}
108+
109+
// SymbolHandler will pass the call from /debug/pprof/symbol to pprof
110+
func SymbolHandler() gin.HandlerFunc {
111+
return func(ctx *gin.Context) {
112+
pprof.Symbol(ctx.Writer, ctx.Request)
113+
}
114+
}
115+
116+
// TraceHandler will pass the call from /debug/pprof/trace to pprof
117+
func TraceHandler() gin.HandlerFunc {
118+
return func(ctx *gin.Context) {
119+
pprof.Trace(ctx.Writer, ctx.Request)
120+
}
121+
}
122+
123+
// MutexHandler will pass the call from /debug/pprof/mutex to pprof
124+
func MutexHandler() gin.HandlerFunc {
125+
return func(ctx *gin.Context) {
126+
pprof.Handler("mutex").ServeHTTP(ctx.Writer, ctx.Request)
127+
}
128+
}

server/server.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ func (s *server) Init() {
6666
s.r.Use(gin.LoggerWithConfig(gin.LoggerConfig{}))
6767
}
6868

69+
WrapPProf(s.r)
70+
6971
regMetricFunc := setServerMetricHandlerAndMiddleware()
7072
regMetricFunc(s.r)
7173
}

test/integration/integration_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818
// It will test add job, consume and remove.
1919
func TestDelayQueueAddAndRemove(t *testing.T) {
2020
// push n jobs with delay within 1 min
21-
DelayTimeSeconds := 60
21+
DelayTimeSeconds := 30
2222
Jobs := 200
2323
topic, key := "TestDelayQueueAddAndRemove-topic", "TestDelayQueueAddAndRemove-set"
2424
rand.Seed(time.Now().Unix())
@@ -64,7 +64,7 @@ func TestDelayQueueAddAndRemove(t *testing.T) {
6464

6565
// check after 1.5 min, all jobs should be done
6666
t.Log("Sleeping")
67-
time.Sleep(90 * time.Second)
67+
time.Sleep(50 * time.Second)
6868

6969
num, err := RecordNumbers(key)
7070
require.NoError(t, err)

timer/timer.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010

1111
// TaskFunc only task function can be added to
1212
// the timer.
13-
type TaskFunc func() (bool, error)
13+
type TaskFunc func() (hasMore bool, err error)
1414

1515
// Timer is for processing task. it checks buckets
1616
// for popping jobs. it will put ready jobs to queue.
@@ -26,7 +26,8 @@ type timer struct {
2626
tasks []taskStub // task stub
2727
once sync.Once // once
2828
l log.Logger // logger
29-
taskInterval time.Duration
29+
taskInterval time.Duration // fetch interval
30+
taskDelay time.Duration // fetch delay when bucket has more jobs after a fetching. Default no wait.
3031
}
3132

3233
// taskStub task stub for function itself and context,
@@ -38,11 +39,12 @@ type taskStub struct {
3839
l log.Logger
3940
}
4041

41-
func New(l log.Logger, taskInterval time.Duration) Timer {
42+
func New(l log.Logger, taskInterval, taskDelay time.Duration) Timer {
4243
return &timer{
4344
wg: sync.WaitGroup{},
4445
l: l.WithModule("timer"),
4546
taskInterval: taskInterval,
47+
taskDelay: taskDelay,
4648
}
4749
}
4850

@@ -64,7 +66,7 @@ func (t *timer) Run() {
6466
for _, task := range t.tasks {
6567
go func(task taskStub) {
6668
defer t.wg.Done()
67-
task.run(t.taskInterval)
69+
task.run(t.taskInterval, t.taskDelay)
6870
}(task)
6971
}
7072

@@ -84,7 +86,7 @@ func (t *timer) Close() {
8486

8587
// run a task, and wait for context is done.
8688
// this can be implement with more thinking.
87-
func (task taskStub) run(fetchInterval time.Duration) {
89+
func (task taskStub) run(fetchInterval, fetchDelay time.Duration) {
8890
for {
8991
select {
9092
case <-task.ctx.Done():
@@ -95,10 +97,13 @@ func (task taskStub) run(fetchInterval time.Duration) {
9597
task.l.Error("task run failed", log.String("err", err.Error()))
9698
time.Sleep(fetchInterval)
9799
continue
98-
} else if hasMore {
100+
} else if !hasMore {
99101
time.Sleep(fetchInterval)
100102
continue
101103
}
104+
105+
// have more jobs, wait delay time to fetch next time
106+
time.Sleep(fetchDelay)
102107
}
103108
}
104109
}

0 commit comments

Comments
 (0)