Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

migrating in_http to v0.14 API #1308

Merged
merged 11 commits into from
Nov 15, 2016
133 changes: 68 additions & 65 deletions lib/fluent/plugin/in_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,29 @@
# limitations under the License.
#

require 'fluent/plugin/input'
require 'fluent/plugin/parser'
require 'fluent/event'

require 'http/parser'
require 'webrick/httputils'
require 'uri'
require 'socket'
require 'json'

require 'cool.io'

require 'fluent/input'
require 'fluent/event'
require 'fluent/process'
module Fluent::Plugin
class InHttpParser < Parser
Fluent::Plugin.register_parser('in_http', self)
def parse(text)
# this plugin is dummy implementation not to raise error
yield nil, nil
end
end

module Fluent
class HttpInput < Input
Plugin.register_input('http', self)

include DetachMultiProcessMixin

require 'http/parser'
Fluent::Plugin.register_input('http', self)

def initialize
require 'webrick/httputils'
super
end
helpers :parser, :compat_parameters, :event_loop

EMPTY_GIF_IMAGE = "GIF89a\u0001\u0000\u0001\u0000\x80\xFF\u0000\xFF\xFF\xFF\u0000\u0000\u0000,\u0000\u0000\u0000\u0000\u0001\u0000\u0001\u0000\u0000\u0002\u0002D\u0001\u0000;".force_encoding("UTF-8")

Expand All @@ -52,25 +53,36 @@ def initialize
config_param :add_http_headers, :bool, default: false
desc 'Add REMOTE_ADDR header to the record.'
config_param :add_remote_addr, :bool, default: false
desc 'The format of the HTTP body.'
config_param :format, :string, default: 'default'
config_param :blocking_timeout, :time, default: 0.5
desc 'Set a white list of domains that can do CORS (Cross-Origin Resource Sharing)'
config_param :cors_allow_origins, :array, default: nil
desc 'Respond with empty gif image of 1x1 pixel.'
config_param :respond_with_empty_img, :bool, default: false

config_section :parse do
config_set_default :@type, 'in_http'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is using nil bad idea?
It can avoid InHttpParser definition.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, using nil is bad idea for here, because we can't specify nil explicitly in configuration file.

end

EVENT_RECORD_PARAMETER = '_event_record'

def configure(conf)
compat_parameters_convert(conf, :parser)

super

m = if @format == 'default'
m = if @parser_configs.first['@type'] == 'in_http'
@parser_msgpack = parser_create(usage: 'parser_in_http_msgpack', type: 'msgpack')
@parser_msgpack.estimate_current_event = false
@parser_json = parser_create(usage: 'parser_in_http_json', type: 'json')
@parser_json.estimate_current_event = false
@format_name = 'default'
method(:parse_params_default)
else
@parser = Plugin.new_parser(@format)
@parser.configure(conf)
@parser = parser_create
@format_name = @parser_configs.first['@type']
method(:parse_params_with_parser)
end
(class << self; self; end).module_eval do
self.singleton_class.module_eval do
define_method(:parse_params, m)
end
end
Expand Down Expand Up @@ -100,7 +112,11 @@ def on_timer
end

def start
log.debug "listening http on #{@bind}:#{@port}"
@_event_loop_run_timeout = @blocking_timeout

super

log.debug "listening http", bind: @bind, port: @port

socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
if Fluent.windows?
Expand All @@ -109,38 +125,24 @@ def start
client = ServerEngine::SocketManager::Client.new(socket_manager_path)
lsock = client.listen_tcp(@bind, @port)

detach_multi_process do
super
@km = KeepaliveManager.new(@keepalive_timeout)
@lsock = Coolio::TCPServer.new(lsock, nil, Handler, @km, method(:on_request),
@body_size_limit, @format, log,
@cors_allow_origins)
@lsock.listen(@backlog) unless @backlog.nil?

@loop = Coolio::Loop.new
@loop.attach(@km)
@loop.attach(@lsock)

@thread = Thread.new(&method(:run))
end
@km = KeepaliveManager.new(@keepalive_timeout)
@lsock = Coolio::TCPServer.new(
lsock, nil, Handler, @km, method(:on_request),
@body_size_limit, @format_name, log,
@cors_allow_origins
)
@lsock.listen(@backlog) unless @backlog.nil?
event_loop_attach(@km)
event_loop_attach(@lsock)

@float_time_parser = Fluent::NumericTimeParser.new(:float)
end

def shutdown
@loop.watchers.each {|w| w.detach }
@loop.stop
def close
@lsock.close
@thread.join

super
end

def run
@loop.run(@blocking_timeout)
rescue
log.error "unexpected error", error: $!.to_s
log.error_backtrace
end

