Skip to content

Commit

Permalink
Merge pull request #1010 from fluent/fix-shutdown-deadlock-issue-v14
Browse files Browse the repository at this point in the history
Fix shutdown deadlock issue v14
  • Loading branch information
tagomoris committed May 31, 2016
2 parents c8961d4 + 9b452e1 commit b89ed39
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 4 deletions.
10 changes: 7 additions & 3 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -219,19 +219,19 @@ def write(metadata_and_data, bulk: false, enqueue: false)
errors << e
end
end
operated_chunks.clear if errors.empty?

@stage_size += staged_bytesize

if errors.size > 0
log.warn "error occurs in committing chunks: only first one raised", errors: errors.map(&:class)
raise errors.first
end
rescue
ensure
operated_chunks.each do |chunk|
chunk.rollback rescue nil # nothing possible to do for #rollback failure
chunk.mon_exit rescue nil # this may raise ThreadError for chunks already committed
end
raise
end
end

Expand Down Expand Up @@ -365,6 +365,7 @@ def write_once(metadata, data, bulk: false, &block)
adding_bytesize = nil

chunk = synchronize { @stage[metadata] ||= generate_chunk(metadata) }
enqueue_list = []

chunk.synchronize do
# retry this method if chunk is already queued (between getting chunk and entering critical section)
Expand Down Expand Up @@ -402,7 +403,7 @@ def write_once(metadata, data, bulk: false, &block)
elsif bulk
# this metadata might be enqueued already by other threads
# but #enqueue_chunk does nothing in such case
enqueue_chunk(metadata)
enqueue_list << metadata
raise ShouldRetry
end
end
Expand All @@ -412,6 +413,9 @@ def write_once(metadata, data, bulk: false, &block)
write_step_by_step(metadata, data, data.size / 3, &block)
end
rescue ShouldRetry
enqueue_list.each do |metadata|
enqueue_chunk(metadata)
end
retry
end

Expand Down
3 changes: 2 additions & 1 deletion lib/fluent/plugin_helper/thread.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def thread_create(title)
raise
ensure
unless thread_exit
log.warn "thread doesn't exit correctly (killed or other reason)", plugin: self.class, title: title, error: $!
log.warn "thread doesn't exit correctly (killed or other reason)", plugin: self.class, title: title, thread: ::Thread.current, error: $!
end
@_threads_mutex.synchronize do
@_threads.delete(::Thread.current.object_id)
Expand Down Expand Up @@ -132,6 +132,7 @@ def terminate
super
@_threads_mutex.synchronize{ @_threads.keys }.each do |obj_id|
thread = @_threads[obj_id]
log.warn "killing existing thead", thread: thread
thread.kill if thread
end
@_threads_mutex.synchronize{ @_threads.keys }.each do |obj_id|
Expand Down

0 comments on commit b89ed39

Please sign in to comment.