Skip to content

Commit

Permalink
Merge pull request #1091 from cosmo0920/move-plugin-util-modules-to-c…
Browse files Browse the repository at this point in the history
…ompat

Move plugin util modules to compat
  • Loading branch information
tagomoris authored Jul 28, 2016
2 parents 41a070d + 9eeae50 commit 27888bc
Show file tree
Hide file tree
Showing 12 changed files with 413 additions and 337 deletions.
129 changes: 129 additions & 0 deletions lib/fluent/compat/exec_util.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'msgpack'
require 'yajl'

require 'fluent/engine'
require 'fluent/plugin'
require 'fluent/parser'

module Fluent
module Compat
module ExecUtil
SUPPORTED_FORMAT = {
'tsv' => :tsv,
'json' => :json,
'msgpack' => :msgpack,
}

class Parser
def initialize(on_message)
@on_message = on_message
end
end

class TextParserWrapperParser < Parser
def initialize(conf, on_message)
@parser = Plugin.new_parser(conf['format'])
@parser.configure(conf)
super(on_message)
end

def call(io)
io.each_line(&method(:each_line))
end

def each_line(line)
line.chomp!
@parser.parse(line) { |time, record|
@on_message.call(record, time)
}
end
end

class TSVParser < Parser
def initialize(keys, on_message)
@keys = keys
super(on_message)
end

def call(io)
io.each_line(&method(:each_line))
end

def each_line(line)
line.chomp!
vals = line.split("\t")

record = Hash[@keys.zip(vals)]

@on_message.call(record)
end
end

class JSONParser < Parser
def call(io)
y = Yajl::Parser.new
y.on_parse_complete = @on_message
y.parse(io)
end
end

class MessagePackParser < Parser
def call(io)
@u = Fluent::Engine.msgpack_factory.unpacker(io)
begin
@u.each(&@on_message)
rescue EOFError
end
end
end

class Formatter
end

class TSVFormatter < Formatter
def initialize(in_keys)
@in_keys = in_keys
super()
end

def call(record, out)
last = @in_keys.length-1
for i in 0..last
key = @in_keys[i]
out << record[key].to_s
out << "\t" if i != last
end
out << "\n"
end
end

class JSONFormatter < Formatter
def call(record, out)
out << Yajl.dump(record) << "\n"
end
end

class MessagePackFormatter < Formatter
def call(record, out)
record.to_msgpack(out)
end
end
end
end
end
54 changes: 54 additions & 0 deletions lib/fluent/compat/file_util.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

module Fluent
module Compat
module FileUtil
# Check file is writable if file exists
# Check directory is writable if file does not exist
#
# @param [String] path File path
# @return [Boolean] file is writable or not
def writable?(path)
return false if File.directory?(path)
return File.writable?(path) if File.exist?(path)

dirname = File.dirname(path)
return false if !File.directory?(dirname)
File.writable?(dirname)
end
module_function :writable?

# Check file is writable in conjunction wtih mkdir_p(dirname(path))
#
# @param [String] path File path
# @return [Boolean] file writable or not
def writable_p?(path)
return false if File.directory?(path)
return File.writable?(path) if File.exist?(path)

dirname = File.dirname(path)
until File.exist?(dirname)
dirname = File.dirname(dirname)
end

return false if !File.directory?(dirname)
File.writable?(dirname)
end
module_function :writable_p?
end
end
end
165 changes: 165 additions & 0 deletions lib/fluent/compat/socket_util.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'ipaddr'

require 'cool.io'

require 'fluent/plugin'
require 'fluent/input'

module Fluent
module Compat
module SocketUtil
def create_udp_socket(host)
if IPAddr.new(IPSocket.getaddress(host)).ipv4?
UDPSocket.new
else
UDPSocket.new(Socket::AF_INET6)
end
end
module_function :create_udp_socket

class UdpHandler < Coolio::IO
def initialize(io, log, body_size_limit, callback)
super(io)
@io = io
@log = log
@body_size_limit = body_size_limit
@callback = callback
end

def on_readable
msg, addr = @io.recvfrom_nonblock(@body_size_limit)
msg.chomp!
@callback.call(msg, addr)
rescue => e
@log.error "unexpected error", error: e
end
end

class TcpHandler < Coolio::Socket
PEERADDR_FAILED = ["?", "?", "name resolusion failed", "?"]

def initialize(io, log, delimiter, callback)
super(io)
@timeout = 0
if io.is_a?(TCPSocket)
@addr = (io.peeraddr rescue PEERADDR_FAILED)

opt = [1, @timeout.to_i].pack('I!I!') # { int l_onoff; int l_linger; }
io.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)
end
@delimiter = delimiter
@callback = callback
@log = log
@log.trace { "accepted fluent socket object_id=#{self.object_id}" }
@buffer = "".force_encoding('ASCII-8BIT')
end

def on_connect
end

def on_read(data)
@buffer << data
pos = 0

while i = @buffer.index(@delimiter, pos)
msg = @buffer[pos...i]
@callback.call(msg, @addr)
pos = i + @delimiter.length
end
@buffer.slice!(0, pos) if pos > 0
rescue => e
@log.error "unexpected error", error: e
close
end

def on_close
@log.trace { "closed fluent socket object_id=#{self.object_id}" }
end
end

class BaseInput < Fluent::Input
def initialize
super
require 'fluent/parser'
end

desc 'Tag of output events.'
config_param :tag, :string
desc 'The format of the payload.'
config_param :format, :string
desc 'The port to listen to.'
config_param :port, :integer, default: 5150
desc 'The bind address to listen to.'
config_param :bind, :string, default: '0.0.0.0'
desc "The field name of the client's hostname."
config_param :source_host_key, :string, default: nil
config_param :blocking_timeout, :time, default: 0.5

def configure(conf)
super

@parser = Plugin.new_parser(@format)
@parser.configure(conf)
end

def start
super

@loop = Coolio::Loop.new
@handler = listen(method(:on_message))
@loop.attach(@handler)
@thread = Thread.new(&method(:run))
end

def shutdown
@loop.watchers.each { |w| w.detach }
@loop.stop if @loop.instance_variable_get("@running")
@handler.close
@thread.join

super
end

def run
@loop.run(@blocking_timeout)
rescue => e
log.error "unexpected error", error: e
log.error_backtrace
end

private

def on_message(msg, addr)
@parser.parse(msg) { |time, record|
unless time && record
log.warn "pattern not match: #{msg.inspect}"
return
end

record[@source_host_key] = addr[3] if @source_host_key
router.emit(@tag, time, record)
}
rescue => e
log.error msg.dump, error: e, host: addr[3]
log.error_backtrace
end
end
end
end
end
34 changes: 34 additions & 0 deletions lib/fluent/compat/string_util.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

module Fluent
module Compat
module StringUtil
def match_regexp(regexp, string)
begin
return regexp.match(string)
rescue ArgumentError => e
raise e unless e.message.index("invalid byte sequence in".freeze).zero?
$log.info "invalid byte sequence is replaced in `#{string}`"
string = string.scrub('?')
retry
end
return true
end
module_function :match_regexp
end
end
end
Loading

0 comments on commit 27888bc

Please sign in to comment.