Skip to content

Commit

Permalink
Merge pull request #1387 from CSharpRU/get-real-retry-info-from-buffe…
Browse files Browse the repository at this point in the history
…red-output

Ability to get real retry state of buffered output
  • Loading branch information
repeatedly committed Feb 2, 2017
1 parent f80d73c commit 2ad2dca
Showing 1 changed file with 27 additions and 2 deletions.
29 changes: 27 additions & 2 deletions lib/fluent/plugin/in_monitor_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class MonitorAgentInput < Input
config_param :emit_interval, :time, default: 60
config_param :emit_config, :bool, default: false
config_param :include_config, :bool, default: true
config_param :include_retry, :bool, default: true

class MonitorServlet < WEBrick::HTTPServlet::AbstractServlet
def initialize(server, agent)
Expand Down Expand Up @@ -77,7 +78,7 @@ def build_object(req, res)

# if ?debug=1 is set, set :with_debug_info for get_monitor_info
# and :pretty_json for render_json_error
opts = {with_config: @agent.include_config}
opts = {with_config: @agent.include_config, with_retry: @agent.include_retry}
if s = qs['debug'] and s[0]
opts[:with_debug_info] = true
opts[:pretty_json] = true
Expand All @@ -91,6 +92,10 @@ def build_object(req, res)
opts[:with_config] = Fluent::Config.bool_value(with_config)
end

if with_retry = get_search_parameter(qs, 'with_retry'.freeze)
opts[:with_retry] = Fluent::Config.bool_value(with_retry)
end

if tag = get_search_parameter(qs, 'tag'.freeze)
# ?tag= to search an output plugin by match pattern
if obj = @agent.plugin_info_by_tag(tag, opts)
Expand Down Expand Up @@ -263,7 +268,7 @@ def start
log.debug "tag parameter is specified. Emit plugins info to '#{@tag}'"

@loop = Coolio::Loop.new
opts = {with_config: @emit_config}
opts = {with_config: @emit_config, with_retry: false}
timer = TimerWatcher.new(@emit_interval, log) {
es = MultiEventStream.new
now = Engine.now
Expand Down Expand Up @@ -408,6 +413,11 @@ def get_monitor_info(pe, opts={})
end
}

if opts[:with_retry] && pe.instance_variable_defined?(:@num_errors) &&
(pe.instance_variable_get(:@num_errors) > 0)
obj['retry'] = get_retry_info(pe)
end

# include all instance variables if :with_debug_info is set
if opts[:with_debug_info]
iv = {}
Expand All @@ -431,6 +441,21 @@ def get_monitor_info(pe, opts={})
obj
end

RETRY_INFO = {
'steps' => '@num_errors',
'next_time' => '@next_retry_time',
}.freeze

def get_retry_info(pe)
retry_variables = {}

RETRY_INFO.each_pair { |key, param|
retry_variables[key] = pe.instance_variable_get(param)
}

retry_variables
end

def plugin_category(pe)
case pe
when Fluent::Input
Expand Down

0 comments on commit 2ad2dca

Please sign in to comment.