Skip to content

Commit

Permalink
Merge pull request #1242 from fluent/lifecycle-method-called-correctl…
Browse files Browse the repository at this point in the history
…y-once

Lifecycle method called correctly once
  • Loading branch information
tagomoris authored Sep 28, 2016
2 parents 09962f2 + d93bd1e commit 463ac14
Show file tree
Hide file tree
Showing 5 changed files with 226 additions and 16 deletions.
20 changes: 8 additions & 12 deletions lib/fluent/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -86,21 +86,12 @@ def lifecycle_control_list
lifecycle_control_list[:input] << i
end
end
recursive_output_traverse = ->(o) {
outputs.each do |o|
if o.has_router?
lifecycle_control_list[:output_with_router] << o
else
lifecycle_control_list[:output] << o
end

if o.respond_to?(:outputs)
o.outputs.each do |store|
recursive_output_traverse.call(store)
end
end
}
outputs.each do |o|
recursive_output_traverse.call(o)
end
filters.each do |f|
lifecycle_control_list[:filter] << f
Expand Down Expand Up @@ -135,9 +126,14 @@ def add_match(type, pattern, conf)
output.router = @event_router if output.respond_to?(:router=)
output.configure(conf)
@outputs << output
if output.respond_to?(:outputs) && (output.respond_to?(:multi_output?) && output.multi_output? || output.is_a?(Fluent::MultiOutput))
if output.respond_to?(:outputs) && output.respond_to?(:multi_output?) && output.multi_output?
# TODO: ruby 2.3 or later: replace `output.respond_to?(:multi_output?) && output.multi_output?` with output&.multi_output?
@outputs.push(*output.outputs)
outputs = if output.respond_to?(:static_outputs)
output.static_outputs
else
output.outputs
end
@outputs.push(*outputs)
end
@event_router.add_rule(pattern, output)

Expand Down
66 changes: 63 additions & 3 deletions lib/fluent/plugin/multi_output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class MultiOutput < Base
config_param :@type, :string, default: nil
end

attr_reader :outputs
attr_reader :outputs, :outputs_statically_created

def process(tag, es)
raise NotImplementedError, "BUG: output plugins MUST implement this method"
Expand All @@ -41,8 +41,7 @@ def process(tag, es)
def initialize
super
@outputs = []

@compat = false
@outputs_statically_created = false

@counters_monitor = Monitor.new
# TODO: well organized counters
Expand Down Expand Up @@ -78,11 +77,72 @@ def configure(conf)
end
end

def static_outputs
@outputs_statically_created = true
@outputs
end

# Child plugin's lifecycles are controlled by agent automatically.
# It calls `outputs` to traverse plugins, and invoke start/stop/*shutdown/close/terminate on these directly.
# * `start` of this plugin will be called after child plugins
# * `stop`, `*shutdown`, `close` and `terminate` of this plugin will be called before child plugins

# But when MultiOutput plugins are created dynamically (by forest plugin or others), agent cannot find
# sub-plugins. So child plugins' lifecycles MUST be controlled by MultiOutput plugin itself.
# TODO: this hack will be removed at v2.
def call_lifecycle_method(method_name, checker_name)
return if @outputs_statically_created
@outputs.each do |o|
begin
log.debug "calling #{method_name} on output plugin dynamically created", type: Fluent::Plugin.lookup_type_from_class(o.class), plugin_id: o.plugin_id
o.send(method_name) unless o.send(checker_name)
rescue Exception => e
log.warn "unexpected error while calling #{method_name} on output plugin dynamically created", plugin: o.class, plugin_id: o.plugin_id, error: e
log.warn_backtrace
end
end
end

def start
super
call_lifecycle_method(:start, :started?)
end

def after_start
super
call_lifecycle_method(:after_start, :after_started?)
end

def stop
super
call_lifecycle_method(:stop, :stopped?)
end

def before_shutdown
super
call_lifecycle_method(:before_shutdown, :before_shutdown?)
end

def shutdown
super
call_lifecycle_method(:shutdown, :shutdown?)
end

def after_shutdown
super
call_lifecycle_method(:after_shutdown, :after_shutdown?)
end

def close
super
call_lifecycle_method(:close, :closed?)
end

def terminate
super
call_lifecycle_method(:terminate, :terminated?)
end

def emit_sync(tag, es)
@counters_monitor.synchronize{ @emit_count += 1 }
begin
Expand Down
4 changes: 3 additions & 1 deletion lib/fluent/test/log.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ module Fluent
module Test
class DummyLogDevice
attr_reader :logs
attr_accessor :flush_logs

def initialize
@logs = []
@flush_logs = true
end

def reset
@logs = []
@logs = [] if @flush_logs
end

def tty?
Expand Down
62 changes: 62 additions & 0 deletions test/test_plugin_classes.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require_relative 'helper'
require 'fluent/plugin/input'
require 'fluent/plugin/output'
require 'fluent/plugin/bare_output'
require 'fluent/plugin/filter'

