Skip to content

Commit 73ca584

Browse files
committed
added support for multiple handlers
1 parent faef5cd commit 73ca584

6 files changed

+167
-71
lines changed

CHANGELOG.md

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
# Changelog
22

3+
## [2.1.0] - 2017-05-15
4+
### Added
5+
- Support multiple handlers (@terjesannum)
6+
37
## [2.0.1] - 2017-05-14
48
### Added
5-
- Released as Gem
9+
- Released as Gem (@terjesannum)
610

711
## [1.7] - 2017-04-24
812
### Added

README.md

+29
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,35 @@ measurement = app.downloads, tags = platform => iOS;device => iPad , value = 92,
175175

176176
The event output tags will be merged with client and check definition tags and sent to InfluxDB as usual.
177177

178+
# Multiple handlers
179+
180+
If you need to have multiple handlers, eg. for different precision, proxy mode, writing to different influx databases etc, this can be done by configuring **additional_handlers**:
181+
182+
```
183+
{
184+
"influxdb-extension": {
185+
"hostname": "influxdb",
186+
"port": 8086,
187+
"database": "metrics",
188+
"username": "sensu",
189+
"password": "sensu",
190+
"buffer_size": 1000,
191+
"buffer_max_age": 10,
192+
"additional_handlers": ["events", "events_nano"]
193+
},
194+
"events": {
195+
"proxy_mode": true,
196+
"precision": "s"
197+
},
198+
"events_nano": {
199+
"proxy_mode": true,
200+
"precision": "n"
201+
}
202+
}
203+
```
204+
205+
Settings for the additional handlers will be merged with the **influxdb-extension** settings, so you only need to specify the settings you want to change for that handler.
206+
178207
# Performance
179208

180209
The extension will buffer up points until it reaches the configured **buffer_size** length or **buffer_max_age**, and then post all the points in the buffer to InfluxDB.

influxdb-extension.json.tmpl

+6-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,12 @@
1010
"password": "<influxdb password>",
1111
"buffer_size": 100,
1212
"buffer_max_age": 10,
13-
"precision": "s"
13+
"precision": "s",
14+
"additional_handlers": ["influxdb-proxy-nano"]
15+
},
16+
"influxdb-proxy-nano": {
17+
"proxy_mode": true,
18+
"precision": "n"
1419
}
1520
}
1621

lib/sensu/extensions/influxdb.rb

+95-51
Original file line numberDiff line numberDiff line change
@@ -18,60 +18,104 @@ def description
1818
"Transforms and sends metrics to InfluxDB"
1919
end
2020

21-
def post_init
22-
influxdb_config = settings[@@extension_name] || Hash.new
23-
validate_config(influxdb_config)
24-
25-
hostname = influxdb_config[:hostname] || "127.0.0.1"
26-
port = influxdb_config[:port] || "8086"
27-
database = influxdb_config[:database]
28-
ssl = influxdb_config[:ssl] || false
29-
ssl_ca_file = influxdb_config[:ssl_ca_file]
30-
ssl_verify = if influxdb_config.key?(:ssl_verify) then influxdb_config[:ssl_verify] else true end
31-
precision = influxdb_config[:precision] || "s"
32-
retention_policy = influxdb_config[:retention_policy]
21+
@@default_config = {
22+
:hostname => "127.0.0.1",
23+
:port => "8086",
24+
:ssl => false,
25+
:precision => "s",
26+
:protocol => "http",
27+
:buffer_size => 100,
28+
:buffer_max_age => 10,
29+
:proxy_mode => false
30+
}
31+
32+
def create_config(name, defaults)
33+
if settings[name].nil?
34+
Raise ArgumentError "no configuration for #{name} provided. exiting..."
35+
end
36+
config = defaults.merge(settings[name])
37+
@logger.debug("Config for #{name} created: #{config}")
38+
validate_config(name, config)
39+
40+
hostname = config[:hostname]
41+
port = config[:port]
42+
database = config[:database]
43+
ssl = config[:ssl]
44+
ssl_ca_file = config[:ssl_ca_file]
45+
ssl_verify = if config.key?(:ssl_verify) then config[:ssl_verify] else true end
46+
precision = config[:precision]
47+
retention_policy = config[:retention_policy]
3348
rp_queryparam = if retention_policy.nil? then "" else "&rp=#{retention_policy}" end
3449
protocol = if ssl then "https" else "http" end
35-
username = influxdb_config[:username]
36-
password = influxdb_config[:password]
50+
username = config[:username]
51+
password = config[:password]
3752
auth_queryparam = if username.nil? or password.nil? then "" else "&u=#{username}&p=#{password}" end
38-
@BUFFER_SIZE = if influxdb_config.key?(:buffer_size) then influxdb_config[:buffer_size].to_i else 100 end
39-
@BUFFER_MAX_AGE = if influxdb_config.key?(:buffer_max_age) then influxdb_config[:buffer_max_age].to_i else 10 end
40-
@PROXY_MODE = influxdb_config[:proxy_mode] || false
53+
buffer_size = config[:buffer_size]
54+
buffer_max_age = config[:buffer_max_age]
55+
proxy_mode = config[:proxy_mode]
4156

