Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add filter_with_time to Fluent::Plugin::Filter #1140

Merged
merged 3 commits into from
Aug 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 46 additions & 6 deletions lib/fluent/plugin/filter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,62 @@ class Filter < Base

helpers :event_emitter

def initialize
super
@has_filter_with_time = has_filter_with_time?
end

def filter(tag, time, record)
raise NotImplementedError, "BUG: filter plugins MUST implement this method"
end

def filter_with_time(tag, time, record)
raise NotImplementedError, "BUG: filter plugins MUST implement this method"
end

def filter_stream(tag, es)
new_es = MultiEventStream.new
es.each do |time, record|
begin
filtered_record = filter(tag, time, record)
new_es.add(time, filtered_record) if filtered_record
rescue => e
router.emit_error_event(tag, time, record, e)
if @has_filter_with_time
es.each do |time, record|
begin
filtered_time, filtered_record = filter_with_time(tag, time, record)
new_es.add(filtered_time, filtered_record) if filtered_time && filtered_record
rescue => e
router.emit_error_event(tag, time, record, e)
end
end
else
es.each do |time, record|
begin
filtered_record = filter(tag, time, record)
new_es.add(time, filtered_record) if filtered_record
rescue => e
router.emit_error_event(tag, time, record, e)
end
end
end
new_es
end

private

def has_filter_with_time?
implmented_methods = self.class.instance_methods(false)
# Plugins that override `filter_stream` don't need check,
# because they may not call `filter` or `filter_with_time`
# for example fluentd/lib/fluent/plugin/filter_record_transformer.rb
return nil if implmented_methods.include?(:filter_stream)
case
when [:filter, :filter_with_time].all? { |e| implmented_methods.include?(e) }
raise "BUG: Filter plugins MUST be implemented either `filter` or `filter_with_time`"
when implmented_methods.include?(:filter)
false
when implmented_methods.include?(:filter_with_time)
true
else
raise NotImplementedError, "BUG: Filter plugins MUST be implmented either `filter` or `filter_with_time`"
end
end
end
end
end
2 changes: 2 additions & 0 deletions test/compat/test_calls_super.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ def shutdown; end
end
class DummyGoodFilter < Fluent::Filter
def configure(conf); super; end
def filter(tag, time, record); end
def start; super; end
def before_shutdown; super; end
def shutdown; super; end
end
class DummyBadFilter < Fluent::Filter
def configure(conf); super; end
def filter(tag, time, record); end
def start; end
def before_shutdown; end
def shutdown; end
Expand Down
140 changes: 117 additions & 23 deletions test/plugin/test_filter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,27 @@ def filter(tag, time, record)
record
end
end
class NumDoublePluginWithTime < Fluent::Plugin::Filter
def filter_with_time(tag, time, record)
r = record.dup
r["num"] = r["num"].to_i * 2
[time, r]
end
end
class IgnoreForNumPluginWithTime < Fluent::Plugin::Filter
def filter_with_time(tag, time, record)
if record["num"].is_a? Numeric
nil
else
[time, record]
end
end
end
class InvalidPlugin < Fluent::Plugin::Filter
# Because of implemnting `filter_with_time` and `filter` methods
def filter_with_time(tag, time, record); end
def filter(tag, time, record); end
end
end

class FilterPluginTest < Test::Unit::TestCase
Expand All @@ -53,12 +74,24 @@ def emit_error_event(tag, time, record, error)
sub_test_case 'for basic dummy plugin' do
setup do
Fluent::Test.setup
@p = FluentPluginFilterTest::DummyPlugin.new
end

test 'plugin does not define #filter raises error' do
assert_raise NotImplementedError do
FluentPluginFilterTest::DummyPlugin.new
end
end
end

sub_test_case 'normal filter plugin' do
setup do
Fluent::Test.setup
@p = FluentPluginFilterTest::NumDoublePlugin.new
end

