Skip to content

Commit

Permalink
update in_forward using server plugin helper
Browse files Browse the repository at this point in the history
  • Loading branch information
tagomoris committed Nov 15, 2016
1 parent f4c3ab8 commit 57e1c9f
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 244 deletions.
221 changes: 52 additions & 169 deletions lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,30 @@
require 'yajl'
require 'digest'

require 'fluent/plugin/socket_util'
require 'fcntl'
require 'cool.io'

module Fluent::Plugin
class ForwardInput < Input
Fluent::Plugin.register_input('forward', self)

# See the wiki page below for protocol specification
# https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1

helpers :event_loop
helpers :server

LISTEN_PORT = 24224

desc 'The port to listen to.'
config_param :port, :integer, default: LISTEN_PORT
desc 'The bind address to listen to.'
config_param :bind, :string, default: '0.0.0.0'

config_param :backlog, :integer, default: nil
# SO_LINGER 0 to send RST rather than FIN to avoid lots of connections sitting in TIME_WAIT at src
desc 'The timeout time used to set linger option.'
config_param :linger_timeout, :integer, default: 0
# This option is for Cool.io's loop wait timeout to avoid loop stuck at shutdown. Almost users don't need to change this value.
config_param :blocking_timeout, :time, default: 0.5
desc 'Try to resolve hostname from IP addresses or not.'
config_param :resolve_hostname, :bool, default: nil
desc 'Connections will be disconnected right after receiving first message if this value is true.'
config_param :deny_keepalive, :bool, default: false

Expand Down Expand Up @@ -91,6 +90,15 @@ class ForwardInput < Input
def configure(conf)
super

if @source_hostname_key
# TODO: add test
if @resolve_hostname.nil?
@resolve_hostname = true
elsif !@resolve_hostname # user specifies "false" in config
raise Fluent::ConfigError, "resolve_hostname must be true with source_hostname_key"
end
end

if @security
if @security.user_auth && @security.users.empty?
raise Fluent::ConfigError, "<user> sections required if user_auth enabled"
Expand Down Expand Up @@ -131,44 +139,35 @@ def configure(conf)
@lsock = @usock = nil
end

HEARTBEAT_UDP_PAYLOAD = "\0"

def start
super

socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
if Fluent.windows?
socket_manager_path = socket_manager_path.to_i
server_create_connection(
:in_forward_server, @port,
bind: @bind,
shared: false,
resolve_name: @resolve_hostname,
linger_timeout: @linger_timeout,
backlog: @backlog,
&method(:handle_connection)
)

server_create(:in_forward_server_udp_heartbeat, @port, shared: false, proto: :udp, bind: @bind, resolve_name: @resolve_hostname, max_bytes: 128) do |data, sock|
log.trace "heartbeat udp data arrived", host: sock.remote_host, port: sock.remote_port, data: data
begin
sock.write HEARTBEAT_UDP_PAYLOAD
rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR
log.trace "error while heartbeat response", host: sock.remote_host, error: e
end
end
client = ServerEngine::SocketManager::Client.new(socket_manager_path)

@lsock = listen(client)
event_loop_attach(@lsock)

@usock = client.listen_udp(@bind, @port)
@usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
@hbr = HeartbeatRequestHandler.new(@usock, method(:on_heartbeat_request))
event_loop_attach(@hbr)
end

def close
@lsock.close if @lsock
@usock.close if @usock
super
end

def listen(client)
log.info "listening fluent socket on #{@bind}:#{@port}"
sock = client.listen_tcp(@bind, @port)
s = Coolio::TCPServer.new(sock, nil, Handler, @linger_timeout, log, method(:handle_connection))
s.listen(@backlog) unless @backlog.nil?
s
end

private

def handle_connection(conn)
send_data = ->(serializer, data){ conn.write serializer.call(data) }

log.trace "connected fluent socket", address: conn.remote_addr, port: conn.remote_port
log.trace "connected fluent socket", addr: conn.remote_addr, port: conn.remote_port
state = :established
nonce = nil
user_auth_salt = nil
Expand All @@ -182,7 +181,7 @@ def handle_connection(conn)
state = :pingpong
end

