Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keepalive for out_forward plugin #2393

Merged
merged 10 commits into from
May 8, 2019

Conversation

ganmacs
Copy link
Member

@ganmacs ganmacs commented Apr 19, 2019

Which issue(s) this PR fixes:
Fixes #2188

What this PR does / why we need it:

Implemented keepalive feature for out_forward plugin.

Docs Changes:

Needed. I'll add a detail of this feature to https://github.com/fluent/fluentd-docs/blob/master/docs/v1.0/out_forward.txt
fluent/fluentd-docs#632

Release Note:

No need or use PR title?

@repeatedly Will you please review?

Signed-off-by: Yuta Iwama <ganmacs@gmail.com>
Signed-off-by: Yuta Iwama <ganmacs@gmail.com>
@repeatedly repeatedly added feature request *Deprecated Label* Use enhancement label in general v1 labels Apr 19, 2019
Change to default ref count is 1 in case that socket is expired but it
is used.

Signed-off-by: Yuta Iwama <ganmacs@gmail.com>
Copy link
Member

@repeatedly repeatedly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will test this patch on my environment.

@@ -201,9 +205,9 @@ def configure(conf)

log.info "adding forwarding server '#{name}'", host: server.host, port: server.port, weight: server.weight, plugin_id: plugin_id
if @heartbeat_type == :none
@nodes << NoneHeartbeatNode.new(self, server, failure: failure)
@nodes << NoneHeartbeatNode.new(self, server, failure: failure, keepalive: @keepalive, keepalive_timeout: keepalive_timeout)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@keepalive_timeout?

else
node = Node.new(self, server, failure: failure)
node = Node.new(self, server, failure: failure, keepalive: @keepalive, keepalive_timeout: keepalive_timeout)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

@@ -227,6 +231,10 @@ def configure(conf)
raise Fluent::ConfigError, "forward output plugin requires at least one <server> is required"
end

if !@keepalive && @keepalive_timeout
log.warn 'The value of keepalive_timeout is ignored. if you want to use keepalive, please add `keepalive: true` to your conf.'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

: is not needed.

@@ -279,13 +287,21 @@ def start
end
end
end

if @keepalive && @keepalive_timeout
timer_execute(:keep_alived_socket_watcher, @keep_alive_watcher_interval, &method(:on_purge_obsolete_socks))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding out_forward prefix is better for timer.

if expired?(key)
# Do not close this socket here in case of it will be used by other place (e.g. wait for receiving ack)
@inactive_socks[key] = @active_socks.delete(key)
@log.debug("connection #{@inactive_socks[key]} is expired. reconnecting...")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 space before #{}.

# This method is expected to be called in class which doesn't call #fetch_or
def revoke_by_value(val)
@mutex.synchronize do
sock = @active_socks.detect { |_, v| v.sock == val }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use detect so this point guarantees no multiple entries for val, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. and added comment 8bffce6

@inactive_socks[key].ref -= 1
return
else
@log.warn("Not found key for dec_ref_by_value: #{key}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about #{key.name || key.object_id}?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I expect key is Thread.current.object_id. So it already returns object id(and Thread.current.object_id.name raises NoMethodError I think).

@socket_cache.fetch_or { @sender.create_transfer_socket(host || resolved_host, port, @hostname) }
else
@log.debug('connect new socket')
@sender.create_transfer_socket(resolved_host, port, @hostname)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this point also host || resolved_host?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right 44cb301

Signed-off-by: Yuta Iwama <ganmacs@gmail.com>
Signed-off-by: Yuta Iwama <ganmacs@gmail.com>
Signed-off-by: Yuta Iwama <ganmacs@gmail.com>
Signed-off-by: Yuta Iwama <ganmacs@gmail.com>
Signed-off-by: Yuta Iwama <ganmacs@gmail.com>
def clear
@mutex.synchronize do
@inactive_socks.values.each do |s|
s.sock.close
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding rescue nil is better? Hard to recover broken socket here.

@inactive_socks.clear

@active_socks.values.each do |s|
s.sock.close
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

# 0 means sockets stored in this class received all acks
if @inactive_socks[k].ref <= 0
s = @inactive_socks.delete(k)
s.sock.close
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

sock.close rescue nil
if @keepalive
@socket_cache.revoke
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following code is needed?

else
  sock.close rescue nil
end

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I didn't mean to delete the code. d3cb120

Signed-off-by: Yuta Iwama <ganmacs@gmail.com>
Signed-off-by: Yuta Iwama <ganmacs@gmail.com>
@repeatedly
Copy link
Member

If no reviews from developers, I will merge soon.

Copy link
Contributor

@cosmo0920 cosmo0920 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for being late. Looks reasonable for me.

@repeatedly repeatedly merged commit 272bc85 into fluent:master May 8, 2019
@repeatedly
Copy link
Member

Thanks for hard work!

@ganmacs ganmacs deleted the keepalive-for-out_forward branch May 9, 2019 02:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature request *Deprecated Label* Use enhancement label in general v1
Projects
None yet
Development

Successfully merging this pull request may close these issues.

keepalive property
3 participants