module FluentTest
Expand Down Expand Up @@ -48,6 +49,67 @@ def process(tag, es)
end
end

class FluentTestDynamicOutput < ::Fluent::Plugin::BareOutput
::Fluent::Plugin.register_output('test_dynamic_out', self)

attr_reader :child
attr_reader :started

def start
super
@started = true
@child = Fluent::Plugin.new_output('copy')
conf = config_element('DYNAMIC', '', {}, [
config_element('store', '', {'@type' => 'test_out', '@id' => 'dyn_out1'}),
config_element('store', '', {'@type' => 'test_out', '@id' => 'dyn_out2'}),
])
@child.configure(conf)
@child.start
end

def after_start
super
@child.after_start
end

def stop
super
@child.stop
end

def before_shutdown
super
@child.before_shutdown
end

def shutdown
@started = false
super
@child.shutdown
end

def after_shutdown
super
@child.after_shutdown
end

def close
super
@child.close
end

def terminate
super
@child.terminate
end

def process(tag, es)
es.each do |time, record|
@events[tag] << record
end
end
end

class FluentTestErrorOutput < ::Fluent::Plugin::Output
::Fluent::Plugin.register_output('test_out_error', self)

Expand Down
90 changes: 90 additions & 0 deletions test/test_root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -351,5 +351,95 @@ def configure_ra(conf_str)
assert_equal [true], label_filters.map{|i| i.terminated? }
assert_equal [true, true, true], label_outputs.map{|i| i.terminated? }
end

test 'plugin #shutdown is not called twice' do
assert_equal 1, @ra.inputs.size
assert_equal 0, @ra.filters.size
assert_equal 0, @ra.outputs.size
assert_equal 1, @ra.labels.size
assert_equal '@testing', @ra.labels.keys.first
assert_equal 1, @ra.labels.values.first.filters.size
assert_equal 3, @ra.labels.values.first.outputs.size

@ra.start

old_level = @ra.log.level
begin
@ra.log.instance_variable_get(:@logger).level = Fluent::Log::LEVEL_INFO - 1
assert_equal Fluent::Log::LEVEL_INFO, @ra.log.level

@ra.log.out.flush_logs = false

@ra.shutdown

test_out1_shutdown_logs = @ra.log.out.logs.select{|line| line =~ /shutting down output plugin type=:test_out plugin_id="test_out1"/ }
assert_equal 1, test_out1_shutdown_logs.size
ensure
@ra.log.out.flush_logs = true
@ra.log.out.reset
@ra.log.level = old_level
end
end
end

sub_test_case 'configured with MultiOutput plugin which creates plugin instances dynamically' do
setup do
@ra = RootAgent.new(log: $log)
stub(Engine).root_agent { @ra }
@ra.configure(Config.parse(<<-EOC, "(test)", "(test_dir)", true))
<source>
@type test_in
@id test_in
@label @testing
</source>
<label @testing>
<match **>
@type test_dynamic_out
@id test_dyn
</match>
</label>
EOC
@ra
end

test 'plugin status with multi output' do
assert_equal 1, @ra.inputs.size
assert_equal 0, @ra.filters.size
assert_equal 0, @ra.outputs.size
assert_equal 1, @ra.labels.size
assert_equal '@testing', @ra.labels.keys.first
assert_equal 0, @ra.labels.values.first.filters.size
assert_equal 1, @ra.labels.values.first.outputs.size

dyn_out = @ra.labels.values.first.outputs.first
assert_nil dyn_out.child

@ra.start

assert_equal 1, @ra.labels.values.first.outputs.size

assert dyn_out.child
assert_false dyn_out.child.outputs_statically_created
assert_equal 2, dyn_out.child.outputs.size

assert_equal true, dyn_out.child.outputs[0].started?
assert_equal true, dyn_out.child.outputs[1].started?
assert_equal true, dyn_out.child.outputs[0].after_started?
assert_equal true, dyn_out.child.outputs[1].after_started?

@ra.shutdown

assert_equal 1, @ra.labels.values.first.outputs.size

assert_false dyn_out.child.outputs_statically_created
assert_equal 2, dyn_out.child.outputs.size

assert_equal [true, true], dyn_out.child.outputs.map{|i| i.stopped? }
assert_equal [true, true], dyn_out.child.outputs.map{|i| i.before_shutdown? }
assert_equal [true, true], dyn_out.child.outputs.map{|i| i.shutdown? }
assert_equal [true, true], dyn_out.child.outputs.map{|i| i.after_shutdown? }
assert_equal [true, true], dyn_out.child.outputs.map{|i| i.closed? }
assert_equal [true, true], dyn_out.child.outputs.map{|i| i.terminated? }
end
end
end

0 comments on commit 463ac14

Please sign in to comment.