log.trace "accepted fluent socket", address: conn.remote_addr, port: conn.remote_port
log.trace "accepted fluent socket", addr: conn.remote_addr, port: conn.remote_port

read_messages(conn) do |msg, chunk_size, serializer|
case state
Expand All @@ -198,15 +197,11 @@ def handle_connection(conn)
log.debug "connection established", address: conn.remote_addr, port: conn.remote_port
state = :established
when :established
options = on_message(msg, chunk_size, conn.remote_addr)
options = on_message(msg, chunk_size, conn.remote_host)
if options && r = response(options)
send_data.call(serializer, r)
log.trace "sent response to fluent socket", address: conn.remote_addr, response: r
if @deny_keepalive
conn.on_write_complete do
conn.close
end
end
conn.on_write_complete{ conn.close } if @deny_keepalive
send_data.call(serializer, r)
else
if @deny_keepalive
conn.close
Expand All @@ -222,7 +217,7 @@ def read_messages(conn, &block)
feeder = nil
serializer = nil
bytes = 0
conn.on_data do |data|
conn.data do |data|
# only for first call of callback
unless feeder
first = data[0]
Expand Down Expand Up @@ -258,26 +253,26 @@ def response(option)
nil
end

def on_message(msg, chunk_size, peeraddr)
def on_message(msg, chunk_size, remote_host)
if msg.nil?
# for future TCP heartbeat_request
return
end

# TODO: raise an exception if broken chunk is generated by recoverable situation
unless msg.is_a?(Array)
log.warn "incoming chunk is broken:", source: source_message(peeraddr), msg: msg
log.warn "incoming chunk is broken:", host: remote_host, msg: msg
return
end

tag = msg[0]
entries = msg[1]

if @chunk_size_limit && (chunk_size > @chunk_size_limit)
log.warn "Input chunk size is larger than 'chunk_size_limit', dropped:", tag: tag, source: source_message(peeraddr), limit: @chunk_size_limit, size: chunk_size
log.warn "Input chunk size is larger than 'chunk_size_limit', dropped:", tag: tag, host: remote_host, limit: @chunk_size_limit, size: chunk_size
return
elsif @chunk_size_warn_limit && (chunk_size > @chunk_size_warn_limit)
log.warn "Input chunk size is larger than 'chunk_size_warn_limit':", tag: tag, source: source_message(peeraddr), limit: @chunk_size_warn_limit, size: chunk_size
log.warn "Input chunk size is larger than 'chunk_size_warn_limit':", tag: tag, host: remote_host, limit: @chunk_size_warn_limit, size: chunk_size
end

case entries
Expand All @@ -287,14 +282,14 @@ def on_message(msg, chunk_size, peeraddr)
size = (option && option['size']) || 0
es_class = (option && option['compressed'] == 'gzip') ? Fluent::CompressedMessagePackEventStream : Fluent::MessagePackEventStream
es = es_class.new(entries, nil, size.to_i)
es = check_and_skip_invalid_event(tag, es, peeraddr) if @skip_invalid_event
es = add_source_host(es, peeraddr[2]) if @source_hostname_key
es = check_and_skip_invalid_event(tag, es, remote_host) if @skip_invalid_event
es = add_source_host(es, remote_host) if @source_hostname_key
router.emit_stream(tag, es)

when Array
# Forward
es = if @skip_invalid_event
check_and_skip_invalid_event(tag, entries, peeraddr)
check_and_skip_invalid_event(tag, entries, remote_host)
else
es = Fluent::MultiEventStream.new
entries.each { |e|
Expand All @@ -306,7 +301,7 @@ def on_message(msg, chunk_size, peeraddr)
}
es
end
es = add_source_host(es, peeraddr[2]) if @source_hostname_key
es = add_source_host(es, remote_host) if @source_hostname_key
router.emit_stream(tag, es)
option = msg[2]

Expand All @@ -315,29 +310,31 @@ def on_message(msg, chunk_size, peeraddr)
time = msg[1]
record = msg[2]
if @skip_invalid_event && invalid_event?(tag, time, record)
log.warn "got invalid event and drop it:", source: source_message(peeraddr), tag: tag, time: time, record: record
log.warn "got invalid event and drop it:", host: remote_host, tag: tag, time: time, record: record
return msg[3] # retry never succeeded so return ack and drop incoming event.
end
return if record.nil?
time = Fluent::Engine.now if time.to_i == 0
record[@source_hostname_key] = peeraddr[2] if @source_hostname_key
record[@source_hostname_key] = remote_host if @source_hostname_key
router.emit(tag, time, record)
option = msg[3]
end

