Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,17 @@ def verify_connection
end

def establish_connection(sock, ri)
start_time = Fluent::Clock.now
timeout = @sender.hard_timeout

while ri.state != :established
# Check for timeout to prevent infinite loop
if Fluent::Clock.now - start_time > timeout
@log.warn "handshake timeout after #{timeout}s", host: @host, port: @port
disable!
break
end

begin
# TODO: On Ruby 2.2 or earlier, read_nonblock doesn't work expectedly.
# We need rewrite around here using new socket/server plugin helper.
Expand Down
23 changes: 23 additions & 0 deletions test/plugin/test_out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1406,4 +1406,27 @@ def plugin_id_for_test?
assert_equal 0, @d.instance.healthy_nodes_count
assert_equal 0, @d.instance.registered_nodes_count
end

test 'establish_connection_timeout' do
@d = d = create_driver(%[
hard_timeout 1
<server>
host #{TARGET_HOST}
port #{@target_port}
</server>
])

node = d.instance.nodes.first
mock_sock = flexmock('socket')
mock_sock.should_receive(:read_nonblock).with(512).and_return('').at_least.once

ri = Fluent::Plugin::ForwardOutput::ConnectionManager::RequestInfo.new(:helo)

assert_true node.available?
node.establish_connection(mock_sock, ri)
assert_false node.available?

logs = d.logs
assert{ logs.any?{|log| log.include?('handshake timeout after 1.0s') } }
end
end