Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide chunk.key for v0.12 plugins which specifies key in overriding #emit #992

Merged
merged 2 commits into from
May 27, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 69 additions & 1 deletion lib/fluent/compat/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
require 'fluent/compat/call_super_mixin'
require 'fluent/compat/output_chain'
require 'fluent/timezone'
require 'fluent/mixin'

require 'fluent/plugin_helper/compat_parameters'

Expand Down Expand Up @@ -96,6 +97,12 @@ def key
end
end

module AddKeyToChunkMixin
def key
self.metadata.variables[:key]
end
end

module ChunkSizeCompatMixin
def size
self.bytesize
Expand All @@ -106,6 +113,7 @@ module BufferedChunkMixin
# prepend this module to BufferedOutput (including ObjectBufferedOutput) plugin singleton class
def write(chunk)
chunk.extend(ChunkSizeCompatMixin)
chunk.extend(AddKeyToChunkMixin) if chunk.metadata.variables && chunk.metadata.variables.has_key?(:key)
super
end
end
Expand Down Expand Up @@ -227,6 +235,7 @@ def configure(conf)
end

methods_of_plugin = self.class.instance_methods(false)
@overrides_emit = methods_of_plugin.include?(:emit)
@overrides_format_stream = methods_of_plugin.include?(:format_stream)

super
Expand All @@ -240,6 +249,45 @@ def configure(conf)
(class << self; self; end).module_eval do
prepend BufferedChunkMixin
end

if @overrides_emit
self.singleton_class.module_eval do
attr_accessor :last_emit_via_buffer
end
output_plugin = self
m = Module.new do
define_method(:emit) do |key, data, chain|
# receivers of this method are buffer instances
output_plugin.last_emit_via_buffer = [key, data]
end
end
@buffer.singleton_class.module_eval do
prepend m
end
end
end

# original implementation of v0.12 BufferedOutput
def emit(tag, es, chain, key="")
# this method will not be used except for the case that plugin calls super
@emit_count += 1
data = format_stream(tag, es)
if @buffer.emit(key, data, chain)
submit_flush
end
end

def submit_flush
# nothing todo: blank method to be called from #emit of 3rd party plugins
end

def format_stream(tag, es)
# this method will not be used except for the case that plugin calls super
out = ''
es.each do |time, record|
out << format(tag, time, record)
end
out
end

# #format MUST be implemented in plugin
Expand All @@ -248,10 +296,30 @@ def configure(conf)
# This method overrides Fluent::Plugin::Output#handle_stream_simple
# because v0.12 BufferedOutput may overrides #format_stream, but original #handle_stream_simple method doesn't consider about it
def handle_stream_simple(tag, es, enqueue: false)
if @overrides_emit
current_emit_count = @emit_count
size = es.size
key = data = nil
begin
emit(tag, es, NULL_OUTPUT_CHAIN)
key, data = self.last_emit_via_buffer
ensure
@emit_count = current_emit_count
self.last_emit_via_buffer = nil
end
# on-the-fly key assignment can be done, and it's not configurable if Plugin#emit does it dynamically
meta = @buffer.metadata(variables: (key && !key.empty? ? {key: key} : nil))
write_guard do
@buffer.write({meta => [data, size]}, bulk: true, enqueue: enqueue)
end
@counters_monitor.synchronize{ @emit_records += size }
return [meta]
end

if @overrides_format_stream
meta = metadata(nil, nil, nil)
bulk = format_stream(tag, es)
size = es.size
bulk = format_stream(tag, es)
write_guard do
@buffer.write({meta => [bulk, size]}, bulk: true, enqueue: enqueue)
end
Expand Down