Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Show errors on console under plugin development #1302

Merged
merged 5 commits into from
Nov 2, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 0 additions & 15 deletions lib/fluent/plugin/out_stdout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,8 @@ def prefer_buffered_processing
false
end

def prefer_delayed_commit
@delayed
end

attr_accessor :delayed
attr_accessor :formatter

def initialize
super
@delayed = false
end

def configure(conf)
compat_parameters_convert(conf, :inject, :formatter)

Expand All @@ -76,10 +66,5 @@ def format(tag, time, record)
def write(chunk)
chunk.write_to($log)
end

def try_write(chunk)
chunk.write_to($log)
commit_write(chunk.unique_id)
end
end
end
28 changes: 17 additions & 11 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -150,18 +150,16 @@ def expired?

# for tests
attr_reader :buffer, :retry, :secondary, :chunk_keys, :chunk_key_time, :chunk_key_tag
attr_accessor :output_enqueue_thread_waiting, :in_tests
attr_accessor :output_enqueue_thread_waiting, :dequeued_chunks, :dequeued_chunks_mutex

# output_enqueue_thread_waiting: for test of output.rb itself
# in_tests: for tests of plugins with test drivers

def initialize
super
@counters_monitor = Monitor.new
@buffering = false
@delayed_commit = false
@as_secondary = false
@in_tests = false
@primary_instance = nil

# TODO: well organized counters
Expand All @@ -188,6 +186,7 @@ def initialize
@secondary = nil
@retry = nil
@dequeued_chunks = nil
@dequeued_chunks_mutex = nil
@output_enqueue_thread = nil
@output_flush_threads = nil

Expand Down Expand Up @@ -399,10 +398,8 @@ def start
end
@output_flush_thread_current_position = 0

unless @in_tests
if @flush_mode == :interval || @chunk_key_time
@output_enqueue_thread = thread_create(:enqueue_thread, &method(:enqueue_thread_run))
end
if !@under_plugin_development && (@flush_mode == :interval || @chunk_key_time)
@output_enqueue_thread = thread_create(:enqueue_thread, &method(:enqueue_thread_run))
end
end
@secondary.start if @secondary
Expand Down Expand Up @@ -981,11 +978,11 @@ def try_flush
if output.delayed_commit
log.trace "executing delayed write and commit", chunk: dump_unique_id_hex(chunk.unique_id)
@counters_monitor.synchronize{ @write_count += 1 }
output.try_write(chunk)
@dequeued_chunks_mutex.synchronize do
# delayed_commit_timeout for secondary is configured in <buffer> of primary (<secondary> don't get <buffer>)
@dequeued_chunks << DequeuedChunkInfo.new(chunk.unique_id, Time.now, self.delayed_commit_timeout)
end
output.try_write(chunk)
else # output plugin without delayed purge
chunk_id = chunk.unique_id
dump_chunk_id = dump_unique_id_hex(chunk_id)
Expand All @@ -994,11 +991,16 @@ def try_flush
log.trace "executing sync write", chunk: dump_chunk_id
output.write(chunk)
log.trace "write operation done, committing", chunk: dump_chunk_id
commit_write(chunk_id, secondary: using_secondary)
commit_write(chunk_id, delayed: false, secondary: using_secondary)
log.trace "done to commit a chunk", chunk: dump_chunk_id
end
rescue => e
log.debug "taking back chunk for errors.", plugin_id: plugin_id, chunk: dump_unique_id_hex(chunk.unique_id)
if output.delayed_commit
@dequeued_chunks_mutex.synchronize do
@dequeued_chunks.delete_if{|d| d.chunk_id == chunk.unique_id }
end
end
@buffer.takeback_chunk(chunk.unique_id)

