Skip to content

Commit 1e78609

Browse files
authored
Target Allocator Support for Telegraf Based Prometheus Receiver (#1394)
1 parent b979421 commit 1e78609

File tree

7 files changed

+386
-34
lines changed

7 files changed

+386
-34
lines changed

plugins/inputs/prometheus/metrics_receiver_test.go

+59
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,13 @@
44
package prometheus
55

66
import (
7+
"os"
8+
"path/filepath"
79
"testing"
810

11+
kitlog "github.com/go-kit/log"
12+
"github.com/prometheus/common/promlog"
13+
"github.com/prometheus/prometheus/config"
914
"github.com/prometheus/prometheus/model/labels"
1015
"github.com/prometheus/prometheus/storage"
1116
"github.com/stretchr/testify/assert"
@@ -110,3 +115,57 @@ func Test_metricAppender_Commit(t *testing.T) {
110115
}
111116
assert.Equal(t, expected, *pmb[0])
112117
}
118+
119+
func Test_loadConfigFromFileWithTargetAllocator(t *testing.T) {
120+
os.Setenv("POD_NAME", "collector-1")
121+
defer os.Unsetenv("POD_NAME")
122+
configFile := filepath.Join("testdata", "target_allocator.yaml")
123+
logger := kitlog.NewLogfmtLogger(os.Stdout)
124+
logLevel := promlog.AllowedLevel{}
125+
logLevel.Set("DEBUG")
126+
var reloadHandler = func(cfg *config.Config) error {
127+
logger.Log("reloaded")
128+
return nil
129+
}
130+
taManager := createTargetAllocatorManager(configFile, logger, &logLevel, nil, nil)
131+
err := reloadConfig(configFile, logger, taManager, reloadHandler)
132+
assert.NoError(t, err)
133+
assert.True(t, taManager.enabled)
134+
assert.Equal(t, taManager.config.TargetAllocator.CollectorID, "collector-1")
135+
assert.Equal(t, taManager.config.TargetAllocator.TLSSetting.CAFile, DEFAULT_TLS_CA_FILE_PATH)
136+
137+
}
138+
139+
func Test_loadConfigFromFileWithoutTargetAllocator(t *testing.T) {
140+
os.Setenv("POD_NAME", "collector-1")
141+
defer os.Unsetenv("POD_NAME")
142+
configFile := filepath.Join("testdata", "base-k8.yaml")
143+
logLevel := promlog.AllowedLevel{}
144+
logLevel.Set("DEBUG")
145+
logger := kitlog.NewLogfmtLogger(os.Stdout)
146+
var reloadHandler = func(cfg *config.Config) error {
147+
logger.Log("reloaded")
148+
return nil
149+
}
150+
taManager := createTargetAllocatorManager(configFile, logger, &logLevel, nil, nil)
151+
err := reloadConfig(configFile, logger, taManager, reloadHandler)
152+
assert.NoError(t, err)
153+
assert.False(t, taManager.enabled)
154+
155+
}
156+
func Test_loadConfigFromFileEC2(t *testing.T) {
157+
configFile := filepath.Join("testdata", "base-k8.yaml")
158+
logger := kitlog.NewLogfmtLogger(os.Stdout)
159+
logLevel := promlog.AllowedLevel{}
160+
logLevel.Set("DEBUG")
161+
var reloadHandler = func(cfg *config.Config) error {
162+
logger.Log("reloaded")
163+
return nil
164+
}
165+
166+
taManager := createTargetAllocatorManager(configFile, logger, &logLevel, nil, nil)
167+
err := reloadConfig(configFile, logger, taManager, reloadHandler)
168+
assert.NoError(t, err)
169+
assert.False(t, taManager.enabled)
170+
171+
}

plugins/inputs/prometheus/start.go

+70-34
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ package prometheus
2121

2222
import (
2323
"context"
24+
"fmt"
2425
"os"
2526
"os/signal"
2627
"runtime"
@@ -101,8 +102,6 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan
101102
cfg.configFile = configFilePath
102103

103104
logger := promlog.New(&cfg.promlogConfig)
104-
//stdlog.SetOutput(log.NewStdlibAdapter(logger))
105-
//stdlog.Println("redirect std log")
106105

107106
klog.SetLogger(klogr.New().WithName("k8s_client_runtime").V(6))
108107

@@ -116,8 +115,19 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan
116115
ctxScrape, cancelScrape = context.WithCancel(context.Background())
117116
sdMetrics, _ = discovery.CreateAndRegisterSDMetrics(prometheus.DefaultRegisterer)
118117
discoveryManagerScrape = discovery.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"), prometheus.DefaultRegisterer, sdMetrics, discovery.Name("scrape"))
119-
scrapeManager, _ = scrape.NewManager(&scrape.Options{}, log.With(logger, "component", "scrape manager"), receiver, prometheus.DefaultRegisterer)
118+
119+
scrapeManager, _ = scrape.NewManager(&scrape.Options{}, log.With(logger, "component", "scrape manager"), receiver, prometheus.DefaultRegisterer)
120+
taManager = createTargetAllocatorManager(configFilePath, log.With(logger, "component", "ta manager"), logLevel, scrapeManager, discoveryManagerScrape)
121+
)
122+
123+
level.Info(logger).Log("msg", fmt.Sprintf("Target Allocator is %t", taManager.enabled))
124+
//Setup Target Allocator Scrape Post Process Handler
125+
taManager.AttachReloadConfigHandler(
126+
func(prometheusConfig *config.Config) {
127+
relabelScrapeConfigs(prometheusConfig, logger)
128+
},
120129
)
130+
121131
mth.SetScrapeManager(scrapeManager)
122132

123133
var reloaders = []func(cfg *config.Config) error{
@@ -151,7 +161,6 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan
151161
close(reloadReady.C)
152162
})
153163
}
154-
155164
var g run.Group
156165
{
157166
// Termination handler.
@@ -179,12 +188,13 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan
179188
// Scrape discovery manager.
180189
g.Add(
181190
func() error {
191+
level.Info(logger).Log("msg", "Scrape discovery manager starting")
182192
err := discoveryManagerScrape.Run()
183-
level.Info(logger).Log("msg", "Scrape discovery manager stopped")
193+
level.Info(logger).Log("msg", "Scrape discovery manager stopped", "error", err)
184194
return err
185195
},
186196
func(err error) {
187-
level.Info(logger).Log("msg", "Stopping scrape discovery manager...")
197+
level.Info(logger).Log("msg", "Stopping scrape discovery manager...", "error", err)
188198
cancelScrape()
189199
},
190200
)
@@ -201,17 +211,35 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan
201211

202212
level.Info(logger).Log("msg", "start discovery")
203213
err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
204-
level.Info(logger).Log("msg", "Scrape manager stopped")
214+
level.Info(logger).Log("msg", "Scrape manager stopped", "error", err)
205215
return err
206216
},
207217
func(err error) {
208218
// Scrape manager needs to be stopped before closing the local TSDB
209219
// so that it doesn't try to write samples to a closed storage.
210-
level.Info(logger).Log("msg", "Stopping scrape manager...")
220+
level.Info(logger).Log("msg", "Stopping scrape manager...", "error", err)
211221
scrapeManager.Stop()
212222
},
213223
)
214224
}
225+
{
226+
// Target Allocator manager.
227+
if taManager.enabled {
228+
g.Add(
229+
func() error {
230+
// we wait until the config is fully loaded.
231+
level.Info(logger).Log("msg", "start ta manager")
232+
err := taManager.Run()
233+
level.Info(logger).Log("msg", "ta manager stopped", "error", err)
234+
return err
235+
},
236+
func(err error) {
237+
level.Info(logger).Log("msg", "Stopping ta manager...", "error", err)
238+
taManager.Shutdown()
239+
},
240+
)
241+
}
242+
}
215243
{
216244
// Reload handler.
217245

@@ -227,7 +255,7 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan
227255
for {
228256
select {
229257
case <-hup:
230-
if err := reloadConfig(cfg.configFile, logger, reloaders...); err != nil {
258+
if err := reloadConfig(cfg.configFile, logger, taManager, reloaders...); err != nil {
231259
level.Error(logger).Log("msg", "Error reloading config", "err", err)
232260
}
233261

@@ -257,9 +285,11 @@ func Start(configFilePath string, receiver storage.Appendable, shutDownChan chan
257285

258286
default:
259287
}
260-
288+
if taManager.enabled {
289+
<-taManager.taReadyCh
290+
}
261291
level.Info(logger).Log("msg", "handling config file")
262-
if err := reloadConfig(cfg.configFile, logger, reloaders...); err != nil {
292+
if err := reloadConfig(cfg.configFile, logger, taManager, reloaders...); err != nil {
263293
return errors.Wrapf(err, "error loading config from %q", cfg.configFile)
264294
}
265295
level.Info(logger).Log("msg", "finish handling config file")
@@ -288,30 +318,11 @@ const (
288318
savedScrapeNameLabel = "cwagent_saved_scrape_name" // just arbitrary name that end user won't override in relabel config
289319
)
290320

291-
func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config) error) (err error) {
292-
level.Info(logger).Log("msg", "Loading configuration file", "filename", filename)
293-
content, _ := os.ReadFile(filename)
294-
text := string(content)
295-
level.Debug(logger).Log("msg", "Prometheus configuration file", "value", text)
296-
297-
defer func() {
298-
if err == nil {
299-
configSuccess.Set(1)
300-
configSuccessTime.SetToCurrentTime()
301-
} else {
302-
configSuccess.Set(0)
303-
}
304-
}()
305-
306-
conf, err := config.LoadFile(filename, false, false, logger)
307-
if err != nil {
308-
return errors.Wrapf(err, "couldn't load configuration (--config.file=%q)", filename)
309-
}
310-
321+
func relabelScrapeConfigs(prometheusConfig *config.Config, logger log.Logger) {
311322
// For saving name before relabel
312323
// - __name__ https://github.com/aws/amazon-cloudwatch-agent/issues/190
313324
// - job and instance https://github.com/aws/amazon-cloudwatch-agent/issues/193
314-
for _, sc := range conf.ScrapeConfigs {
325+
for _, sc := range prometheusConfig.ScrapeConfigs {
315326
relabelConfigs := []*relabel.Config{
316327
// job
317328
{
@@ -331,13 +342,38 @@ func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config
331342
},
332343
}
333344

334-
level.Info(logger).Log("msg", "Add extra relabel_configs and metric_relabel_configs to save job, instance and __name__ before user relabel")
345+
level.Debug(logger).Log("msg", "Add extra relabel_configs and metric_relabel_configs to save job, instance and __name__ before user relabel")
335346

336347
sc.RelabelConfigs = append(relabelConfigs, sc.RelabelConfigs...)
337348
sc.MetricRelabelConfigs = append(metricNameRelabelConfigs, sc.MetricRelabelConfigs...)
338-
339349
}
350+
}
351+
func reloadConfig(filename string, logger log.Logger, taManager *TargetAllocatorManager, rls ...func(*config.Config) error) (err error) {
352+
level.Info(logger).Log("msg", "Loading configuration file", "filename", filename)
353+
content, _ := os.ReadFile(filename)
354+
text := string(content)
355+
level.Debug(logger).Log("msg", "Prometheus configuration file", "value", text)
340356

357+
defer func() {
358+
if err == nil {
359+
configSuccess.Set(1)
360+
configSuccessTime.SetToCurrentTime()
361+
} else {
362+
configSuccess.Set(0)
363+
}
364+
}()
365+
// Check for TA
366+
var conf *config.Config
367+
if taManager.enabled {
368+
level.Info(logger).Log("msg", "Target Allocator is enabled")
369+
conf = (*config.Config)(taManager.config.PrometheusConfig)
370+
} else {
371+
conf, err = config.LoadFile(filename, false, false, logger)
372+
if err != nil {
373+
return errors.Wrapf(err, "couldn't load configuration (--config.file=%q)", filename)
374+
}
375+
}
376+
relabelScrapeConfigs(conf, logger)
341377
failed := false
342378
for _, rl := range rls {
343379
if err := rl(conf); err != nil {

0 commit comments

Comments
 (0)