Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the detach process forward interval configurable #982

Merged
merged 2 commits into from
May 24, 2016
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions lib/fluent/process.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def initialize
@parent_uri = DRb.uri
end

def fork(delegate_object)
def fork(forward_interval, delegate_object)
ipr, ipw = IO.pipe # child Engine.emit_stream -> parent Engine.emit_stream
opr, opw = IO.pipe # parent target.emit -> child target.emit

Expand All @@ -49,14 +49,14 @@ def fork(delegate_object)
# parent process
ipw.close
opr.close
forward_thread = process_parent(ipr, opw, pid, delegate_object)
forward_thread = process_parent(ipr, opw, pid, forward_interval, delegate_object)
return pid, forward_thread
end

# child process
ipr.close
opw.close
forward_thread = process_child(ipw, opr, delegate_object)
forward_thread = process_child(ipw, opr, forward_interval, delegate_object)
return nil, forward_thread
end

Expand All @@ -77,14 +77,14 @@ def create_drb_uri
end

private
def process_child(ipw, opr, delegate_object)
def process_child(ipw, opr, forward_interval, delegate_object)
DRb.start_service(create_drb_uri, delegate_object)
child_uri = DRb.uri

send_header(ipw, child_uri)

# override target.emit_stream to write event stream to the pipe
fwd = new_forwarder(ipw, 0.5) # TODO interval
fwd = new_forwarder(ipw, forward_interval)
Engine.define_singleton_method(:emit_stream) do |tag,es|
fwd.emit(tag, es)
end
Expand Down Expand Up @@ -115,7 +115,7 @@ def shared_methods
]
end

def process_parent(ipr, opw, pid, delegate_object)
def process_parent(ipr, opw, pid, forward_interval, delegate_object)
child_uri = read_header(ipr)

# read event stream from the pipe and forward to Engine.emit_stream
Expand All @@ -127,7 +127,7 @@ def process_parent(ipr, opw, pid, delegate_object)

# return forwarder for DetachProcessMixin to
# override target.emit and write event stream to the pipe
fwd = new_forwarder(opw, 0.5) # TODO interval
fwd = new_forwarder(opw, forward_interval)
# note: override emit method on DetachProcessMixin
forward_thread.define_singleton_method(:forwarder) do
fwd
Expand Down Expand Up @@ -300,11 +300,11 @@ def on_exit_process(i)

private

def detach_process_impl(num, &block)
def detach_process_impl(num, forward_interval, &block)
children = []

num.times do |i|
pid, forward_thread = DetachProcessManager.instance.fork(self)
pid, forward_thread = DetachProcessManager.instance.fork(forward_interval, self)

if pid
# parent process
Expand Down Expand Up @@ -438,11 +438,17 @@ def configure(conf)
@detach_process = false
end
end

if forward_interval = conf['detach_process_forward_interval']
@forward_interval = forward_interval.to_f
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use Config.time_value instead of to_f.

else
@forward_interval = 0.5
end
end

def detach_process(&block)
if @detach_process
detach_process_impl(1, &block)
detach_process_impl(1, @forward_interval, &block)
else
block.call
end
Expand Down Expand Up @@ -480,13 +486,19 @@ def configure(conf)
@detach_process = false
end
end

if forward_interval = conf['detach_process_forward_interval']
@forward_interval = forward_interval.to_f
else
@forward_interval = 0.5
end
end

private

def detach_multi_process(&block)
if @detach_process
detach_process_impl(@detach_process_num, &block)
detach_process_impl(@detach_process_num, @forward_interval, &block)
else
block.call
end
Expand Down