test 'has healthy lifecycle' do
assert !@p.configured?
@p.configure(config_element())
@p.configure(config_element)
assert @p.configured?

assert !@p.started?
Expand Down Expand Up @@ -94,7 +127,7 @@ def emit_error_event(tag, time, record, error)
assert @p.respond_to?(:plugin_id_configured?)
assert @p.respond_to?(:plugin_id)

@p.configure(config_element())
@p.configure(config_element)

assert !@p.plugin_id_configured?
assert @p.plugin_id
Expand Down Expand Up @@ -128,30 +161,13 @@ class FluentPluginFilterTest::DummyPlugin2 < Fluent::Plugin::Filter
end
end

test 'plugin does not define #filter raises error' do
es = [
[event_time('2016-04-19 13:01:00 -0700'), {"num" => "1", "message" => "Hello filters!"}],
[event_time('2016-04-19 13:01:03 -0700'), {"num" => "2", "message" => "Hello filters!"}],
[event_time('2016-04-19 13:01:05 -0700'), {"num" => "3", "message" => "Hello filters!"}],
]
assert_raise NotImplementedError do
@p.filter_stream('testing', es)
end
end
end

sub_test_case 'normal filter plugin' do
setup do
Fluent::Test.setup
@p = FluentPluginFilterTest::NumDoublePlugin.new
end

test 'filters events correctly' do
test_es = [
[event_time('2016-04-19 13:01:00 -0700'), {"num" => "1", "message" => "Hello filters!"}],
[event_time('2016-04-19 13:01:03 -0700'), {"num" => "2", "message" => "Hello filters!"}],
[event_time('2016-04-19 13:01:05 -0700'), {"num" => "3", "message" => "Hello filters!"}],
]
@p.configure(config_element)
es = @p.filter_stream('testing', test_es)
assert es.is_a? Fluent::EventStream

Expand Down Expand Up @@ -186,6 +202,7 @@ class FluentPluginFilterTest::DummyPlugin2 < Fluent::Plugin::Filter
[event_time('2016-04-19 13:01:03 -0700'), {"num" => 2, "message" => "Ignored, yay!"}],
[event_time('2016-04-19 13:01:05 -0700'), {"num" => "3", "message" => "Hello filters!"}],
]
@p.configure(config_element)
es = @p.filter_stream('testing', test_es)
assert es.is_a? Fluent::EventStream

Expand Down Expand Up @@ -213,8 +230,7 @@ class FluentPluginFilterTest::DummyPlugin2 < Fluent::Plugin::Filter

test 'has router and can emit events to error streams' do
assert @p.has_router?

@p.configure(config_element())
@p.configure(config_element)
assert @p.router

@p.router = DummyRouter.new([])
Expand Down Expand Up @@ -252,4 +268,82 @@ class FluentPluginFilterTest::DummyPlugin2 < Fluent::Plugin::Filter
assert_equal "Value of num is Number!", error_emits[0][3].message
end
end

sub_test_case 'filter plugins that is implmented `filter_with_time`' do
setup do
Fluent::Test.setup
@p = FluentPluginFilterTest::NumDoublePluginWithTime.new
end

test 'filters events correctly' do
test_es = [
[event_time('2016-04-19 13:01:00 -0700'), {"num" => "1", "message" => "Hello filters!"}],
[event_time('2016-04-19 13:01:03 -0700'), {"num" => "2", "message" => "Hello filters!"}],
[event_time('2016-04-19 13:01:05 -0700'), {"num" => "3", "message" => "Hello filters!"}],
]
es = @p.filter_stream('testing', test_es)
assert es.is_a? Fluent::EventStream

ary = []
es.each do |time, r|
ary << [time, r]
end

assert_equal 3, ary.size

assert_equal event_time('2016-04-19 13:01:00 -0700'), ary[0][0]
assert_equal "Hello filters!", ary[0][1]["message"]
assert_equal 2, ary[0][1]["num"]

