Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lock buffers in order of metadata #1722

Merged
merged 4 commits into from
Nov 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 71 additions & 1 deletion lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,74 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than
def empty?
timekey.nil? && tag.nil? && variables.nil?
end

def cmp_variables(v1, v2)
if v1.nil? && v2.nil?
return 0
elsif v1.nil? # v2 is non-nil
return -1
elsif v2.nil? # v1 is non-nil
return 1
end
# both of v1 and v2 are non-nil
v1_sorted_keys = v1.keys.sort
v2_sorted_keys = v2.keys.sort
if v1_sorted_keys != v2_sorted_keys
if v1_sorted_keys.size == v2_sorted_keys.size
v1_sorted_keys <=> v2_sorted_keys
else
v1_sorted_keys.size <=> v2_sorted_keys.size
end
else
v1_sorted_keys.each do |k|
a = v1[k]
b = v2[k]
if a && b && a != b
return a <=> b
elsif a && b || (!a && !b) # same value (including both are nil)
next
elsif a # b is nil
return 1
else # a is nil (but b is non-nil)
return -1
end
end

0
end
end

def <=>(o)
timekey2 = o.timekey
tag2 = o.tag
variables2 = o.variables
if (!!timekey ^ !!timekey2) || (!!tag ^ !!tag2) || (!!variables ^ !!variables2)
# One has value in a field, but another doesn't have value in same field
# This case occurs very rarely
if timekey == timekey2 # including the case of nil == nil
if tag == tag2
cmp_variables(variables, variables2)
elsif tag.nil?
-1
elsif tag2.nil?
1
else
tag <=> tag2
end
elsif timekey.nil?
-1
elsif timekey2.nil?
1
else
timekey <=> timekey2
end
else
# objects have values in same field pairs (comparison with non-nil and nil doesn't occur here)
(timekey <=> timekey2 || 0).nonzero? || # if `a <=> b` is nil, then both are nil
(tag <=> tag2 || 0).nonzero? ||
cmp_variables(variables, variables2)
end
end
end

# for tests
Expand Down Expand Up @@ -203,7 +271,9 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
chunks_to_enqueue = []

begin
metadata_and_data.each do |metadata, data|
# sort metadata to get lock of chunks in same order with other threads
metadata_and_data.keys.sort.each do |metadata|
data = metadata_and_data[metadata]
write_once(metadata, data, format: format, size: size) do |chunk, adding_bytesize|
chunk.mon_enter # add lock to prevent to be committed/rollbacked from other threads
operated_chunks << chunk
Expand Down
3 changes: 2 additions & 1 deletion test/plugin/test_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,8 @@ def create_chunk_es(metadata, es)
assert{ @p.stage[@dm1].size == 2 }
assert !@p.stage[@dm1].rollbacked

@p.stage[@dm1].failing = true
meta_list = [@dm0, @dm1, @dm2, @dm3].sort
@p.stage[meta_list.last].failing = true

assert_raise(FluentPluginBufferTest::DummyMemoryChunkError) do
@p.write({ @dm2 => [row], @dm3 => [row], @dm0 => [row, row, row], @dm1 => [row, row] })
Expand Down
89 changes: 89 additions & 0 deletions test/plugin/test_metadata.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
require_relative '../helper'
require 'fluent/plugin/buffer'

class BufferMetadataTest < Test::Unit::TestCase

def meta(timekey=nil, tag=nil, variables=nil)
Fluent::Plugin::Buffer::Metadata.new(timekey, tag, variables)
end

setup do
Fluent::Test.setup
end

sub_test_case 'about metadata' do
test 'comparison of variables should be stable' do
m = meta(nil, nil, nil)
# different sets of keys
assert_equal(-1, m.cmp_variables({}, {a: 1}))
assert_equal(1, m.cmp_variables({a: 1}, {}))
assert_equal(1, m.cmp_variables({c: 1}, {a: 1}))
assert_equal(-1, m.cmp_variables({a: 1}, {a: 1, b: 2}))
assert_equal(1, m.cmp_variables({a: 1, c: 1}, {a: 1, b: 2}))
assert_equal(1, m.cmp_variables({a: 1, b: 0, c: 1}, {a: 1, b: 2}))
# same set of keys
assert_equal(-1, m.cmp_variables({a: 1}, {a: 2}))
assert_equal(-1, m.cmp_variables({a: 1, b: 0}, {a: 1, b: 1}))
assert_equal(-1, m.cmp_variables({a: 1, b: 1, c: 100}, {a: 1, b: 1, c: 200}))
assert_equal(-1, m.cmp_variables({b: 1, c: 100, a: 1}, {a: 1, b: 1, c: 200})) # comparison sorts keys
assert_equal(-1, m.cmp_variables({a: nil}, {a: 1}))
assert_equal(-1, m.cmp_variables({a: 1, b: nil}, {a: 1, b: 1}))
end

