Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Input and Output deadlock when buffer is full during startup #1502

Merged
merged 3 commits into from
Mar 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions lib/fluent/root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,12 @@ def lifecycle(desc: false, kind_callback: nil)
def start
lifecycle(desc: true) do |i| # instance
i.start unless i.started?
end
lifecycle(desc: true) do |i|
# Input#start sometimes emits lots of evetns with in_tail/`read_from_head true` case
# and it causes deadlock for small buffer/queue output. To avoid such problem,
# buffer related output threads should be run before `Input#start`.
# This is why after_start should be called immediately after start call.
# This depends on `desc: true` because calling plugin order of `desc: true` is
# Output, Filter, Label, Output with Router, then Input.
i.after_start unless i.after_started?
end
end
Expand Down
35 changes: 35 additions & 0 deletions test/test_plugin_classes.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,28 @@ def shutdown
end
end

class FluentTestGenInput < ::Fluent::Plugin::Input
::Fluent::Plugin.register_input('test_in_gen', self)

attr_reader :started

config_param :num, :integer, default: 10000

def start
super
@started = true

@num.times { |i|
router.emit("test.evet", Fluent::EventTime.now, {'message' => 'Hello!', 'key' => "value#{i}", 'num' => i})
}
end

def shutdown
@started = false
super
end
end

class FluentTestOutput < ::Fluent::Plugin::Output
::Fluent::Plugin.register_output('test_out', self)

Expand Down Expand Up @@ -112,6 +134,19 @@ def process(tag, es)

class FluentTestBufferedOutput < ::Fluent::Plugin::Output
::Fluent::Plugin.register_output('test_out_buffered', self)

attr_reader :started

def start
super
@started = true
end

def shutdown
@started = false
super
end

def write(chunk)
# drop everything
end
Expand Down
55 changes: 41 additions & 14 deletions test/test_root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,15 @@ def configure_ra(conf_str)
end

sub_test_case 'start/shutdown' do
setup do
@ra = RootAgent.new(log: $log)
stub(Engine).root_agent { @ra }
@ra.configure(Config.parse(<<-EOC, "(test)", "(test_dir)", true))
def setup_root_agent(conf)
ra = RootAgent.new(log: $log)
stub(Engine).root_agent { ra }
ra.configure(Config.parse(conf, "(test)", "(test_dir)", true))
ra
end

test 'plugin status' do
ra = setup_root_agent(<<-EOC)
<source>
@type test_in
@id test_in
Expand All @@ -191,19 +196,41 @@ def configure_ra(conf_str)
@id test_out
</match>
EOC
@ra
ra.start
assert_true ra.inputs.first.started
assert_true ra.filters.first.started
assert_true ra.outputs.first.started

ra.shutdown
assert_false ra.inputs.first.started
assert_false ra.filters.first.started
assert_false ra.outputs.first.started
end

test 'plugin status' do
@ra.start
assert_true @ra.inputs.first.started
assert_true @ra.filters.first.started
assert_true @ra.outputs.first.started
test 'output plugin threads should run before input plugin is blocked with buffer full' do
ra = setup_root_agent(<<-EOC)
<source>
@type test_in_gen
@id test_in_gen
</source>
<match **>
@type test_out_buffered
@id test_out_buffered
<buffer>
chunk_limit_size 1k
queue_limit_length 2
flush_thread_count 2
overflow_action block
</buffer>
</match>
EOC
waiting(5) { ra.start }
assert_true ra.inputs.first.started
assert_true ra.outputs.first.started

@ra.shutdown
assert_false @ra.inputs.first.started
assert_false @ra.filters.first.started
assert_false @ra.outputs.first.started
ra.shutdown
assert_false ra.inputs.first.started
assert_false ra.outputs.first.started
end
end

Expand Down