4257
string = "#{protocol}://#{hostname}:#{port}/write?db=#{database}&precision=#{precision}#{rp_queryparam}#{auth_queryparam}"
43-
@uri = URI(string)
44-
@http = Net::HTTP::new(@uri.host, @uri.port)
58+
uri = URI(string)
59+
http = Net::HTTP::new(uri.host, uri.port)
4560
if ssl
46-
@http.ssl_version = :TLSv1
47-
@http.use_ssl = true
48-
@http.verify_mode = if ssl_verify then OpenSSL::SSL::VERIFY_PEER else OpenSSL::SSL::VERIFY_NONE end
49-
@http.ca_file = ssl_ca_file
61+
http.ssl_version = :TLSv1
62+
http.use_ssl = true
63+
http.verify_mode = if ssl_verify then OpenSSL::SSL::VERIFY_PEER else OpenSSL::SSL::VERIFY_NONE end
64+
http.ca_file = ssl_ca_file
5065
end
51-
@buffer = []
52-
@buffer_flushed = Time.now.to_i
5366

54-
@logger.info("#{@@extension_name}: successfully initialized config: hostname: #{hostname}, port: #{port}, database: #{database}, uri: #{@uri.to_s}, username: #{username}, buffer_size: #{@BUFFER_SIZE}, buffer_max_age: #{@BUFFER_MAX_AGE}")
67+
@handlers ||= Hash.new
68+
@handlers[name] = {
69+
"http" => http,
70+
"uri" => uri,
71+
"buffer" => [],
72+
"buffer_flushed" => Time.now.to_i,
73+
"buffer_size" => buffer_size,
74+
"buffer_max_age" => buffer_max_age,
75+
"proxy_mode" => proxy_mode
76+
}
77+
78+
@logger.info("#{name}: successfully initialized handler: hostname: #{hostname}, port: #{port}, database: #{database}, uri: #{uri.to_s}, username: #{username}, buffer_size: #{buffer_size}, buffer_max_age: #{buffer_max_age}")
79+
return config
80+
end
81+
82+
def post_init
83+
main_config = create_config(@@extension_name, @@default_config)
84+
if settings[name].key?(:additional_handlers)
85+
settings[name][:additional_handlers].each {|h| create_config(h, main_config)}
86+
end
5587
end
5688

5789
def run(event)
5890
begin
91+
@logger.debug("event: #{event}")
92+
event = JSON.parse(event)
5993

60-
if buffer_too_old? or buffer_too_big?
61-
flush_buffer
94+
handler = @handlers[@@extension_name]
95+
unless event["check"]["handlers"].nil?
96+
event["check"]["handlers"].each {|x|
97+
if @handlers.has_key?(x)
98+
@logger.debug("found additional handler: #{x}")
99+
handler = @handlers[x]
100+
break
101+
end
102+
}
103+
end
104+
105+
if buffer_too_old?(handler) or buffer_too_big?(handler)
106+
flush_buffer(handler)
62107
end
63108

