From 707a76763eb2098ad39129d346c0e91d4c032dc4 Mon Sep 17 00:00:00 2001 From: Darya Melentsova <ifireice@gmail.com> Date: Sun, 30 Apr 2023 16:40:36 +0300 Subject: [PATCH 1/6] graphite: refactor error handling Signed-off-by: ifireice <ifireice@gmail.com> Signed-off-by: Darya Melentsova <ifireice@gmail.com> --- prometheus/graphite/bridge.go | 62 ++++++++---------------------- prometheus/graphite/bridge_test.go | 46 +++++++++++++++++----- 2 files changed, 52 insertions(+), 56 deletions(-) diff --git a/prometheus/graphite/bridge.go b/prometheus/graphite/bridge.go index 84eac0de9..b49314a53 100644 --- a/prometheus/graphite/bridge.go +++ b/prometheus/graphite/bridge.go @@ -38,19 +38,11 @@ const ( millisecondsPerSecond = 1000 ) -// HandlerErrorHandling defines how a Handler serving metrics will handle -// errors. -type HandlerErrorHandling int +// ErrorHandler is a function that handles errors +type ErrorHandler func(err error) -// These constants cause handlers serving metrics to behave as described if -// errors are encountered. -const ( - // Ignore errors and try to push as many metrics to Graphite as possible. - ContinueOnError HandlerErrorHandling = iota - - // Abort the push to Graphite upon the first error encountered. - AbortOnError -) +// DefaultErrorHandler skips received errors +var DefaultErrorHandler = func(err error) {} // Config defines the Graphite bridge config. type Config struct { @@ -72,13 +64,8 @@ type Config struct { // The Gatherer to use for metrics. Defaults to prometheus.DefaultGatherer. Gatherer prometheus.Gatherer - // The logger that messages are written to. Defaults to no logging. - Logger Logger - - // ErrorHandling defines how errors are handled. Note that errors are - // logged regardless of the configured ErrorHandling provided Logger - // is not nil. - ErrorHandling HandlerErrorHandling + // ErrorHandler defines how errors are handled. + ErrorHandler ErrorHandler } // Bridge pushes metrics to the configured Graphite server. @@ -89,19 +76,11 @@ type Bridge struct { interval time.Duration timeout time.Duration - errorHandling HandlerErrorHandling - logger Logger + errorHandler ErrorHandler g prometheus.Gatherer } -// Logger is the minimal interface Bridge needs for logging. Note that -// log.Logger from the standard library implements this interface, and it is -// easy to implement by custom loggers, if they don't do so already anyway. -type Logger interface { - Println(v ...interface{}) -} - // NewBridge returns a pointer to a new Bridge struct. func NewBridge(c *Config) (*Bridge, error) { b := &Bridge{} @@ -119,10 +98,6 @@ func NewBridge(c *Config) (*Bridge, error) { b.g = c.Gatherer } - if c.Logger != nil { - b.logger = c.Logger - } - if c.Prefix != "" { b.prefix = c.Prefix } @@ -140,7 +115,7 @@ func NewBridge(c *Config) (*Bridge, error) { b.timeout = c.Timeout } - b.errorHandling = c.ErrorHandling + b.errorHandler = c.ErrorHandler return b, nil } @@ -153,9 +128,7 @@ func (b *Bridge) Run(ctx context.Context) { for { select { case <-ticker.C: - if err := b.Push(); err != nil && b.logger != nil { - b.logger.Println("error pushing to Graphite:", err) - } + b.errorHandler(b.Push()) case <-ctx.Done(): return } @@ -165,17 +138,12 @@ func (b *Bridge) Run(ctx context.Context) { // Push pushes Prometheus metrics to the configured Graphite server. func (b *Bridge) Push() error { mfs, err := b.g.Gather() - if err != nil || len(mfs) == 0 { - switch b.errorHandling { - case AbortOnError: - return err - case ContinueOnError: - if b.logger != nil { - b.logger.Println("continue on error:", err) - } - default: - panic("unrecognized error handling value") - } + if err != nil { + return err + } + + if len(mfs) == 0 { + return nil } conn, err := net.DialTimeout("tcp", b.url, b.timeout) diff --git a/prometheus/graphite/bridge_test.go b/prometheus/graphite/bridge_test.go index df0cfff3f..4dc3a204a 100644 --- a/prometheus/graphite/bridge_test.go +++ b/prometheus/graphite/bridge_test.go @@ -19,9 +19,7 @@ import ( "context" "fmt" "io" - "log" "net" - "os" "reflect" "regexp" "sort" @@ -438,13 +436,12 @@ type mockGraphite struct { func ExampleBridge() { b, err := NewBridge(&Config{ - URL: "graphite.example.org:3099", - Gatherer: prometheus.DefaultGatherer, - Prefix: "prefix", - Interval: 15 * time.Second, - Timeout: 10 * time.Second, - ErrorHandling: AbortOnError, - Logger: log.New(os.Stdout, "graphite bridge: ", log.Lshortfile), + URL: "graphite.example.org:3099", + Gatherer: prometheus.DefaultGatherer, + Prefix: "prefix", + Interval: 15 * time.Second, + Timeout: 10 * time.Second, + ErrorHandler: func(err error) {}, }) if err != nil { panic(err) @@ -467,3 +464,34 @@ func ExampleBridge() { // Start pushing metrics to Graphite in the Run() loop. b.Run(ctx) } + +func TestErrorHandler(t *testing.T) { + var internalError error + c := &Config{ + URL: "graphite.example.org:3099", + Gatherer: prometheus.DefaultGatherer, + Prefix: "prefix", + Interval: 5 * time.Second, + Timeout: 2 * time.Second, + ErrorHandler: func(err error) { internalError = err }, + } + b, err := NewBridge(c) + if err != nil { + panic(err) + } + + // Create a Context to control stopping the Run() loop that pushes + // metrics to Graphite. Multiplied by 2, because we need Run to be executed at least one time. + ctx, cancel := context.WithTimeout(context.Background(), c.Interval*2) + defer cancel() + + // Start pushing metrics to Graphite in the Run() loop. + b.Run(ctx) + + // There are obviously no hosts like "graphite.example.com" available during the tests. + expError := fmt.Errorf("dial tcp: lookup graphite.example.org: no such host") + + if internalError.Error() != expError.Error() { + t.Fatalf("Expected: '%s', actual: '%s'", expError, internalError) + } +} From c18237f5f6ba53b2ec8471ee61471713e69b77ef Mon Sep 17 00:00:00 2001 From: Darya Melentsova <ifireice@gmail.com> Date: Sun, 30 Apr 2023 17:10:45 +0300 Subject: [PATCH 2/6] remove line Signed-off-by: Darya Melentsova <ifireice@gmail.com> --- prometheus/graphite/bridge_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/prometheus/graphite/bridge_test.go b/prometheus/graphite/bridge_test.go index 4dc3a204a..63c09b1b3 100644 --- a/prometheus/graphite/bridge_test.go +++ b/prometheus/graphite/bridge_test.go @@ -490,7 +490,6 @@ func TestErrorHandler(t *testing.T) { // There are obviously no hosts like "graphite.example.com" available during the tests. expError := fmt.Errorf("dial tcp: lookup graphite.example.org: no such host") - if internalError.Error() != expError.Error() { t.Fatalf("Expected: '%s', actual: '%s'", expError, internalError) } From b64fe48b5fe98205c23ad6fe31f207b8c00a65fc Mon Sep 17 00:00:00 2001 From: Darya Melentsova <ifireice@gmail.com> Date: Mon, 1 May 2023 12:50:35 +0300 Subject: [PATCH 3/6] fix test Signed-off-by: Darya Melentsova <ifireice@gmail.com> --- prometheus/graphite/bridge_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/prometheus/graphite/bridge_test.go b/prometheus/graphite/bridge_test.go index 63c09b1b3..1ab930af3 100644 --- a/prometheus/graphite/bridge_test.go +++ b/prometheus/graphite/bridge_test.go @@ -468,7 +468,7 @@ func ExampleBridge() { func TestErrorHandler(t *testing.T) { var internalError error c := &Config{ - URL: "graphite.example.org:3099", + URL: "", Gatherer: prometheus.DefaultGatherer, Prefix: "prefix", Interval: 5 * time.Second, @@ -488,8 +488,8 @@ func TestErrorHandler(t *testing.T) { // Start pushing metrics to Graphite in the Run() loop. b.Run(ctx) - // There are obviously no hosts like "graphite.example.com" available during the tests. - expError := fmt.Errorf("dial tcp: lookup graphite.example.org: no such host") + // We haven't specified tcp address + expError := fmt.Errorf("dial tcp: missing address") if internalError.Error() != expError.Error() { t.Fatalf("Expected: '%s', actual: '%s'", expError, internalError) } From a7ff8521551bce1a7b2a2fc1a56c12ab77993eb6 Mon Sep 17 00:00:00 2001 From: Darya Melentsova <ifireice@gmail.com> Date: Mon, 1 May 2023 13:04:39 +0300 Subject: [PATCH 4/6] fix test Signed-off-by: Darya Melentsova <ifireice@gmail.com> --- prometheus/graphite/bridge_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/prometheus/graphite/bridge_test.go b/prometheus/graphite/bridge_test.go index 1ab930af3..62ee8e2c9 100644 --- a/prometheus/graphite/bridge_test.go +++ b/prometheus/graphite/bridge_test.go @@ -468,7 +468,7 @@ func ExampleBridge() { func TestErrorHandler(t *testing.T) { var internalError error c := &Config{ - URL: "", + URL: "localhost", Gatherer: prometheus.DefaultGatherer, Prefix: "prefix", Interval: 5 * time.Second, @@ -488,8 +488,8 @@ func TestErrorHandler(t *testing.T) { // Start pushing metrics to Graphite in the Run() loop. b.Run(ctx) - // We haven't specified tcp address - expError := fmt.Errorf("dial tcp: missing address") + // We haven't specified port + expError := fmt.Errorf("dial tcp: address localhost: missing port in address") if internalError.Error() != expError.Error() { t.Fatalf("Expected: '%s', actual: '%s'", expError, internalError) } From 8bc96219a08814bbeba0a3b8dcb9986d3dccd359 Mon Sep 17 00:00:00 2001 From: Darya Melentsova <ifireice@gmail.com> Date: Sun, 14 May 2023 18:07:07 +0300 Subject: [PATCH 5/6] added support backward compatibility Signed-off-by: Darya Melentsova <ifireice@gmail.com> --- prometheus/graphite/bridge.go | 77 +++++++++++++++++++++++++---- prometheus/graphite/bridge_test.go | 78 ++++++++++++++++++------------ 2 files changed, 115 insertions(+), 40 deletions(-) diff --git a/prometheus/graphite/bridge.go b/prometheus/graphite/bridge.go index b49314a53..5d7d35cde 100644 --- a/prometheus/graphite/bridge.go +++ b/prometheus/graphite/bridge.go @@ -38,11 +38,22 @@ const ( millisecondsPerSecond = 1000 ) -// ErrorHandler is a function that handles errors -type ErrorHandler func(err error) +// HandlerErrorHandling defines how a Handler serving metrics will handle +// errors. +type HandlerErrorHandling int -// DefaultErrorHandler skips received errors -var DefaultErrorHandler = func(err error) {} +// These constants cause handlers serving metrics to behave as described if +// errors are encountered. +const ( + // Ignore errors and try to push as many metrics to Graphite as possible. + ContinueOnError HandlerErrorHandling = iota + + // Abort the push to Graphite upon the first error encountered. + AbortOnError + + // Execute callback function on error. + CallbackOnError +) // Config defines the Graphite bridge config. type Config struct { @@ -64,8 +75,16 @@ type Config struct { // The Gatherer to use for metrics. Defaults to prometheus.DefaultGatherer. Gatherer prometheus.Gatherer - // ErrorHandler defines how errors are handled. - ErrorHandler ErrorHandler + // The logger that messages are written to. Defaults to no logging. + Logger Logger + + // ErrorHandling defines how errors are handled. Note that errors are + // logged regardless of the configured ErrorHandling provided Logger + // is not nil. + ErrorHandling HandlerErrorHandling + + // ErrorCallbackFunc is a callback function that can be executed when error is occurred + ErrorCallbackFunc CallbackFunc } // Bridge pushes metrics to the configured Graphite server. @@ -76,11 +95,23 @@ type Bridge struct { interval time.Duration timeout time.Duration - errorHandler ErrorHandler + errorHandling HandlerErrorHandling + errorCallbackFunc CallbackFunc + logger Logger g prometheus.Gatherer } +// Logger is the minimal interface Bridge needs for logging. Note that +// log.Logger from the standard library implements this interface, and it is +// easy to implement by custom loggers, if they don't do so already anyway. +type Logger interface { + Println(v ...interface{}) +} + +// CallbackFunc is a special type for callback functions +type CallbackFunc func(error) + // NewBridge returns a pointer to a new Bridge struct. func NewBridge(c *Config) (*Bridge, error) { b := &Bridge{} @@ -98,6 +129,10 @@ func NewBridge(c *Config) (*Bridge, error) { b.g = c.Gatherer } + if c.Logger != nil { + b.logger = c.Logger + } + if c.Prefix != "" { b.prefix = c.Prefix } @@ -115,7 +150,11 @@ func NewBridge(c *Config) (*Bridge, error) { b.timeout = c.Timeout } - b.errorHandler = c.ErrorHandler + b.errorHandling = c.ErrorHandling + + if c.ErrorCallbackFunc != nil { + b.errorCallbackFunc = c.ErrorCallbackFunc + } return b, nil } @@ -128,7 +167,9 @@ func (b *Bridge) Run(ctx context.Context) { for { select { case <-ticker.C: - b.errorHandler(b.Push()) + if err := b.Push(); err != nil && b.logger != nil { + b.logger.Println("error pushing to Graphite:", err) + } case <-ctx.Done(): return } @@ -137,11 +178,27 @@ func (b *Bridge) Run(ctx context.Context) { // Push pushes Prometheus metrics to the configured Graphite server. func (b *Bridge) Push() error { + err := b.push() + switch b.errorHandling { + case AbortOnError: + return err + case ContinueOnError: + if b.logger != nil { + b.logger.Println("continue on error:", err) + } + case CallbackOnError: + if b.errorCallbackFunc != nil { + b.errorCallbackFunc(err) + } + } + return nil +} + +func (b *Bridge) push() error { mfs, err := b.g.Gather() if err != nil { return err } - if len(mfs) == 0 { return nil } diff --git a/prometheus/graphite/bridge_test.go b/prometheus/graphite/bridge_test.go index 62ee8e2c9..9c83cd4ee 100644 --- a/prometheus/graphite/bridge_test.go +++ b/prometheus/graphite/bridge_test.go @@ -19,7 +19,9 @@ import ( "context" "fmt" "io" + "log" "net" + "os" "reflect" "regexp" "sort" @@ -436,12 +438,13 @@ type mockGraphite struct { func ExampleBridge() { b, err := NewBridge(&Config{ - URL: "graphite.example.org:3099", - Gatherer: prometheus.DefaultGatherer, - Prefix: "prefix", - Interval: 15 * time.Second, - Timeout: 10 * time.Second, - ErrorHandler: func(err error) {}, + URL: "graphite.example.org:3099", + Gatherer: prometheus.DefaultGatherer, + Prefix: "prefix", + Interval: 15 * time.Second, + Timeout: 10 * time.Second, + ErrorHandling: AbortOnError, + Logger: log.New(os.Stdout, "graphite bridge: ", log.Lshortfile), }) if err != nil { panic(err) @@ -465,32 +468,47 @@ func ExampleBridge() { b.Run(ctx) } -func TestErrorHandler(t *testing.T) { - var internalError error - c := &Config{ - URL: "localhost", - Gatherer: prometheus.DefaultGatherer, - Prefix: "prefix", - Interval: 5 * time.Second, - Timeout: 2 * time.Second, - ErrorHandler: func(err error) { internalError = err }, - } - b, err := NewBridge(c) - if err != nil { - panic(err) +func TestErrorHandling(t *testing.T) { + var testCases = []struct { + errorHandling HandlerErrorHandling + receivedError error + interceptedError error + }{ + { + errorHandling: ContinueOnError, + receivedError: nil, + interceptedError: nil, + }, + { + errorHandling: AbortOnError, + receivedError: &net.OpError{}, + interceptedError: nil, + }, + { + errorHandling: CallbackOnError, + receivedError: nil, + interceptedError: &net.OpError{}, + }, } - // Create a Context to control stopping the Run() loop that pushes - // metrics to Graphite. Multiplied by 2, because we need Run to be executed at least one time. - ctx, cancel := context.WithTimeout(context.Background(), c.Interval*2) - defer cancel() - - // Start pushing metrics to Graphite in the Run() loop. - b.Run(ctx) + for _, testCase := range testCases { + var interceptedError error + c := &Config{ + URL: "localhost", + ErrorHandling: testCase.errorHandling, + ErrorCallbackFunc: func(err error) { interceptedError = err }, + } + b, err := NewBridge(c) + if err != nil { + t.Fatal(err) + } - // We haven't specified port - expError := fmt.Errorf("dial tcp: address localhost: missing port in address") - if internalError.Error() != expError.Error() { - t.Fatalf("Expected: '%s', actual: '%s'", expError, internalError) + receivedError := b.Push() + if reflect.TypeOf(receivedError) != reflect.TypeOf(testCase.receivedError) { + t.Errorf("expected to receive: %T, received: %T", testCase.receivedError, receivedError) + } + if reflect.TypeOf(interceptedError) != reflect.TypeOf(testCase.interceptedError) { + t.Errorf("expected to intercept: %T, intercepted: %T", testCase.interceptedError, interceptedError) + } } } From b801f06e316ad12a9f3c29f6b2535ff44f8bcb9f Mon Sep 17 00:00:00 2001 From: Darya Melentsova <ifireice@gmail.com> Date: Wed, 21 Jun 2023 08:49:59 +0300 Subject: [PATCH 6/6] fix ErrorCallbackFunc Signed-off-by: Darya Melentsova <ifireice@gmail.com> --- examples/exemplars/main.go | 48 ++++++++++++++++++++++++++++++ prometheus/graphite/bridge.go | 18 +++++------ prometheus/graphite/bridge_test.go | 7 +---- 3 files changed, 56 insertions(+), 17 deletions(-) diff --git a/examples/exemplars/main.go b/examples/exemplars/main.go index 798c2dfd0..b5fd3de85 100644 --- a/examples/exemplars/main.go +++ b/examples/exemplars/main.go @@ -14,6 +14,54 @@ // A simple example of how to record a latency metric with exemplars, using a fictional id // as a prometheus label. +package main + +import ( + "context" + "fmt" + "github.com/prometheus/client_golang/prometheus/collectors" + "github.com/prometheus/client_golang/prometheus/promhttp" + "log" + "math/rand" + "net/http" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/graphite" + "go.uber.org/zap" +) + +func main() { + logger, _ := zap.NewProduction() + defer logger.Sync() + + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + c := &Config{ + URL: "graphite.example.org:3099", + Gatherer: prometheus.DefaultGatherer, + Prefix: "prefix", + Interval: 5 * time.Second, + Timeout: 2 * time.Second, + ErrorHandling: testCase.errorHandling, + ErrorCallbackFunc: func(err error) { if err != nil { logger.Error("run", zap.Error(err)); cancel() } }, + } + + + b, err := graphite.NewBridge(c) + if err != nil { + t.Fatal(err) + } + + b.Run(ctx) +} + + + + + package main import ( diff --git a/prometheus/graphite/bridge.go b/prometheus/graphite/bridge.go index 5d7d35cde..f0693c61d 100644 --- a/prometheus/graphite/bridge.go +++ b/prometheus/graphite/bridge.go @@ -50,9 +50,6 @@ const ( // Abort the push to Graphite upon the first error encountered. AbortOnError - - // Execute callback function on error. - CallbackOnError ) // Config defines the Graphite bridge config. @@ -84,7 +81,7 @@ type Config struct { ErrorHandling HandlerErrorHandling // ErrorCallbackFunc is a callback function that can be executed when error is occurred - ErrorCallbackFunc CallbackFunc + ErrorCallbackFunc ErrorCallbackFunc } // Bridge pushes metrics to the configured Graphite server. @@ -96,7 +93,7 @@ type Bridge struct { timeout time.Duration errorHandling HandlerErrorHandling - errorCallbackFunc CallbackFunc + errorCallbackFunc ErrorCallbackFunc logger Logger g prometheus.Gatherer @@ -109,8 +106,8 @@ type Logger interface { Println(v ...interface{}) } -// CallbackFunc is a special type for callback functions -type CallbackFunc func(error) +// ErrorCallbackFunc is a special type for callback functions +type ErrorCallbackFunc func(error) // NewBridge returns a pointer to a new Bridge struct. func NewBridge(c *Config) (*Bridge, error) { @@ -179,6 +176,9 @@ func (b *Bridge) Run(ctx context.Context) { // Push pushes Prometheus metrics to the configured Graphite server. func (b *Bridge) Push() error { err := b.push() + if b.errorCallbackFunc != nil { + b.errorCallbackFunc(err) + } switch b.errorHandling { case AbortOnError: return err @@ -186,10 +186,6 @@ func (b *Bridge) Push() error { if b.logger != nil { b.logger.Println("continue on error:", err) } - case CallbackOnError: - if b.errorCallbackFunc != nil { - b.errorCallbackFunc(err) - } } return nil } diff --git a/prometheus/graphite/bridge_test.go b/prometheus/graphite/bridge_test.go index 9c83cd4ee..01f0079eb 100644 --- a/prometheus/graphite/bridge_test.go +++ b/prometheus/graphite/bridge_test.go @@ -477,16 +477,11 @@ func TestErrorHandling(t *testing.T) { { errorHandling: ContinueOnError, receivedError: nil, - interceptedError: nil, + interceptedError: &net.OpError{}, }, { errorHandling: AbortOnError, receivedError: &net.OpError{}, - interceptedError: nil, - }, - { - errorHandling: CallbackOnError, - receivedError: nil, interceptedError: &net.OpError{}, }, }