if @under_plugin_development
Expand Down Expand Up @@ -1059,7 +1061,11 @@ def submit_flush_once
@output_flush_thread_current_position = (@output_flush_thread_current_position + 1) % @buffer_config.flush_thread_count
state = @output_flush_threads[@output_flush_thread_current_position]
state.next_time = 0
state.thread.run
if state.thread && state.thread.status # "run"/"sleep"/"aborting" or false(successfully stop) or nil(killed by exception)
state.thread.run
else
log.warn "thread is already dead"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If one thread is dead, we should replace dead thread with new active one?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. Dead flush thread means any kind of bug of output.rb.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see.

Off topic. Collecting live and dead threads information is good for debug.
If one thread is dead, it means other thread also becomes dead.
If this situation continues, all output threads are dead.
We should display the message when all output threads are dead.

end
end

def force_flush
Expand Down Expand Up @@ -1204,7 +1210,7 @@ def flush_thread_run(state)
rescue => e
# normal errors are rescued by output plugins in #try_flush
# so this rescue section is for critical & unrecoverable errors
log.error "error on output thread", plugin_id: plugin_id, error_class: e.class, error: e
log.error "error on output thread", plugin_id: plugin_id, error: e
log.error_backtrace
raise
end
Expand Down
6 changes: 6 additions & 0 deletions lib/fluent/plugin_helper/thread.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ def thread_create(title)
yield
thread_exit = true
rescue Exception => e
if @under_plugin_development
STDERR.puts "\nError raised in thread from #thread_create, #{e.class}:#{e.message}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Head \n is needed?
Previous log doesn't have \n?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this code expects that leading text is '.' (means progress of test cases) from test runner, without tailing newline.

e.backtrace.each do |msg|
STDERR.puts [" ", msg].join
end
end
log.warn "thread exited by unexpected error", plugin: self.class, title: title, error: e
thread_exit = true
raise
Expand Down
1 change: 1 addition & 0 deletions lib/fluent/test/driver/base_owner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def initialize(klass, opts: {}, &block)
@instance.system_config_override(opts)
end
@instance.log = TestLogger.new
@instance.log.under_plugin_development = true
@logs = @instance.log.out.logs

@event_streams = nil
Expand Down
22 changes: 19 additions & 3 deletions lib/fluent/test/driver/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@ class Output < BaseOwner
def initialize(klass, opts: {}, &block)
super
raise ArgumentError, "plugin is not an instance of Fluent::Plugin::Output" unless @instance.is_a? Fluent::Plugin::Output
@instance.in_tests = true
@flush_buffer_at_cleanup = nil
@wait_flush_completion = nil
@format_hook = nil
@format_results = []
end

def run(flush: true, **kwargs, &block)
def run(flush: true, wait_flush_completion: true, **kwargs, &block)
@flush_buffer_at_cleanup = flush
@wait_flush_completion = wait_flush_completion
super(**kwargs, &block)
end

Expand All @@ -55,7 +56,22 @@ def formatted

def flush
@instance.force_flush
Timeout.timeout(10){ sleep 0.1 until !@instance.buffer || @instance.buffer.queue.size == 0 }
wait_flush_completion if @wait_flush_completion
end

def wait_flush_completion
buffer_queue = ->(){ @instance.buffer && @instance.buffer.queue.size > 0 }
dequeued_chunks = ->(){
@instance.dequeued_chunks_mutex &&
@instance.dequeued_chunks &&
@instance.dequeued_chunks_mutex.synchronize{ @instance.dequeued_chunks.size > 0 }
}

Timeout.timeout(10) do
while buffer_queue.call || dequeued_chunks.call
sleep 0.1
end
end
end

def instance_hook_after_started
Expand Down
46 changes: 46 additions & 0 deletions lib/fluent/test/log.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class DummyLogDevice
def initialize
@logs = []
@flush_logs = true
@use_stderr = false
end

def reset
Expand All @@ -40,7 +41,17 @@ def puts(*args)
args.each{ |arg| write(arg + "\n") }
end

def dump_stderr(&block)
@use_stderr = true
block.call
ensure
@use_stderr = false
end

def write(message)
if @use_stderr
STDERR.write message
end
@logs.push message
end