test 'comparison of metadata should be stable' do
n = Time.now.to_i

assert_equal(0, meta(nil, nil, nil) <=> meta(nil, nil, nil))
assert_equal(0, meta(n, nil, nil) <=> meta(n, nil, nil))
assert_equal(0, meta(nil, "t1", nil) <=> meta(nil, "t1", nil))
assert_equal(0, meta(nil, nil, {}) <=> meta(nil, nil, {}))
assert_equal(0, meta(nil, nil, {a: "1"}) <=> meta(nil, nil, {a: "1"}))
assert_equal(0, meta(n, nil, {}) <=> meta(n, nil, {}))
assert_equal(0, meta(n, "t1", {}) <=> meta(n, "t1", {}))
assert_equal(0, meta(n, "t1", {a: "x", b: 10}) <=> meta(n, "t1", {a: "x", b: 10}))

# timekey is 1st comparison key
assert_equal(-1, meta(n - 300, nil, nil) <=> meta(n - 270, nil, nil))
assert_equal(1, meta(n + 1, "a", nil) <=> meta(n - 1, "b", nil))
assert_equal(-1, meta(n - 1, nil, {a: 100}) <=> meta(n + 1, nil, {}))

# tag is 2nd
assert_equal(-1, meta(nil, "a", {}) <=> meta(nil, "b", {}))
assert_equal(-1, meta(n, "a", {}) <=> meta(n, "b", {}))
assert_equal(1, meta(nil, "x", {a: 1}) <=> meta(nil, "t", {}))
assert_equal(1, meta(n, "x", {a: 1}) <=> meta(n, "t", {}))
assert_equal(1, meta(nil, "x", {a: 1}) <=> meta(nil, "t", {a: 1}))
assert_equal(1, meta(n, "x", {a: 1}) <=> meta(n, "t", {a: 2}))
assert_equal(1, meta(n, "x", {a: 1}) <=> meta(n, "t", {a: 10, b: 1}))

# variables is the last
assert_equal(-1, meta(nil, nil, {}) <=> meta(nil, nil, {a: 1}))
assert_equal(-1, meta(n, "t", {}) <=> meta(n, "t", {a: 1}))
assert_equal(1, meta(n, "t", {a: 1}) <=> meta(n, "t", {}))
assert_equal(-1, meta(n, "t", {a: 1}) <=> meta(n, "t", {a: 2}))
assert_equal(-1, meta(n, "t", {a: 1}) <=> meta(n, "t", {a: 1, b: 1}))
assert_equal(1, meta(nil, nil, {b: 1}) <=> meta(nil, nil, {a: 1}))
assert_equal(1, meta(n, "t", {b: 1}) <=> meta(n, "t", {a: 1}))
end

test 'metadata can be sorted' do
n = Time.now.to_i
m0 = meta(nil, nil, nil)
m1 = meta(n - 1, nil, nil)
m2 = meta(n - 1, "a", nil)
m3 = meta(n - 1, "a", {a: 1})
m4 = meta(n - 1, "a", {a: 100})
m5 = meta(n - 1, "a", {a: 100, b: 1})
m6 = meta(n - 1, "aa", nil)
m7 = meta(n - 1, "aa", {a: 1})
m8 = meta(n - 1, "b", nil)
m9 = meta(n, nil, nil)
m10 = meta(n + 1, nil, {a: 1})
expected = [m0, m1, m2, m3, m4, m5, m6, m7, m8, m9, m10].freeze
ary = expected.dup
100.times do
assert_equal expected, ary.shuffle.sort
end
end
end
end