diff --git a/fluent-plugin-webhdfs.gemspec b/fluent-plugin-webhdfs.gemspec index fd2c128..1d6b309 100644 --- a/fluent-plugin-webhdfs.gemspec +++ b/fluent-plugin-webhdfs.gemspec @@ -23,5 +23,5 @@ Gem::Specification.new do |gem| gem.add_development_dependency "bzip2-ffi" gem.add_development_dependency "zstandard" gem.add_runtime_dependency "fluentd", '>= 0.14.22' - gem.add_runtime_dependency "webhdfs", '>= 0.10.0' + gem.add_runtime_dependency "webhdfs", '>= 0.11.0' end diff --git a/lib/fluent/plugin/out_webhdfs.rb b/lib/fluent/plugin/out_webhdfs.rb index 3f35f69..83c7e26 100644 --- a/lib/fluent/plugin/out_webhdfs.rb +++ b/lib/fluent/plugin/out_webhdfs.rb @@ -70,6 +70,8 @@ class Fluent::Plugin::WebHDFSOutput < Fluent::Plugin::Output config_param :renew_kerberos_delegation_token, :bool, default: false desc 'delegation token reuse timer (default 8h)' config_param :renew_kerberos_delegation_token_interval, :time, default: 8 * 60 * 60 + desc 'delegation token max-lifetime (default 7d)' + config_param :kerberos_delegation_token_max_lifetime, :time, default: 7 * 24 * 60 * 60 SUPPORTED_COMPRESS = [:gzip, :bzip2, :snappy, :hadoop_snappy, :lzo_command, :zstd, :text] desc "Compression method (#{SUPPORTED_COMPRESS.join(',')})" @@ -114,7 +116,7 @@ def configure(conf) else 86400 end if buffer_config = conf.elements(name: "buffer").first - timekey = buffer_config["timekey"] || timekey + timekey = buffer_config["timekey"] || timekey end compat_parameters_convert(conf, :buffer, default_chunk_key: "time") @@ -189,7 +191,9 @@ def configure(conf) end @renew_kerberos_delegation_token_interval_hour = nil + @kerberos_delegation_token_max_lifetime_hour = nil if @renew_kerberos_delegation_token + @kerberos_delegation_token_max_lifetime_hour = @kerberos_delegation_token_max_lifetime / 60 / 60 unless @username raise Fluent::ConfigError, "username is missing. If you want to reuse delegation token, follow with kerberos accounts" end @@ -215,7 +219,7 @@ def multi_workers_ready? end def prepare_client(host, port, username) - client = WebHDFS::Client.new(host, port, username, nil, nil, nil, {}, @renew_kerberos_delegation_token_interval_hour) + client = WebHDFS::Client.new(host, port, username, nil, nil, nil, {}, @renew_kerberos_delegation_token_interval_hour, @kerberos_delegation_token_max_lifetime_hour) if @httpfs client.httpfs_mode = true end @@ -542,4 +546,4 @@ def self.register_compressor(name, compressor) require 'fluent/plugin/webhdfs_compressor_snappy' require 'fluent/plugin/webhdfs_compressor_hadoop_snappy' require 'fluent/plugin/webhdfs_compressor_lzo_command' -require 'fluent/plugin/webhdfs_compressor_zstd' +require 'fluent/plugin/webhdfs_compressor_zstd' \ No newline at end of file diff --git a/test/plugin/test_out_webhdfs.rb b/test/plugin/test_out_webhdfs.rb index 54623ba..ef43915 100644 --- a/test/plugin/test_out_webhdfs.rb +++ b/test/plugin/test_out_webhdfs.rb @@ -328,7 +328,7 @@ def test_time_key_without_buffer_section }) test "renew_kerberos_delegation_token default" do - mock.proxy(WebHDFS::Client).new("server.local", 14000, "hdfs_user", nil, nil, nil, {}, nil).once + mock.proxy(WebHDFS::Client).new("server.local", 14000, "hdfs_user", nil, nil, nil, {}, nil, nil).once d = create_driver(CONFIG_KERBEROS) @@ -337,18 +337,20 @@ def test_time_key_without_buffer_section kerberos: true, renew_kerberos_delegation_token: false, renew_kerberos_delegation_token_interval_hour: nil, + kerberos_delegation_token_max_lifetime_hour: nil, }, { kerberos: d.instance.kerberos, renew_kerberos_delegation_token: d.instance.instance_eval("@renew_kerberos_delegation_token"), renew_kerberos_delegation_token_interval_hour: d.instance.instance_eval("@renew_kerberos_delegation_token_interval_hour"), + kerberos_delegation_token_max_lifetime_hour: d.instance.instance_eval("@kerberos_delegation_token_max_lifetime_hour"), }) end test "default renew_kerberos_delegation_token_interval" do expected_hour = 8 - - mock.proxy(WebHDFS::Client).new("server.local", 14000, "hdfs_user", nil, nil, nil, {}, expected_hour).once + expected_delegation_token_max_lifetime_hour = 7 * 24 + mock.proxy(WebHDFS::Client).new("server.local", 14000, "hdfs_user", nil, nil, nil, {}, expected_hour, expected_delegation_token_max_lifetime_hour).once d = create_driver(CONFIG_KERBEROS + config_element("", "", { "renew_kerberos_delegation_token" => true })) @@ -359,19 +361,24 @@ def test_time_key_without_buffer_section renew_kerberos_delegation_token: true, renew_kerberos_delegation_token_interval: expected_hour * 60 * 60, renew_kerberos_delegation_token_interval_hour: expected_hour, + kerberos_delegation_token_max_lifetime: expected_delegation_token_max_lifetime_hour * 60 * 60, + kerberos_delegation_token_max_lifetime_hour: expected_delegation_token_max_lifetime_hour, }, { kerberos: d.instance.kerberos, renew_kerberos_delegation_token: d.instance.instance_eval("@renew_kerberos_delegation_token"), renew_kerberos_delegation_token_interval: d.instance.instance_eval("@renew_kerberos_delegation_token_interval"), renew_kerberos_delegation_token_interval_hour: d.instance.instance_eval("@renew_kerberos_delegation_token_interval_hour"), + kerberos_delegation_token_max_lifetime: d.instance.instance_eval("@kerberos_delegation_token_max_lifetime"), + kerberos_delegation_token_max_lifetime_hour: d.instance.instance_eval("@kerberos_delegation_token_max_lifetime_hour"), }) end test "renew_kerberos_delegation_token_interval" do expected_hour = 10 + expected_delegation_token_max_lifetime_hour = 24 - mock.proxy(WebHDFS::Client).new("server.local", 14000, "hdfs_user", nil, nil, nil, {}, expected_hour).once + mock.proxy(WebHDFS::Client).new("server.local", 14000, "hdfs_user", nil, nil, nil, {}, expected_hour,expected_delegation_token_max_lifetime_hour).once d = create_driver( CONFIG_KERBEROS + @@ -380,6 +387,7 @@ def test_time_key_without_buffer_section { "renew_kerberos_delegation_token" => true, "renew_kerberos_delegation_token_interval" => "#{expected_hour}h", + "kerberos_delegation_token_max_lifetime" => "#{expected_delegation_token_max_lifetime_hour}h" })) assert_equal( @@ -388,12 +396,16 @@ def test_time_key_without_buffer_section renew_kerberos_delegation_token: true, renew_kerberos_delegation_token_interval: expected_hour * 60 * 60, renew_kerberos_delegation_token_interval_hour: expected_hour, + kerberos_delegation_token_max_lifetime: expected_delegation_token_max_lifetime_hour * 60 * 60, + kerberos_delegation_token_max_lifetime_hour: expected_delegation_token_max_lifetime_hour }, { kerberos: d.instance.kerberos, renew_kerberos_delegation_token: d.instance.instance_eval("@renew_kerberos_delegation_token"), renew_kerberos_delegation_token_interval: d.instance.instance_eval("@renew_kerberos_delegation_token_interval"), renew_kerberos_delegation_token_interval_hour: d.instance.instance_eval("@renew_kerberos_delegation_token_interval_hour"), + kerberos_delegation_token_max_lifetime: d.instance.instance_eval("@kerberos_delegation_token_max_lifetime"), + kerberos_delegation_token_max_lifetime_hour: d.instance.instance_eval("@kerberos_delegation_token_max_lifetime_hour"), }) end