64-
event = JSON.parse(event)
65109
output = event["check"]["output"]
66110

67-
if not @PROXY_MODE
111+
if not handler["proxy_mode"]
68112
client_tags = event["client"]["tags"] || Hash.new
69113
check_tags = event["check"]["tags"] || Hash.new
70114
tags = create_tags(client_tags.merge(check_tags))
71115
end
72116

73117
output.split(/\r\n|\n/).each do |point|
74-
if not @PROXY_MODE
118+
if not handler["proxy_mode"]
75119
measurement, field_value, timestamp = point.split(/\s+/)
76120

77121
if not is_number?(timestamp)
@@ -93,8 +137,8 @@ def run(event)
93137
point = "#{measurement}#{tags} value=#{field_value} #{timestamp}"
94138
end
95139

96-
@buffer.push(point)
97-
@logger.debug("#{@@extension_name}: stored point in buffer (#{@buffer.length}/#{@BUFFER_SIZE})")
140+
handler["buffer"].push(point)
141+
@logger.debug("#{@@extension_name}: stored point in buffer (#{handler['buffer'].length}/#{handler['buffer_size']})")
98142
end
99143
yield 'ok', 0
100144
rescue => e
@@ -123,39 +167,39 @@ def create_tags(tags)
123167
end
124168
end
125169

126-
def send_to_influxdb(payload)
127-
request = Net::HTTP::Post.new(@uri.request_uri)
170+
def send_to_influxdb(handler)
171+
payload = handler["buffer"].join("\n")
172+
request = Net::HTTP::Post.new(handler['uri'].request_uri)
128173
request.body = payload
129174

130-
@logger.debug("#{@@extension_name}: writing payload #{payload} to endpoint #{@uri.to_s}")
131-
response = @http.request(request)
175+
@logger.debug("#{@@extension_name}: writing payload #{payload} to endpoint #{handler['uri'].to_s}")
176+
response = handler["http"].request(request)
132177
@logger.debug("#{@@extension_name}: influxdb http response code = #{response.code}, body = #{response.body}")
133178
end
134179

135-
def flush_buffer
136-
payload = @buffer.join("\n")
137-
send_to_influxdb(payload)
138-
@buffer = []
139-
@buffer_flushed = Time.now.to_i
180+
def flush_buffer(handler)
181+
send_to_influxdb(handler)
182+
handler["buffer"] = []
183+
handler["buffer_flushed"] = Time.now.to_i
140184
end
141185

142-
def buffer_too_old?
143-
buffer_age = Time.now.to_i - @buffer_flushed
144-
buffer_age >= @BUFFER_MAX_AGE
186+
def buffer_too_old?(handler)
187+
buffer_age = Time.now.to_i - handler["buffer_flushed"]
188+
buffer_age >= handler["buffer_max_age"]
145189
end
146190

147-
def buffer_too_big?
148-
@buffer.length >= @BUFFER_SIZE
191+
def buffer_too_big?(handler)
192+
handler["buffer"].length >= handler["buffer_size"]
149193
end
150194

151-
def validate_config(config)
195+
def validate_config(name, config)
152196
if config.nil?
153-
raise ArgumentError, "no configuration for #{@@extension_name} provided. exiting..."
197+
raise ArgumentError, "no configuration for #{name} provided. exiting..."
154198
end
155199

156200
["hostname", "database"].each do |required_setting|
157-
if config[required_setting].nil?
158-
raise ArgumentError, "required setting #{required_setting} not provided to extension. this should be provided as json element with key #{@@extension_name}. exiting..."
201+
if config.has_key?(required_setting)
202+
raise ArgumentError, "required setting #{required_setting} not provided to extension. this should be provided as json element with key #{name}. exiting..."
159203
end
160204
end
161205
end

sensu-extensions-influxdb.gemspec

+2-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22

33
Gem::Specification.new do |spec|
44
spec.name = "sensu-extensions-influxdb"
5-
spec.version = "2.0.1"
5+
spec.version = "2.1.0"
6+
spec.license = "MIT"
67
spec.authors = ["Johnny Horvi", "Terje Sannum"]
78
89

