Skip to content

Icinga 2 API as Event Source #112

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 65 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
9daa25e
Initial Event Stream API based Proof of Concept
julianbrost Sep 22, 2023
78d01e8
eventstream: represent of Icinga 2 API objects
oxzi Oct 13, 2023
c472f8d
eventstream: custom Icinga2Time to parse time
oxzi Oct 16, 2023
83933ce
eventstream: API response types for /v1/objects
oxzi Oct 16, 2023
9eefb19
eventstream: Initial Client
oxzi Oct 16, 2023
5c63cad
eventstream: handle AcknowledgementSet
oxzi Oct 17, 2023
814cc21
eventstream: Icinga Objects API support to Client
oxzi Oct 17, 2023
a85db4f
eventstream: reconnection and event replay logic
oxzi Oct 17, 2023
a21f35f
eventstream: Host/Service Events from CheckResult
oxzi Oct 18, 2023
aae7f87
eventstream: fix Severity, improve logging and doc
oxzi Oct 18, 2023
e9d19ca
eventstream: split client.go, refactor queryObjectsApi*
oxzi Oct 18, 2023
fca785c
eventstream: fetch Host/Service Groups from API
oxzi Oct 18, 2023
f1ac287
eventstream: replay Acknowledgements, refactoring
oxzi Oct 18, 2023
dd142f1
eventstream: test type assertions just to be sure
oxzi Oct 19, 2023
ab528b6
Initial Event Stream API integration
oxzi Oct 19, 2023
bac4095
Verify Icinga 2 API's CA certificate
oxzi Oct 19, 2023
f403eab
eventstream: HostServiceRuntimeAttributes real name field
oxzi Oct 19, 2023
aae0b41
eventstream: clean up mutex locks
oxzi Oct 19, 2023
e3dc33f
eventstream: test with testify library
oxzi Oct 23, 2023
9f0c828
eventstream: move config parsing out of main to package
oxzi Oct 23, 2023
e171689
eventstream: use filter_vars next to filters
oxzi Oct 23, 2023
ae6ce9c
eventstream: linear back off reconnection interval
oxzi Oct 23, 2023
ff6f267
eventstream: generic Attrs for ObjectQueriesResult
oxzi Oct 23, 2023
9178cb1
eventstream: refactor Event processing
oxzi Oct 25, 2023
5f5a974
eventstream: current time for new Events
oxzi Oct 26, 2023
0ae9fa1
eventstream: buffer Events during reconnection
oxzi Oct 26, 2023
46eb5b8
eventstream: Event dispatcher
oxzi Oct 26, 2023
494d008
eventstream: reconnection in Event Stream method
oxzi Oct 30, 2023
5c8a469
eventstream: only buffer Event Stream APIs
oxzi Oct 30, 2023
a5627b8
eventstream: unify errors, zap structured logging
oxzi Oct 30, 2023
06534e0
eventstream: remove CheckResul.Command
oxzi Oct 30, 2023
f5e6725
README.md: document Event Stream usage
oxzi Oct 30, 2023
7dd8c94
eventstream: rework Client's channels
oxzi Oct 31, 2023
8e404e0
eventstream: Event Stream connection timeout
oxzi Nov 2, 2023
3b93a15
eventstream: detect outdated events during replay
oxzi Nov 2, 2023
99ddf20
eventstream: Icinga 2 API query fixes
oxzi Nov 2, 2023
a0da999
eventstream: group replay goroutines in errgroup
oxzi Nov 2, 2023
baa205b
eventstream: Icinga 2-compatible TLS configuration
oxzi Nov 3, 2023
330aea0
eventstream: build Event URL more carefully
oxzi Nov 3, 2023
f3d9f08
eventstream: refactor Event Stream connection
oxzi Nov 3, 2023
a656dda
eventstream: context.Context as first parameter
oxzi Nov 3, 2023
0c05cde
eventstream: base replay cache on event name
oxzi Nov 3, 2023
3c4f946
eventstream: mimic PHP's rawurlencode function
oxzi Nov 3, 2023
fac71ef
eventstream: replay State and ACK Changes together
oxzi Nov 6, 2023
0ed4960
eventstream: rework replay communication logic
oxzi Nov 6, 2023
435b1ef
eventstream: dispatch through queue to callback
oxzi Nov 8, 2023
776d884
eventstream: ensure HTTP connection reusage
oxzi Nov 8, 2023
bd24d1c
eventstream: rename replay to catch-up phase
oxzi Nov 8, 2023
1a65d9a
eventstream: directly deliver events from worker
oxzi Nov 16, 2023
7890b3a
incident.ProcessEvent: refactor from common code
oxzi Dec 4, 2023
deaff4d
eventstream: hide internal Event Stream context
oxzi Nov 23, 2023
e0b4144
incident: custom superfluous state change error
oxzi Nov 29, 2023
e1452dd
eventstream: use unified logger name with field
oxzi Dec 1, 2023
be4901b
Create Event Stream Clients from Source
oxzi Dec 6, 2023
902cdd7
icinga2: rename eventstream package to icinga2
oxzi Jan 5, 2024
425a6a0
icinga2: rework Icinga2Time to UnixFloat
oxzi Jan 5, 2024
2f534b4
icinga2: consts for numeric Icinga 2 API results
oxzi Jan 8, 2024
86cc113
icinga2: only process HARD state changes
oxzi Jan 8, 2024
f66f726
icinga2: rename integer consts to Go-like names
oxzi Jan 8, 2024
d6c4d36
icinga2: restart catch-up-phase on error
oxzi Jan 11, 2024
24a4843
icinga2: custom certificate CN
oxzi Jan 17, 2024
00e4a8d
config: Fix {bool,string}Eq to compare Sources
oxzi Mar 19, 2024
aaae894
icinga2: Client fixes
oxzi Mar 19, 2024
786d287
icinga2: Rework catch-up-worker processing
oxzi Mar 21, 2024
4673b2c
icinga2: Custom http.Transport to set User-Agent
oxzi Mar 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 4 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,17 @@ It is required that you have created a new database and imported the [schema](sc

Additionally, it also requires you to manually insert items into the **source** table before starting the daemon.
```sql
INSERT INTO source (id, type, name, listener_password_hash)
VALUES (1, 'icinga2', 'Icinga 2', '$2y$10$QU8bJ7cpW1SmoVQ/RndX5O2J5L1PJF7NZ2dlIW7Rv3zUEcbUFg3z2');
INSERT INTO source
(id, type, name, icinga2_base_url, icinga2_auth_user, icinga2_auth_pass, icinga2_insecure_tls)
VALUES
(1, 'icinga2', 'Local Icinga 2', 'https://localhost:5665', 'root', 'icinga', 'y');
```
The `listener_password_hash` is a [PHP `password_hash`](https://www.php.net/manual/en/function.password-hash.php) with the `PASSWORD_DEFAULT` algorithm, currently bcrypt.
In the example above, this is "correct horse battery staple".
This mimics Icinga Web 2's behavior, as stated in [its documentation](https://icinga.com/docs/icinga-web/latest/doc/20-Advanced-Topics/#manual-user-creation-for-database-authentication-backend).

Then, you can launch the daemon with the following command.
```go
go run ./cmd/icinga-notifications-daemon --config config.yml
```

Last but not least, in order for the daemon to receive events from Icinga 2, you need to copy the [Icinga 2 config](icinga2.conf)
to `/etc/icinga2/features-enabled` on your master node(s) and restart the Icinga 2 service. At the top of this file,
you will find multiple configurations options that can be set in `/etc/icinga2/constants.conf`. There are also Icinga2
`EventCommand` definitions in this file that will automatically match all your **checkables**, which may not work
properly if the configuration already uses event commands for something else.

## License

Icinga Notifications is licensed under the terms of the [GNU General Public License Version 2](LICENSE).
14 changes: 13 additions & 1 deletion cmd/icinga-notifications-daemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/icinga/icinga-notifications/internal/channel"
"github.com/icinga/icinga-notifications/internal/config"
"github.com/icinga/icinga-notifications/internal/daemon"
"github.com/icinga/icinga-notifications/internal/icinga2"
"github.com/icinga/icinga-notifications/internal/incident"
"github.com/icinga/icinga-notifications/internal/listener"
"github.com/icinga/icingadb/pkg/logging"
Expand Down Expand Up @@ -85,18 +86,29 @@ func main() {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()

runtimeConfig := config.NewRuntimeConfig(db, logs)
icinga2Launcher := &icinga2.Launcher{
Ctx: ctx,
Logs: logs,
Db: db,
RuntimeConfig: nil, // Will be set below as it is interconnected..
}

runtimeConfig := config.NewRuntimeConfig(icinga2Launcher.Launch, logs, db)
if err := runtimeConfig.UpdateFromDatabase(ctx); err != nil {
logger.Fatalw("failed to load config from database", zap.Error(err))
}

icinga2Launcher.RuntimeConfig = runtimeConfig

go runtimeConfig.PeriodicUpdates(ctx, 1*time.Second)

err = incident.LoadOpenIncidents(ctx, db, logs.GetChildLogger("incident"), runtimeConfig)
if err != nil {
logger.Fatalw("Can't load incidents from database", zap.Error(err))
}

// Wait to load open incidents from the database before either starting Event Stream Clients or starting the Listener.
icinga2Launcher.Ready()
if err := listener.NewListener(db, runtimeConfig, logs).Run(ctx); err != nil {
logger.Errorw("Listener has finished with an error", zap.Error(err))
} else {
Expand Down
25 changes: 22 additions & 3 deletions internal/config/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ type RuntimeConfig struct {
// Accessing it requires a lock that is obtained with RLock() and released with RUnlock().
ConfigSet

// EventStreamLaunchFunc is a callback to launch an Event Stream API Client.
// This became necessary due to circular imports, either with the incident or icinga2 package.
EventStreamLaunchFunc func(source *Source)

// pending contains changes to config objects that are to be applied to the embedded live config.
pending ConfigSet

Expand All @@ -36,8 +40,18 @@ type RuntimeConfig struct {
mu sync.RWMutex
}

func NewRuntimeConfig(db *icingadb.DB, logs *logging.Logging) *RuntimeConfig {
return &RuntimeConfig{db: db, logs: logs, logger: logs.GetChildLogger("runtime-updates")}
func NewRuntimeConfig(
esLaunch func(source *Source),
logs *logging.Logging,
db *icingadb.DB,
) *RuntimeConfig {
return &RuntimeConfig{
EventStreamLaunchFunc: esLaunch,

logs: logs,
logger: logs.GetChildLogger("runtime-updates"),
db: db,
}
}

type ConfigSet struct {
Expand Down Expand Up @@ -167,9 +181,14 @@ func (r *RuntimeConfig) GetSourceFromCredentials(user, pass string, logger *logg
return nil
}

if !source.ListenerPasswordHash.Valid {
logger.Debugw("Cannot check credentials for source without a listener_password_hash", zap.Int64("id", sourceId))
return nil
}

// If either PHP's PASSWORD_DEFAULT changes or Icinga Web 2 starts using something else, e.g., Argon2id, this will
// return a descriptive error as the identifier does no longer match the bcrypt "$2y$".
err = bcrypt.CompareHashAndPassword([]byte(source.ListenerPasswordHash), []byte(pass))
err = bcrypt.CompareHashAndPassword([]byte(source.ListenerPasswordHash.String), []byte(pass))
if errors.Is(err, bcrypt.ErrMismatchedHashAndPassword) {
logger.Debugw("Invalid password for this source", zap.Int64("id", sourceId))
return nil
Expand Down
83 changes: 71 additions & 12 deletions internal/config/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,56 @@ package config

import (
"context"
"github.com/icinga/icingadb/pkg/types"
"github.com/jmoiron/sqlx"
"go.uber.org/zap"
)

// SourceTypeIcinga2 represents the "icinga2" Source Type for Event Stream API sources.
const SourceTypeIcinga2 = "icinga2"

// Source entry within the ConfigSet to describe a source.
type Source struct {
ID int64 `db:"id"`
Type string `db:"type"`
Name string `db:"name"`

ListenerPasswordHash string `db:"listener_password_hash"`
ListenerPasswordHash types.String `db:"listener_password_hash"`

Icinga2BaseURL types.String `db:"icinga2_base_url"`
Icinga2AuthUser types.String `db:"icinga2_auth_user"`
Icinga2AuthPass types.String `db:"icinga2_auth_pass"`
Icinga2CAPem types.String `db:"icinga2_ca_pem"`
Icinga2CommonName types.String `db:"icinga2_common_name"`
Icinga2InsecureTLS types.Bool `db:"icinga2_insecure_tls"`

// Icinga2SourceConf for Event Stream API sources, only if Source.Type == SourceTypeIcinga2.
Icinga2SourceCancel context.CancelFunc `db:"-" json:"-"`
}

// fieldEquals checks if this Source's database fields are equal to those of another Source.
func (source *Source) fieldEquals(other *Source) bool {
boolEq := func(a, b types.Bool) bool { return (!a.Valid && !b.Valid) || (a == b) }
stringEq := func(a, b types.String) bool { return (!a.Valid && !b.Valid) || (a == b) }

return source.ID == other.ID &&
source.Type == other.Type &&
source.Name == other.Name &&
stringEq(source.ListenerPasswordHash, other.ListenerPasswordHash) &&
stringEq(source.Icinga2BaseURL, other.Icinga2BaseURL) &&
stringEq(source.Icinga2AuthUser, other.Icinga2AuthUser) &&
stringEq(source.Icinga2AuthPass, other.Icinga2AuthPass) &&
stringEq(source.Icinga2CAPem, other.Icinga2CAPem) &&
stringEq(source.Icinga2CommonName, other.Icinga2CommonName) &&
boolEq(source.Icinga2InsecureTLS, other.Icinga2InsecureTLS)
}

// stop this Source's worker; currently only Icinga Event Stream API Client.
func (source *Source) stop() {
if source.Type == SourceTypeIcinga2 && source.Icinga2SourceCancel != nil {
source.Icinga2SourceCancel()
source.Icinga2SourceCancel = nil
}
}

func (r *RuntimeConfig) fetchSources(ctx context.Context, tx *sqlx.Tx) error {
Expand All @@ -34,12 +73,12 @@ func (r *RuntimeConfig) fetchSources(ctx context.Context, tx *sqlx.Tx) error {
zap.String("type", s.Type),
)
if sourcesById[s.ID] != nil {
sourceLogger.Warnw("ignoring duplicate config for source ID")
} else {
sourcesById[s.ID] = s

sourceLogger.Debugw("loaded source config")
sourceLogger.Error("Ignoring duplicate config for source ID")
continue
}

sourcesById[s.ID] = s
sourceLogger.Debug("loaded source config")
}

if r.Sources != nil {
Expand All @@ -62,16 +101,36 @@ func (r *RuntimeConfig) applyPendingSources() {
}

for id, pendingSource := range r.pending.Sources {
if pendingSource == nil {
r.logger.Infow("Source has been removed",
zap.Int64("id", r.Sources[id].ID),
zap.String("name", r.Sources[id].Name),
zap.String("type", r.Sources[id].Type))
logger := r.logger.With(zap.Int64("id", id))
currentSource := r.Sources[id]

// Compare the pending source with an optional existing source; instruct the Event Source Client, if necessary.
if pendingSource == nil && currentSource != nil {
logger.Info("Source has been removed")

currentSource.stop()
delete(r.Sources, id)
continue
} else if pendingSource != nil && currentSource != nil {
if currentSource.fieldEquals(pendingSource) {
continue
}

logger.Info("Source has been updated")
currentSource.stop()
} else if pendingSource != nil && currentSource == nil {
logger.Info("Source has been added")
} else {
r.Sources[id] = pendingSource
// Neither an active nor a pending source?
logger.Error("Cannot applying pending configuration: neither an active nor a pending source")
continue
}

if pendingSource.Type == SourceTypeIcinga2 {
r.EventStreamLaunchFunc(pendingSource)
}

r.Sources[id] = pendingSource
}

r.pending.Sources = nil
Expand Down
Loading
Loading