@@ -28,12 +28,10 @@ import (
 	"errors"
 	"fmt"
 	"os"
-	"runtime"
 	"sync"
 	"syscall"
 	"time"
 
-	"github.com/shirou/gopsutil/cpu"
	"github.com/uber-go/tally"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
@@ -140,10 +138,11 @@ type (
 		logger               *zap.Logger
 		metricsScope         tally.Scope
 
-		pollerRequestCh    chan struct{}
-		pollerAutoScaler   *pollerAutoScaler
-		taskQueueCh        chan interface{}
-		sessionTokenBucket *sessionTokenBucket
+		pollerRequestCh      chan struct{}
+		pollerAutoScaler     *pollerAutoScaler
+		workerUsageCollector *workerUsageCollector
+		taskQueueCh          chan interface{}
+		sessionTokenBucket   *sessionTokenBucket
 	}
 
 	polledTask struct {
@@ -173,17 +172,29 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t
 			logger,
 		)
 	}
+	// For now the usage collector is enabled by default.
+	var workerUC *workerUsageCollector
+	workerUC = newWorkerUsageCollector(
+		workerUsageCollectorOptions{
+			Enabled:      true,
+			Cooldown:     30 * time.Second,
+			Host:         options.host,
+			MetricsScope: metricsScope,
+		},
+		logger,
+	)
 
 	bw := &baseWorker{
-		options:          options,
-		shutdownCh:       make(chan struct{}),
-		taskLimiter:      rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1),
-		retrier:          backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
-		logger:           logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
-		metricsScope:     tagScope(metricsScope, tagWorkerType, options.workerType),
-		pollerRequestCh:  make(chan struct{}, options.maxConcurrentTask),
-		pollerAutoScaler: pollerAS,
-		taskQueueCh:      make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched.
+		options:              options,
+		shutdownCh:           make(chan struct{}),
+		taskLimiter:          rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1),
+		retrier:              backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
+		logger:               logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
+		metricsScope:         tagScope(metricsScope, tagWorkerType, options.workerType),
+		pollerRequestCh:      make(chan struct{}, options.maxConcurrentTask),
+		pollerAutoScaler:     pollerAS,
+		workerUsageCollector: workerUC,
+		taskQueueCh:          make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched.
 
 		limiterContext:       ctx,
 		limiterContextCancel: cancel,
@@ -207,6 +218,10 @@ func (bw *baseWorker) Start() {
 		bw.pollerAutoScaler.Start()
 	}
 
+	if bw.workerUsageCollector != nil {
+		bw.workerUsageCollector.Start()
+	}
+
 	for i := 0; i < bw.options.pollerCount; i++ {
 		bw.shutdownWG.Add(1)
 		go bw.runPoller()
@@ -215,11 +230,6 @@ func (bw *baseWorker) Start() {
 	bw.shutdownWG.Add(1)
 	go bw.runTaskDispatcher()
 
-	// We want the emit function run once per host instead of run once per worker
-	// since the emit function is host level metric.
-	bw.shutdownWG.Add(1)
-	go bw.emitHardwareUsage()
-
 	bw.isWorkerStarted = true
 	traceLog(func() {
 		bw.logger.Info("Started Worker",
@@ -401,6 +411,9 @@ func (bw *baseWorker) Stop() {
 	if bw.pollerAutoScaler != nil {
 		bw.pollerAutoScaler.Stop()
 	}
+	if bw.workerUsageCollector != nil {
+		bw.workerUsageCollector.Stop()
+	}
 
 	if success := util.AwaitWaitGroup(&bw.shutdownWG, bw.options.shutdownTimeout); !success {
 		traceLog(func() {
@@ -414,53 +427,3 @@ func (bw *baseWorker) Stop() {
 	}
 	return
 }
-
-func (bw *baseWorker) emitHardwareUsage() {
-	defer func() {
-		if p := recover(); p != nil {
-			bw.metricsScope.Counter(metrics.WorkerPanicCounter).Inc(1)
-			topLine := fmt.Sprintf("base worker for %s [panic]:", bw.options.workerType)
-			st := getStackTraceRaw(topLine, 7, 0)
-			bw.logger.Error("Unhandled panic in hardware emitting.",
-				zap.String(tagPanicError, fmt.Sprintf("%v", p)),
-				zap.String(tagPanicStack, st))
-		}
-	}()
-	defer bw.shutdownWG.Done()
-	collectHardwareUsageOnce.Do(
-		func() {
-			ticker := time.NewTicker(hardwareMetricsCollectInterval)
-			for {
-				select {
-				case <-bw.shutdownCh:
-					ticker.Stop()
-					return
-				case <-ticker.C:
-					host := bw.options.host
-					scope := bw.metricsScope.Tagged(map[string]string{clientHostTag: host})
-
-					cpuPercent, err := cpu.Percent(0, false)
-					if err != nil {
-						bw.logger.Warn("Failed to get cpu percent", zap.Error(err))
-						return
-					}
-					cpuCores, err := cpu.Counts(false)
-					if err != nil {
-						bw.logger.Warn("Failed to get number of cpu cores", zap.Error(err))
-						return
-					}
-					scope.Gauge(metrics.NumCPUCores).Update(float64(cpuCores))
-					scope.Gauge(metrics.CPUPercentage).Update(cpuPercent[0])
-
-					var memStats runtime.MemStats
-					runtime.ReadMemStats(&memStats)
-
-					scope.Gauge(metrics.NumGoRoutines).Update(float64(runtime.NumGoroutine()))
-					scope.Gauge(metrics.TotalMemory).Update(float64(memStats.Sys))
-					scope.Gauge(metrics.MemoryUsedHeap).Update(float64(memStats.HeapInuse))
-					scope.Gauge(metrics.MemoryUsedStack).Update(float64(memStats.StackInuse))
-				}
-			}
-		})
-
-}
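
The hunks above call newWorkerUsageCollector, pass a workerUsageCollectorOptions value, and invoke the collector's Start and Stop methods, but the collector itself is defined in another file that is not part of this diff. Purely as a hedged sketch of what such a collector could look like (not the implementation from this change), the standalone code below wires the option fields used above (Enabled, Cooldown, Host, MetricsScope) to a background loop that emits runtime gauges; the metric names, tag key, and internal field names are illustrative placeholders.

```go
// Sketch only: not the implementation from this change. Identifiers other
// than the option fields and the constructor/Start/Stop signatures used in
// the diff above are placeholders.
package internal

import (
	"runtime"
	"sync"
	"time"

	"github.com/uber-go/tally"
	"go.uber.org/zap"
)

// workerUsageCollectorOptions mirrors the options populated in newBaseWorker above.
type workerUsageCollectorOptions struct {
	Enabled      bool
	Cooldown     time.Duration // interval between two collections
	Host         string
	MetricsScope tally.Scope
}

// workerUsageCollector periodically samples process usage and emits it as gauges.
type workerUsageCollector struct {
	options    workerUsageCollectorOptions
	logger     *zap.Logger
	shutdownCh chan struct{}
	wg         sync.WaitGroup
}

func newWorkerUsageCollector(options workerUsageCollectorOptions, logger *zap.Logger) *workerUsageCollector {
	if !options.Enabled {
		return nil // callers guard Start/Stop with a nil check, as in the diff above
	}
	return &workerUsageCollector{
		options:    options,
		logger:     logger,
		shutdownCh: make(chan struct{}),
	}
}

// Start launches the background collection loop.
func (w *workerUsageCollector) Start() {
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		ticker := time.NewTicker(w.options.Cooldown)
		defer ticker.Stop()
		// Tag emitted gauges with the host so the data stays host-level even
		// when several workers run in the same process.
		scope := w.options.MetricsScope.Tagged(map[string]string{"host": w.options.Host})
		for {
			select {
			case <-w.shutdownCh:
				return
			case <-ticker.C:
				var memStats runtime.MemStats
				runtime.ReadMemStats(&memStats)
				scope.Gauge("num_goroutines").Update(float64(runtime.NumGoroutine()))
				scope.Gauge("memory_used_heap").Update(float64(memStats.HeapInuse))
				scope.Gauge("memory_used_stack").Update(float64(memStats.StackInuse))
				scope.Gauge("total_memory").Update(float64(memStats.Sys))
			}
		}
	}()
}

// Stop signals the collection loop to exit and waits for it to finish.
func (w *workerUsageCollector) Stop() {
	close(w.shutdownCh)
	w.wg.Wait()
}
```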