spec/influxdb-extension_spec.rb

+30-17
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,29 @@
11
require "sensu/extensions/influxdb"
2+
require "sensu/logger"
23

34
describe "Sensu::Extension::InfluxDB" do
45

56
before do
67
@extension = Sensu::Extension::InfluxDB.new
7-
@extension.settings = {
8-
"influxdb-extension" => {
9-
"database" => "test",
10-
"hostname" => "127.0.0.69"
11-
}
8+
@extension.settings = Hash.new
9+
@extension.settings["influxdb-extension"] = {
10+
:database => "test",
11+
:hostname => "nonexistinghost",
12+
:additional_handlers => ["proxy"],
13+
:buffer_size => 5,
14+
:buffer_max_age => 1
15+
}
16+
@extension.settings["proxy"] = {
17+
:proxy_mode => true
1218
}
1319

14-
stubbed_logger = double("logger", :info => "info", :debug => "debug", :error => "error")
15-
16-
@extension.instance_variable_set("@logger", stubbed_logger)
20+
@extension.instance_variable_set("@logger", Sensu::Logger.get(:log_level => :fatal))
1721
@extension.post_init
1822
end
1923

2024
it "processes minimal event" do
2125
@extension.run(minimal_event.to_json) do
22-
buffer = @extension.instance_variable_get("@buffer")
26+
buffer = @extension.instance_variable_get("@handlers")["influxdb-extension"]["buffer"]
2327
expect(buffer[0]).to eq("rspec value=69 1480697845")
2428
end
2529
end
@@ -36,14 +40,13 @@
3640
}
3741

3842
@extension.run(event.to_json) do
39-
buffer = @extension.instance_variable_get("@buffer")
43+
buffer = @extension.instance_variable_get("@handlers")["influxdb-extension"]["buffer"]
4044
expect(buffer.size).to eq(0)
4145
end
4246
end
4347

4448

4549
it "flushes buffer when full" do
46-
@extension.instance_variable_set("@BUFFER_SIZE", 5)
4750
5.times {
4851
@extension.run(minimal_event.to_json) do |output,status|
4952
expect(output).to eq("ok")
@@ -60,12 +63,11 @@
6063
end
6164

6265
it "flushes buffer when timed out" do
63-
@extension.instance_variable_set("@BUFFER_MAX_AGE", 1)
6466
@extension.run(minimal_event.to_json) do end
6567
sleep(1)
6668
@extension.run(minimal_event.to_json) do end
6769

68-
buffer = @extension.instance_variable_get("@buffer")
70+
buffer = @extension.instance_variable_get("@handlers")["influxdb-extension"]["buffer"]
6971
expect(buffer.size).to eq(1)
7072
end
7173

@@ -91,15 +93,14 @@
9193

9294
@extension.run(event.to_json) do end
9395

94-
buffer = @extension.instance_variable_get("@buffer")
96+
buffer = @extension.instance_variable_get("@handlers")["influxdb-extension"]["buffer"]
9597
expect(buffer[0]).to eq("rspec,a=1,b=1,c=1,x=1,y=1,z=1 value=69 1480697845")
9698
end
9799

98100
it "does not modify input in proxy mode" do
99-
@extension.instance_variable_set("@PROXY_MODE", true)
100-
@extension.run(minimal_event.to_json) do end
101+
@extension.run(minimal_event_proxy.to_json) do end
101102

102-
buffer = @extension.instance_variable_get("@buffer")
103+
buffer = @extension.instance_variable_get("@handlers")["proxy"]["buffer"]
103104
expect(buffer[0]).to eq("rspec 69 1480697845")
104105
end
105106

@@ -115,3 +116,15 @@ def minimal_event
115116
}
116117
}
117118
end
119+
120+
def minimal_event_proxy
121+
event = {
122+
"client" => {
123+
"name" => "rspec"
124+
},
125+
"check" => {
126+
"handlers" => ["proxy"],
127+
"output" => "rspec 69 1480697845"
128+
}
129+
}
130+
end

0 commit comments

Comments
 (0)