Expand All @@ -54,15 +65,50 @@ def close
end

class TestLogger < Fluent::PluginLogger
attr_accessor :under_plugin_development

def initialize
@logdev = DummyLogDevice.new
@under_plugin_development = false
dl_opts = {}
dl_opts[:log_level] = ServerEngine::DaemonLogger::INFO
logger = ServerEngine::DaemonLogger.new(@logdev, dl_opts)
log = Fluent::Log.new(logger)
super(log)
end

def error(*args, &block)
if @under_plugin_development
@logdev.dump_stderr{ super }
else
super
end
end

def error_backtrace(backtrace=$!.backtrace)
if @under_plugin_development
@logdev.dump_stderr{ super }
else
super
end
end

def fatal(*args, &block)
if @under_plugin_development
@logdev.dump_stderr{ super }
else
super
end
end

def fatal_backtrace(backtrace=$!.backtrace)
if @under_plugin_development
@logdev.dump_stderr{ super }
else
super
end
end

def reset
@logdev.reset
end
Expand Down
2 changes: 1 addition & 1 deletion test/plugin/test_in_dummy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def create_driver(conf)
sub_test_case 'configure' do
test 'required parameters' do
assert_raise_message("'tag' parameter is required") do
create_driver('')
Fluent::Plugin::DummyInput.new.configure(config_element('ROOT',''))
end
end

Expand Down
2 changes: 1 addition & 1 deletion test/plugin/test_out_null.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def create_driver(conf = "")
d.instance.delayed = true

t = event_time("2016-05-23 00:22:13 -0800")
d.run(default_tag: 'test', flush: true, shutdown: false) do
d.run(default_tag: 'test', flush: true, wait_flush_completion: false, shutdown: false) do
d.feed(t, {"message" => "null null null"})
d.feed(t, {"message" => "null null"})
d.feed(t, {"message" => "null"})
Expand Down
4 changes: 3 additions & 1 deletion test/plugin/test_out_secondary_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ def create_driver(conf = CONFIG, primary = create_primary)

test 'should be passed directory' do
assert_raise Fluent::ConfigError do
create_driver %[]
i = Fluent::Plugin::SecondaryFileOutput.new
i.acts_as_secondary(create_primary)
i.configure(config_element())
end

assert_nothing_raised do
Expand Down
29 changes: 0 additions & 29 deletions test/plugin/test_out_stdout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -139,21 +139,6 @@ def create_driver(conf = CONFIG)
end
assert_equal "#{Time.at(time).localtime.strftime(TIME_FORMAT)} test: {\"test\":\"test\"}\n", out
end

data('oj' => 'oj', 'yajl' => 'yajl')
test '#try_write(asynchronous)' do |data|
d = create_driver(config_element("ROOT", "", {"output_type" => "json", "json_parser" => data}, [config_element("buffer")]))
time = event_time()
d.instance.delayed = true

out = capture_log do
d.run(default_tag: 'test', flush: true, shutdown: false) do
d.feed(time, {'test' => 'test'})
end
end

assert_equal "#{Time.at(time).localtime.strftime(TIME_FORMAT)} test: {\"test\":\"test\"}\n", out
end
end

sub_test_case 'emit hash' do
Expand All @@ -169,20 +154,6 @@ def create_driver(conf = CONFIG)

assert_equal "#{Time.at(time).localtime.strftime(TIME_FORMAT)} test: {\"test\"=>\"test\"}\n", out
end

test '#try_write(asynchronous)' do
d = create_driver(config_element("ROOT", "", {"output_type" => "hash"}, [config_element("buffer")]))
time = event_time()
d.instance.delayed = true

out = capture_log do
d.run(default_tag: 'test', flush: true, shutdown: false) do
d.feed(time, {'test' => 'test'})
end
end

assert_equal "#{Time.at(time).localtime.strftime(TIME_FORMAT)} test: {\"test\"=>\"test\"}\n", out
end
end
end

Expand Down