def on_request(path_info, params)
begin
path = path_info[1..-1] # remove /
Expand Down Expand Up @@ -170,9 +172,9 @@ def on_request(path_info, params)
end
time = if param_time = params['time']
param_time = param_time.to_f
param_time.zero? ? Engine.now : Fluent::EventTime.from_time(Time.at(param_time))
param_time.zero? ? Fluent::Engine.now : @float_time_parser.parse(param_time)
else
record_time.nil? ? Engine.now : record_time
record_time.nil? ? Fluent::Engine.now : record_time
end
rescue
return ["400 Bad Request", {'Content-Type'=>'text/plain'}, "400 Bad Request\n#{$!}\n"]
Expand All @@ -182,7 +184,7 @@ def on_request(path_info, params)
begin
# Support batched requests
if record.is_a?(Array)
mes = MultiEventStream.new
mes = Fluent::MultiEventStream.new
record.each do |single_record|
if @add_http_headers
params.each_pair { |k,v|
Expand Down Expand Up @@ -215,22 +217,23 @@ def on_request(path_info, params)
private

def parse_params_default(params)
record = if msgpack = params['msgpack']
Engine.msgpack_factory.unpacker.feed(msgpack).read
elsif js = params['json']
JSON.parse(js)
else
raise "'json' or 'msgpack' parameter is required"
end
return nil, record
if msgpack = params['msgpack']
@parser_msgpack.parse(msgpack) do |_time, record|
return nil, record
end
elsif js = params['json']
@parser_json.parse(js) do |_time, record|
return nil, record
end
else
raise "'json' or 'msgpack' parameter is required"
end
end

EVENT_RECORD_PARAMETER = '_event_record'

def parse_params_with_parser(params)
if content = params[EVENT_RECORD_PARAMETER]
@parser.parse(content) { |time, record|
raise "Received event is not #{@format}: #{content}" if record.nil?
raise "Received event is not #{@format_name}: #{content}" if record.nil?
return time, record
}
else
Expand All @@ -241,13 +244,13 @@ def parse_params_with_parser(params)
class Handler < Coolio::Socket
attr_reader :content_type

def initialize(io, km, callback, body_size_limit, format, log, cors_allow_origins)
def initialize(io, km, callback, body_size_limit, format_name, log, cors_allow_origins)
super(io)
@km = km
@callback = callback
@body_size_limit = body_size_limit
@next_close = false
@format = format
@format_name = format_name
@log = log
@cors_allow_origins = cors_allow_origins
@idle = 0
Expand Down Expand Up @@ -355,7 +358,7 @@ def on_message_complete
uri = URI.parse(@parser.request_url)
params = WEBrick::HTTPUtils.parse_query(uri.query)

if @format != 'default'
if @format_name != 'default'
params[EVENT_RECORD_PARAMETER] = @body
elsif @content_type =~ /^application\/x-www-form-urlencoded/
params.update WEBrick::HTTPUtils.parse_query(@body)
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def parse_partial_data(data, &block)
end

def parse_time(record)
if @time_key && record.has_key?(@time_key)
if @time_key && record.respond_to?(:has_key?) && record.has_key?(@time_key)
src = if @keep_time_key
record[@time_key]
else
Expand Down
16 changes: 14 additions & 2 deletions lib/fluent/plugin_helper/compat_parameters.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ module CompatParameters
}

PARSER_PARAMS = {
"format" => "@type",
"format" => nil,
"types" => nil,
"types_delimiter" => nil,
"types_label_delimiter" => nil,
Expand Down Expand Up @@ -217,14 +217,17 @@ def compat_parameters_inject(conf)

def compat_parameters_extract(conf)
return unless conf.elements('extract').empty?
return if EXTRACT_PARAMS.keys.all?{|k| !conf.has_key?(k) }
return if EXTRACT_PARAMS.keys.all?{|k| !conf.has_key?(k) } && !conf.has_key?('format')

# TODO: warn obsolete parameters if these are deprecated
hash = compat_parameters_copy_to_subsection_attributes(conf, EXTRACT_PARAMS)

if conf.has_key?('time_as_epoch') && Fluent::Config.bool_value(conf['time_as_epoch'])
hash['time_key'] ||= 'time'
hash['time_type'] = 'unixtime'
elsif conf.has_key?('format') && conf["format"].start_with?("/") && conf["format"].end_with?("/") # old-style regexp parser
hash['time_key'] ||= 'time'
hash['time_type'] ||= 'string'
end
if conf.has_key?('localtime') || conf.has_key?('utc')
if conf.has_key?('localtime') && conf.has_key?('utc')
Expand Down Expand Up @@ -253,6 +256,15 @@ def compat_parameters_parser(conf)
# TODO: warn obsolete parameters if these are deprecated
hash = compat_parameters_copy_to_subsection_attributes(conf, PARSER_PARAMS)

if conf["format"]
if conf["format"].start_with?("/") && conf["format"].end_with?("/")
hash["@type"] = "regexp"
hash["expression"] = conf["format"][1..-2]
else
hash["@type"] = conf["format"]
end
end

if conf["types"]
delimiter = conf["types_delimiter"] || ','
label_delimiter = conf["types_label_delimiter"] || ':'
Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/time.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ def to_int
@sec
end

def to_f
@sec + @nsec / 1_000_000_000.0
end

# for Time.at
def to_r
Rational(@sec * 1_000_000_000 + @nsec, 1_000_000_000)
Expand Down
Loading