11require 'fluent/output'
2+ require 'net/https'
23
34module Fluent
45 class LogtailOutput < Fluent ::BufferedOutput
56 Fluent ::Plugin . register_output ( 'logtail' , self )
67
7- VERSION = "0.1.0 " . freeze
8+ VERSION = "0.1.1 " . freeze
89 CONTENT_TYPE = "application/msgpack" . freeze
9- HOST = "https://in.logtail.com" . freeze
10+ HOST = "in.logtail.com" . freeze
11+ PORT = 443
1012 PATH = "/" . freeze
1113 MAX_ATTEMPTS = 3 . freeze
1214 RETRYABLE_CODES = [ 429 , 500 , 502 , 503 , 504 ] . freeze
13- USER_AGENT = "Logtail Logstash /#{ VERSION } " . freeze
15+ USER_AGENT = "Logtail Fluentd /#{ VERSION } " . freeze
1416
1517 config_param :source_token , :string , secret : true
1618 config_param :ip , :string , default : nil
1719
1820 def configure ( conf )
19- source_token = conf [ "source_token" ]
20- @headers = {
21- "Authorization" => "Bearer #{ source_token } " ,
22- "Content-Type" => CONTENT_TYPE ,
23- "User-Agent" => USER_AGENT
24- }
25- super
26- end
27-
28- def start
29- super
30- require 'http'
31- HTTP . default_options = { :keep_alive_timeout => 29 }
32- @http_client = HTTP . persistent ( HOST )
33- end
34-
35- def shutdown
36- @http_client . close if @http_client
21+ @source_token = conf [ "source_token" ]
3722 super
3823 end
3924
4025 def format ( tag , time , record )
41- record . merge ( "dt" => Time . at ( time ) . utc . iso8601 ) . to_msgpack
26+ force_utf8_string_values ( record . merge ( "dt" => Time . at ( time ) . utc . iso8601 ) ) . to_msgpack
4227 end
4328
4429 def write ( chunk )
@@ -52,11 +37,19 @@ def deliver(chunk, attempt)
5237 return false
5338 end
5439
40+ http = build_http_client
5541 body = chunk . read
56- response = @http_client . headers ( @headers ) . post ( PATH , body : body )
57- response . flush
58- code = response . code
5942
43+ begin
44+ resp = http . start do |conn |
45+ req = build_request ( body )
46+ conn . request ( req )
47+ end
48+ ensure
49+ http . finish if http . started?
50+ end
51+
52+ code = resp . code . to_i
6053 if code >= 200 && code <= 299
6154 true
6255 elsif RETRYABLE_CODES . include? ( code )
@@ -76,5 +69,38 @@ def sleep_for_attempt(attempt)
7669 sleep_for = sleep_for <= 60 ? sleep_for : 60
7770 ( sleep_for / 2 ) + ( rand ( 0 ..sleep_for ) / 2 )
7871 end
72+
73+ def force_utf8_string_values ( data )
74+ data . transform_values do |val |
75+ if val . is_a? ( Hash )
76+ force_utf8_string_values ( val )
77+ elsif val . respond_to? ( :force_encoding )
78+ val . force_encoding ( 'UTF-8' )
79+ else
80+ val
81+ end
82+ end
83+ end
84+
85+ def build_http_client
86+ http = Net ::HTTP . new ( HOST , PORT )
87+ http . use_ssl = true
88+ # Verification on Windows fails despite having a valid certificate.
89+ http . verify_mode = OpenSSL ::SSL ::VERIFY_NONE
90+ http . read_timeout = 30
91+ http . ssl_timeout = 10
92+ http . open_timeout = 10
93+ http
94+ end
95+
96+ def build_request ( body )
97+ path = '/'
98+ req = Net ::HTTP ::Post . new ( path )
99+ req [ "Authorization" ] = "Bearer #{ @source_token } "
100+ req [ "Content-Type" ] = CONTENT_TYPE
101+ req [ "User-Agent" ] = USER_AGENT
102+ req . body = body
103+ req
104+ end
79105 end
80106end
0 commit comments