Skip to content

Commit 3a395fc

Browse files
authored
Prometheus metrics infrastructure and simple metrics in router (#249)
Signed-off-by: Genady Gurevich <[email protected]>
1 parent 50f437b commit 3a395fc

File tree

311 files changed

+50017
-744
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

311 files changed

+50017
-744
lines changed

common/monitoring/monitor.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package monitoring
8+
9+
import (
10+
"context"
11+
"fmt"
12+
"net"
13+
14+
"github.com/hyperledger/fabric-lib-go/common/flogging"
15+
"github.com/hyperledger/fabric-x-orderer/common/types"
16+
)
17+
18+
type Monitor struct {
19+
Provider *Provider
20+
logger types.Logger
21+
endpoint Endpoint
22+
// stop is used to stop the monitoring service
23+
stop context.CancelFunc
24+
listener net.Listener
25+
}
26+
27+
func NewMonitor(endpoint Endpoint, prefix string) *Monitor {
28+
logger := flogging.MustGetLogger(fmt.Sprintf("%s.monitoring", prefix))
29+
return &Monitor{Provider: NewProvider(logger), endpoint: endpoint, logger: logger}
30+
}
31+
32+
func (m *Monitor) Start() {
33+
ctx, cancel := context.WithCancel(context.Background())
34+
m.stop = cancel
35+
36+
var err error
37+
serverConfig := ServerConfig{endpoint: &m.endpoint, logger: m.logger}
38+
m.listener, err = serverConfig.Listener()
39+
if err != nil {
40+
m.logger.Panicf("%v", err)
41+
}
42+
m.endpoint.Port = serverConfig.endpoint.Port
43+
44+
go func() {
45+
m.Provider.StartPrometheusServer(ctx, m.listener)
46+
}()
47+
}
48+
49+
func (m *Monitor) Stop() {
50+
if m.stop != nil {
51+
m.stop()
52+
}
53+
if m.listener != nil {
54+
m.listener.Close()
55+
}
56+
}
57+
58+
func (m *Monitor) Address() string {
59+
return fmt.Sprintf("http://%s/metrics", m.endpoint.Address())
60+
}

common/monitoring/provider.go

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package monitoring
8+
9+
import (
10+
"context"
11+
"net"
12+
"net/http"
13+
"net/url"
14+
"time"
15+
16+
"github.com/hyperledger/fabric-lib-go/common/metrics"
17+
"github.com/hyperledger/fabric-x-orderer/common/types"
18+
"github.com/pkg/errors"
19+
"github.com/prometheus/client_golang/prometheus"
20+
"github.com/prometheus/client_golang/prometheus/promhttp"
21+
promgo "github.com/prometheus/client_model/go"
22+
"golang.org/x/sync/errgroup"
23+
)
24+
25+
const (
26+
scheme = "http://"
27+
metricsSubPath = "/metrics"
28+
)
29+
30+
// Provider is a prometheus metrics provider.
31+
type Provider struct {
32+
logger types.Logger
33+
registry *prometheus.Registry
34+
url string
35+
}
36+
37+
// NewProvider creates a new prometheus metrics provider.
38+
func NewProvider(logger types.Logger) *Provider {
39+
return &Provider{logger: logger, registry: prometheus.NewRegistry()}
40+
}
41+
42+
// StartPrometheusServer starts a prometheus server.
43+
// It also starts the given monitoring methods. Their context will cancel once the server is cancelled.
44+
// This method returns once the server is shutdown and all monitoring methods returns.
45+
func (p *Provider) StartPrometheusServer(
46+
ctx context.Context, listener net.Listener, monitor ...func(context.Context),
47+
) error {
48+
p.logger.Debugf("Creating prometheus server")
49+
mux := http.NewServeMux()
50+
mux.Handle(
51+
metricsSubPath,
52+
promhttp.HandlerFor(
53+
p.Registry(),
54+
promhttp.HandlerOpts{
55+
Registry: p.Registry(),
56+
},
57+
),
58+
)
59+
server := &http.Server{
60+
ReadTimeout: 30 * time.Second,
61+
Handler: mux,
62+
}
63+
64+
var err error
65+
p.url, err = MakeMetricsURL(listener.Addr().String())
66+
if err != nil {
67+
return errors.Wrap(err, "failed formatting URL")
68+
}
69+
70+
g, gCtx := errgroup.WithContext(ctx)
71+
g.Go(func() error {
72+
p.logger.Infof("Prometheus serving on URL: %s", p.url)
73+
defer p.logger.Infof("Prometheus stopped serving")
74+
return server.Serve(listener)
75+
})
76+
77+
// The following ensures the method does not return before all monitor methods return.
78+
for _, m := range monitor {
79+
g.Go(func() error {
80+
m(gCtx)
81+
return nil
82+
})
83+
}
84+
85+
// The following ensures the method does not return before the close procedure is complete.
86+
stopAfter := context.AfterFunc(ctx, func() {
87+
go func() error {
88+
if errClose := server.Close(); errClose != nil {
89+
return errors.Wrap(errClose, "failed to close prometheus server")
90+
}
91+
return nil
92+
}()
93+
})
94+
defer stopAfter()
95+
96+
if err = g.Wait(); !errors.Is(err, http.ErrServerClosed) {
97+
return errors.Wrap(err, "prometheus server stopped with an error")
98+
}
99+
return nil
100+
}
101+
102+
// URL returns the prometheus server URL.
103+
func (p *Provider) URL() string {
104+
return p.url
105+
}
106+
107+
// MakeMetricsURL construct the Prometheus metrics URL.
108+
func MakeMetricsURL(address string) (string, error) {
109+
return url.JoinPath(scheme, address, metricsSubPath)
110+
}
111+
112+
func (p *Provider) NewCounter(o metrics.CounterOpts) metrics.Counter {
113+
c := &Counter{
114+
cv: prometheus.NewCounterVec(
115+
prometheus.CounterOpts{
116+
Namespace: o.Namespace,
117+
Subsystem: o.Subsystem,
118+
Name: o.Name,
119+
Help: o.Help,
120+
},
121+
o.LabelNames,
122+
),
123+
}
124+
125+
p.registry.MustRegister(c.cv)
126+
return c
127+
}
128+
129+
func (p *Provider) NewGauge(o metrics.GaugeOpts) metrics.Gauge {
130+
g := &Gauge{
131+
gv: prometheus.NewGaugeVec(
132+
prometheus.GaugeOpts{
133+
Namespace: o.Namespace,
134+
Subsystem: o.Subsystem,
135+
Name: o.Name,
136+
Help: o.Help,
137+
},
138+
o.LabelNames,
139+
),
140+
}
141+
142+
p.registry.MustRegister(g.gv)
143+
return g
144+
}
145+
146+
func (p *Provider) NewHistogram(o metrics.HistogramOpts) metrics.Histogram {
147+
h := &Histogram{
148+
hv: prometheus.NewHistogramVec(
149+
prometheus.HistogramOpts{
150+
Namespace: o.Namespace,
151+
Subsystem: o.Subsystem,
152+
Name: o.Name,
153+
Help: o.Help,
154+
Buckets: o.Buckets,
155+
},
156+
o.LabelNames,
157+
),
158+
}
159+
160+
p.registry.MustRegister(h.hv)
161+
return h
162+
}
163+
164+
type Counter struct {
165+
prometheus.Counter
166+
cv *prometheus.CounterVec
167+
}
168+
169+
func (c *Counter) With(labelValues ...string) metrics.Counter {
170+
return &Counter{Counter: c.cv.WithLabelValues(labelValues...)}
171+
}
172+
173+
type Gauge struct {
174+
prometheus.Gauge
175+
gv *prometheus.GaugeVec
176+
}
177+
178+
func (g *Gauge) With(labelValues ...string) metrics.Gauge {
179+
return &Gauge{Gauge: g.gv.WithLabelValues(labelValues...)}
180+
}
181+
182+
type Histogram struct {
183+
prometheus.Histogram
184+
hv *prometheus.HistogramVec
185+
}
186+
187+
func (h *Histogram) With(labelValues ...string) metrics.Histogram {
188+
return &Histogram{Histogram: h.hv.WithLabelValues(labelValues...).(prometheus.Histogram)}
189+
}
190+
191+
// Registry returns the prometheus registry.
192+
func (p *Provider) Registry() *prometheus.Registry {
193+
return p.registry
194+
}
195+
196+
func GetMetricValue(m prometheus.Metric) float64 {
197+
gm := promgo.Metric{}
198+
m.Write(&gm)
199+
200+
switch {
201+
case gm.Gauge != nil:
202+
return gm.Gauge.GetValue()
203+
case gm.Counter != nil:
204+
return gm.Counter.GetValue()
205+
case gm.Untyped != nil:
206+
return gm.Untyped.GetValue()
207+
case gm.Summary != nil:
208+
return gm.Summary.GetSampleSum()
209+
case gm.Histogram != nil:
210+
return gm.Histogram.GetSampleSum()
211+
default:
212+
panic("unsupported metric")
213+
}
214+
}

common/monitoring/server_util.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package monitoring
8+
9+
import (
10+
"net"
11+
"os/exec"
12+
"strconv"
13+
"strings"
14+
15+
"github.com/hyperledger/fabric-x-orderer/common/types"
16+
"github.com/pkg/errors"
17+
)
18+
19+
const protocol = "tcp"
20+
21+
type Endpoint struct {
22+
Host string
23+
Port int
24+
}
25+
26+
// Address returns a string representation of the endpoint's address.
27+
func (e *Endpoint) Address() string {
28+
return net.JoinHostPort(e.Host, strconv.Itoa(e.Port))
29+
}
30+
31+
type ServerConfig struct {
32+
endpoint *Endpoint
33+
preAllocatedListener net.Listener
34+
logger types.Logger
35+
}
36+
37+
// Listener instantiate a [net.Listener] and updates the config port with the effective port.
38+
func (s *ServerConfig) Listener() (net.Listener, error) {
39+
if s.preAllocatedListener != nil {
40+
return s.preAllocatedListener, nil
41+
}
42+
listener, err := net.Listen(protocol, s.endpoint.Address())
43+
if err != nil {
44+
return nil, errors.Wrap(err, "failed to listen")
45+
}
46+
47+
addr := listener.Addr()
48+
tcpAddress, ok := addr.(*net.TCPAddr)
49+
if !ok {
50+
return nil, errors.New(strings.Join([]string{"failed to cast to TCP address", listener.Close().Error()}, "; "))
51+
}
52+
s.endpoint.Port = tcpAddress.Port
53+
54+
s.logger.Infof("Listening on: %s://%s", protocol, s.endpoint.Address())
55+
return listener, nil
56+
}
57+
58+
// PreAllocateListener is used to allocate a port and bind to ahead of the server initialization.
59+
// It stores the listener object internally to be reused on subsequent calls to Listener().
60+
func (c *ServerConfig) PreAllocateListener() (net.Listener, error) {
61+
listener, err := c.Listener()
62+
if err != nil {
63+
return nil, err
64+
}
65+
c.preAllocatedListener = listener
66+
return listener, nil
67+
}
68+
69+
func FQDN() (string, error) {
70+
out, err := exec.Command("hostname", "--fqdn").Output()
71+
if err != nil {
72+
return "", err
73+
}
74+
return string(out), nil
75+
}

config/config.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ func (config *Configuration) ExtractRouterConfig(configBlock *common.Block) *nod
260260
RequestMaxBytes: config.SharedConfig.BatchingConfig.RequestMaxBytes,
261261
ClientSignatureVerificationRequired: config.LocalConfig.NodeLocalConfig.GeneralConfig.ClientSignatureVerificationRequired,
262262
Bundle: config.extractBundleFromConfigBlock(configBlock),
263-
MonitoringListenAddress: net.JoinHostPort(config.LocalConfig.NodeLocalConfig.GeneralConfig.MonitoringListenAddress, strconv.Itoa(int(config.LocalConfig.NodeLocalConfig.GeneralConfig.MonitoringListenPort))),
263+
MonitoringListenAddress: net.JoinHostPort(config.LocalConfig.NodeLocalConfig.GeneralConfig.ListenAddress, strconv.Itoa(int(config.LocalConfig.NodeLocalConfig.GeneralConfig.MonitoringListenPort))),
264264
MetricsLogInterval: config.LocalConfig.NodeLocalConfig.GeneralConfig.MetricsLogInterval,
265265
}
266266
return routerConfig
@@ -292,7 +292,7 @@ func (config *Configuration) ExtractBatcherConfig(configBlock *common.Block) *no
292292
RequestMaxBytes: config.SharedConfig.BatchingConfig.RequestMaxBytes,
293293
SubmitTimeout: config.LocalConfig.NodeLocalConfig.BatcherParams.SubmitTimeout,
294294
BatchSequenceGap: types.BatchSequence(config.LocalConfig.NodeLocalConfig.BatcherParams.BatchSequenceGap),
295-
MonitoringListenAddress: net.JoinHostPort(config.LocalConfig.NodeLocalConfig.GeneralConfig.MonitoringListenAddress, strconv.Itoa(int(config.LocalConfig.NodeLocalConfig.GeneralConfig.MonitoringListenPort))),
295+
MonitoringListenAddress: net.JoinHostPort(config.LocalConfig.NodeLocalConfig.GeneralConfig.ListenAddress, strconv.Itoa(int(config.LocalConfig.NodeLocalConfig.GeneralConfig.MonitoringListenPort))),
296296
MetricsLogInterval: config.LocalConfig.NodeLocalConfig.GeneralConfig.MetricsLogInterval,
297297
ClientSignatureVerificationRequired: config.LocalConfig.NodeLocalConfig.GeneralConfig.ClientSignatureVerificationRequired,
298298
Bundle: config.extractBundleFromConfigBlock(configBlock),
@@ -341,7 +341,7 @@ func (config *Configuration) ExtractConsenterConfig() *nodeconfig.ConsenterNodeC
341341
SigningPrivateKey: signingPrivateKey,
342342
WALDir: DefaultConsenterNodeConfigParams(config.LocalConfig.NodeLocalConfig.FileStore.Path).WALDir,
343343
BFTConfig: BFTConfig,
344-
MonitoringListenAddress: net.JoinHostPort(config.LocalConfig.NodeLocalConfig.GeneralConfig.MonitoringListenAddress, strconv.Itoa(int(config.LocalConfig.NodeLocalConfig.GeneralConfig.MonitoringListenPort))),
344+
MonitoringListenAddress: net.JoinHostPort(config.LocalConfig.NodeLocalConfig.GeneralConfig.ListenAddress, strconv.Itoa(int(config.LocalConfig.NodeLocalConfig.GeneralConfig.MonitoringListenPort))),
345345
MetricsLogInterval: config.LocalConfig.NodeLocalConfig.GeneralConfig.MetricsLogInterval,
346346
}
347347
return consenterConfig
@@ -376,7 +376,7 @@ func (config *Configuration) ExtractAssemblerConfig() *nodeconfig.AssemblerNodeC
376376
Consenter: consenterFromMyParty,
377377
UseTLS: config.LocalConfig.TLSConfig.Enabled,
378378
ClientAuthRequired: config.LocalConfig.TLSConfig.ClientAuthRequired,
379-
MonitoringListenAddress: net.JoinHostPort(config.LocalConfig.NodeLocalConfig.GeneralConfig.MonitoringListenAddress, strconv.Itoa(int(config.LocalConfig.NodeLocalConfig.GeneralConfig.MonitoringListenPort))),
379+
MonitoringListenAddress: net.JoinHostPort(config.LocalConfig.NodeLocalConfig.GeneralConfig.ListenAddress, strconv.Itoa(int(config.LocalConfig.NodeLocalConfig.GeneralConfig.MonitoringListenPort))),
380380
MetricsLogInterval: config.LocalConfig.NodeLocalConfig.GeneralConfig.MetricsLogInterval,
381381
}
382382
return assemblerConfig

0 commit comments

Comments
 (0)