# return option for response
option
ensure
p(here: "ensure of on_message", error: $!) if $!
end

def invalid_event?(tag, time, record)
!((time.is_a?(Integer) || time.is_a?(::Fluent::EventTime)) && record.is_a?(Hash) && tag.is_a?(String))
end

def check_and_skip_invalid_event(tag, es, peeraddr)
def check_and_skip_invalid_event(tag, es, remote_host)
new_es = Fluent::MultiEventStream.new
es.each { |time, record|
if invalid_event?(tag, time, record)
log.warn "skip invalid event:", source: source_message(peeraddr), tag: tag, time: time, record: record
log.warn "skip invalid event:", host: remote_host, tag: tag, time: time, record: record
next
end
new_es.add(time, record)
Expand All @@ -354,11 +351,6 @@ def add_source_host(es, host)
new_es
end

def source_message(peeraddr)
_, port, host, addr = peeraddr
"host: #{host}, addr: #{addr}, port: #{port}"
end

def select_authenticate_users(node, username)
if node.nil? || node[:users].empty?
@security.users.select{|u| u.username == username}
Expand Down Expand Up @@ -424,114 +416,5 @@ def generate_pong(auth_result, reason_or_salt, nonce, shared_key)
shared_key_digest_hex = Digest::SHA512.new.update(reason_or_salt).update(@security.self_hostname).update(nonce).update(shared_key).hexdigest
['PONG', true, '', @security.self_hostname, shared_key_digest_hex]
end

class Handler < Coolio::Socket
attr_reader :protocol, :remote_port, :remote_addr, :remote_host

PEERADDR_FAILED = ["?", "?", "name resolusion failed", "?"]

def initialize(io, linger_timeout, log, on_connect_callback)
super(io)

@peeraddr = nil
if io.is_a?(TCPSocket) # for unix domain socket support in the future
@peeraddr = (io.peeraddr rescue PEERADDR_FAILED)
opt = [1, linger_timeout].pack('I!I!') # { int l_onoff; int l_linger; }
io.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)
end

### TODO: disabling name rev resolv
proto, port, host, addr = ( io.peeraddr rescue PEERADDR_FAILED )
if addr == '?'
port, addr = *Socket.unpack_sockaddr_in(io.getpeername) rescue nil
end
@protocol = proto
@remote_port = port
@remote_addr = addr
@remote_host = host
@writing = false
@closing = false
@mutex = Mutex.new

@chunk_counter = 0
@on_connect_callback = on_connect_callback
@log = log
@log.trace {
begin
remote_port, remote_addr = *Socket.unpack_sockaddr_in(@_io.getpeername)
rescue
remote_port = nil
remote_addr = nil
end
[ "accepted fluent socket", {address: remote_addr, port: remote_port, instance: self.object_id} ]
}
end

def on_connect
@on_connect_callback.call(self)
end

# API to register callback for data arrival
def on_data(&callback)
@on_read_callback = callback
end

def on_read(data)
@on_read_callback.call(data)
rescue => e
@log.error "unexpected error on reading data from client", address: @remote_addr, error: e
@log.error_backtrace
close
end

def on_write_complete
closing = @mutex.synchronize {
@writing = false
@closing
}
if closing
close
end
end

def close
writing = @mutex.synchronize {
@closing = true
@writing
}
unless writing
super
end
end
end

class HeartbeatRequestHandler < Coolio::IO
def initialize(io, callback)
super(io)
@io = io
@callback = callback
end

def on_readable
begin
msg, addr = @io.recvfrom(1024)
rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR
return
end
host = addr[3]
port = addr[1]
@callback.call(host, port, msg)
rescue
# TODO log?
end
end

def on_heartbeat_request(host, port, msg)
#log.trace "heartbeat request from #{host}:#{port}"
begin
@usock.send "\0", 0, host, port
rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR
end
end
end
end
Loading

0 comments on commit 57e1c9f

Please sign in to comment.