assert_equal event_time('2016-04-19 13:01:03 -0700'), ary[1][0]
assert_equal 4, ary[1][1]["num"]

assert_equal event_time('2016-04-19 13:01:05 -0700'), ary[2][0]
assert_equal 6, ary[2][1]["num"]
end
end

sub_test_case 'filter plugin that is implmented `filter_with_time` and returns nil for some records' do
setup do
Fluent::Test.setup
@p = FluentPluginFilterTest::IgnoreForNumPluginWithTime.new
end

test 'filter_stream ignores records which #filter_with_time return nil' do
test_es = [
[event_time('2016-04-19 13:01:00 -0700'), {"num" => "1", "message" => "Hello filters!"}],
[event_time('2016-04-19 13:01:03 -0700'), {"num" => 2, "message" => "Ignored, yay!"}],
[event_time('2016-04-19 13:01:05 -0700'), {"num" => "3", "message" => "Hello filters!"}],
]
@p.configure(config_element)
es = @p.filter_stream('testing', test_es)
assert es.is_a? Fluent::EventStream

ary = []
es.each do |time, r|
ary << [time, r]
end

assert_equal 2, ary.size

assert_equal event_time('2016-04-19 13:01:00 -0700'), ary[0][0]
assert_equal "Hello filters!", ary[0][1]["message"]
assert_equal "1", ary[0][1]["num"]

assert_equal event_time('2016-04-19 13:01:05 -0700'), ary[1][0]
assert_equal "3", ary[1][1]["num"]
end
end

sub_test_case 'filter plugins that is implmented both `filter_with_time` and `filter`' do
setup do
Fluent::Test.setup
end

test 'raises DuplicatedImplementError' do
assert_raise do
FluentPluginFilterTest::InvalidPlugin.new
end
end
end
end
54 changes: 48 additions & 6 deletions test/test_filter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,57 @@ def emit(klass, msgs, conf = '')
end

sub_test_case 'configure' do
test 'check default' do
assert_nothing_raised { create_driver }
test 'check to implement `filter` method' do
klass = Class.new(Fluent::Filter) do |c|
def filter(tag, time, record); end
end

assert_nothing_raised do
klass.new
end
end

test 'check to implement `filter_with_time` method' do
klass = Class.new(Fluent::Filter) do |c|
def filter_with_time(tag, time, record); end
end

assert_nothing_raised do
klass.new
end
end

test 'DO NOT check when implement `filter_stream`' do
klass = Class.new(Fluent::Filter) do |c|
def filter_stream(tag, es); end
end

assert_nothing_raised do
klass.new
end
end
end

sub_test_case 'filter' do
test 'NotImplementedError' do
not_implemented_filter = Class.new(Fluent::Filter)
assert_raise(NotImplementedError) { emit(not_implemented_filter, ['foo']) }
klass = Class.new(Fluent::Filter)

assert_raise NotImplementedError do
klass.new
end
end

test 'duplicated method implementation' do
klass = Class.new(Fluent::Filter) do |c|
def filter(tag, time, record); end
def filter_with_time(tag, time, record); end
end

assert_raise do
klass.new
end
end
end

sub_test_case 'filter' do
test 'null filter' do
null_filter = Class.new(Fluent::Filter) do |c|
def filter(tag, time, record)
Expand Down Expand Up @@ -61,6 +101,7 @@ def filter(tag, time, record)
def filter_stream(tag, es)
MultiEventStream.new
end
def filter(tag, time, record); record; end
end
es = emit(null_filter, ['foo'])
assert_equal(0, es.instance_variable_get(:@record_array).size)
Expand All @@ -71,6 +112,7 @@ def filter_stream(tag, es)
def filter_stream(tag, es)
es
end
def filter(tag, time, record); record; end
end
es = emit(pass_filter, ['foo'])
assert_equal(1, es.instance_variable_get(:@record_array).size)
Expand Down