Skip to content

Commit

Permalink
writing
Browse files Browse the repository at this point in the history
  • Loading branch information
tagomoris committed Nov 21, 2016
1 parent 7d9840b commit b6565b7
Show file tree
Hide file tree
Showing 6 changed files with 1,084 additions and 45 deletions.
2 changes: 1 addition & 1 deletion lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1176,7 +1176,7 @@ def flush_thread_run(state)
flush_thread_interval = @buffer_config.flush_thread_interval

# If the given clock_id is not supported, Errno::EINVAL is raised.
clock_id = Process::CLOCK_MONOTONIC rescue Process::CLOCK_MONOTONIC_RAW
clock_id = Process::CLOCK_MONOTONIC_RAW rescue Process::CLOCK_MONOTONIC
state.next_time = Process.clock_gettime(clock_id) + flush_thread_interval

while !self.after_started? && !self.stopped?
Expand Down
17 changes: 15 additions & 2 deletions lib/fluent/plugin_helper/event_loop.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@ module EventLoop
# terminate: initialize internal state

EVENT_LOOP_RUN_DEFAULT_TIMEOUT = 0.5
EVENT_LOOP_SHUTDOWN_TIMEOUT = 5
EVENT_LOOP_CLOCK_ID = Process::CLOCK_MONOTONIC_RAW rescue Process::CLOCK_MONOTONIC

attr_reader :_event_loop # for tests

def event_loop_attach(watcher)
@_event_loop_mutex.synchronize do
@_event_loop.attach(watcher)
@_event_loop_attached_watchers << watcher
watcher
end
end

Expand All @@ -58,6 +62,7 @@ def initialize
@_event_loop_mutex = Mutex.new
# plugin MAY configure loop run timeout in #configure
@_event_loop_run_timeout = EVENT_LOOP_RUN_DEFAULT_TIMEOUT
@_event_loop_attached_watchers = []
end

def start
Expand All @@ -67,7 +72,7 @@ def start
thread_create :event_loop do
begin
default_watcher = DefaultWatcher.new
@_event_loop.attach(default_watcher)
event_loop_attach(default_watcher)
@_event_loop_running = true
@_event_loop.run(@_event_loop_run_timeout) # this method blocks
ensure
Expand All @@ -78,9 +83,17 @@ def start

def shutdown
@_event_loop_mutex.synchronize do
@_event_loop.watchers.each {|w| w.detach if w.attached? }
@_event_loop_attached_watchers.reverse.each do |w|
if w.attached?
w.detach
end
end
end
timeout_at = Process.clock_gettime(EVENT_LOOP_CLOCK_ID) + EVENT_LOOP_SHUTDOWN_TIMEOUT
while @_event_loop_running
if Process.clock_gettime(EVENT_LOOP_CLOCK_ID) >= timeout_at
log.warn "event loop does NOT exit."
end
sleep 0.1
end

Expand Down
Loading

0 comments on commit b6565b7

Please sign in to comment.