From 849c7675ea70215e74000c37d011d71e926b0813 Mon Sep 17 00:00:00 2001 From: OhJuhun Date: Tue, 12 Mar 2024 10:42:20 +0900 Subject: [PATCH 1/4] impelement kerberos delegation token max-lifetime Signed-off-by: OhJuhun --- lib/fluent/plugin/out_webhdfs.rb | 10 +++++++--- test/plugin/test_out_webhdfs.rb | 16 +++++++++++++--- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/lib/fluent/plugin/out_webhdfs.rb b/lib/fluent/plugin/out_webhdfs.rb index 3f35f69..83c7e26 100644 --- a/lib/fluent/plugin/out_webhdfs.rb +++ b/lib/fluent/plugin/out_webhdfs.rb @@ -70,6 +70,8 @@ class Fluent::Plugin::WebHDFSOutput < Fluent::Plugin::Output config_param :renew_kerberos_delegation_token, :bool, default: false desc 'delegation token reuse timer (default 8h)' config_param :renew_kerberos_delegation_token_interval, :time, default: 8 * 60 * 60 + desc 'delegation token max-lifetime (default 7d)' + config_param :kerberos_delegation_token_max_lifetime, :time, default: 7 * 24 * 60 * 60 SUPPORTED_COMPRESS = [:gzip, :bzip2, :snappy, :hadoop_snappy, :lzo_command, :zstd, :text] desc "Compression method (#{SUPPORTED_COMPRESS.join(',')})" @@ -114,7 +116,7 @@ def configure(conf) else 86400 end if buffer_config = conf.elements(name: "buffer").first - timekey = buffer_config["timekey"] || timekey + timekey = buffer_config["timekey"] || timekey end compat_parameters_convert(conf, :buffer, default_chunk_key: "time") @@ -189,7 +191,9 @@ def configure(conf) end @renew_kerberos_delegation_token_interval_hour = nil + @kerberos_delegation_token_max_lifetime_hour = nil if @renew_kerberos_delegation_token + @kerberos_delegation_token_max_lifetime_hour = @kerberos_delegation_token_max_lifetime / 60 / 60 unless @username raise Fluent::ConfigError, "username is missing. If you want to reuse delegation token, follow with kerberos accounts" end @@ -215,7 +219,7 @@ def multi_workers_ready? end def prepare_client(host, port, username) - client = WebHDFS::Client.new(host, port, username, nil, nil, nil, {}, @renew_kerberos_delegation_token_interval_hour) + client = WebHDFS::Client.new(host, port, username, nil, nil, nil, {}, @renew_kerberos_delegation_token_interval_hour, @kerberos_delegation_token_max_lifetime_hour) if @httpfs client.httpfs_mode = true end @@ -542,4 +546,4 @@ def self.register_compressor(name, compressor) require 'fluent/plugin/webhdfs_compressor_snappy' require 'fluent/plugin/webhdfs_compressor_hadoop_snappy' require 'fluent/plugin/webhdfs_compressor_lzo_command' -require 'fluent/plugin/webhdfs_compressor_zstd' +require 'fluent/plugin/webhdfs_compressor_zstd' \ No newline at end of file diff --git a/test/plugin/test_out_webhdfs.rb b/test/plugin/test_out_webhdfs.rb index 54623ba..383684c 100644 --- a/test/plugin/test_out_webhdfs.rb +++ b/test/plugin/test_out_webhdfs.rb @@ -337,18 +337,20 @@ def test_time_key_without_buffer_section kerberos: true, renew_kerberos_delegation_token: false, renew_kerberos_delegation_token_interval_hour: nil, + kerberos_delegation_token_max_lifetime_hour: nil, }, { kerberos: d.instance.kerberos, renew_kerberos_delegation_token: d.instance.instance_eval("@renew_kerberos_delegation_token"), renew_kerberos_delegation_token_interval_hour: d.instance.instance_eval("@renew_kerberos_delegation_token_interval_hour"), + kerberos_delegation_token_max_lifetime_hour: d.instance.instance_eval("@kerberos_delegation_token_max_lifetime_hour"), }) end test "default renew_kerberos_delegation_token_interval" do expected_hour = 8 - - mock.proxy(WebHDFS::Client).new("server.local", 14000, "hdfs_user", nil, nil, nil, {}, expected_hour).once + expected_delegation_token_max_lifetime_hour = 7 * 24 + mock.proxy(WebHDFS::Client).new("server.local", 14000, "hdfs_user", nil, nil, nil, {}, expected_hour,nil).once d = create_driver(CONFIG_KERBEROS + config_element("", "", { "renew_kerberos_delegation_token" => true })) @@ -359,19 +361,23 @@ def test_time_key_without_buffer_section renew_kerberos_delegation_token: true, renew_kerberos_delegation_token_interval: expected_hour * 60 * 60, renew_kerberos_delegation_token_interval_hour: expected_hour, + kerberos_delegation_token_max_lifetime: expected_delegation_token_max_lifetime_hour * 60 * 60, + kerberos_delegation_token_max_lifetime_hour: expected_delegation_token_max_lifetime_hour, }, { kerberos: d.instance.kerberos, renew_kerberos_delegation_token: d.instance.instance_eval("@renew_kerberos_delegation_token"), renew_kerberos_delegation_token_interval: d.instance.instance_eval("@renew_kerberos_delegation_token_interval"), renew_kerberos_delegation_token_interval_hour: d.instance.instance_eval("@renew_kerberos_delegation_token_interval_hour"), + kerberos_delegation_token_max_lifetime_hour: d.instance.instance_eval("@kerberos_delegation_token_max_lifetime_hour"), }) end test "renew_kerberos_delegation_token_interval" do expected_hour = 10 + expected_delegation_token_max_lifetime_hour = 24 - mock.proxy(WebHDFS::Client).new("server.local", 14000, "hdfs_user", nil, nil, nil, {}, expected_hour).once + mock.proxy(WebHDFS::Client).new("server.local", 14000, "hdfs_user", nil, nil, nil, {}, expected_hour,expected_delegation_token_max_lifetime_hour).once d = create_driver( CONFIG_KERBEROS + @@ -380,6 +386,7 @@ def test_time_key_without_buffer_section { "renew_kerberos_delegation_token" => true, "renew_kerberos_delegation_token_interval" => "#{expected_hour}h", + "kerberos_delegation_token_max_lifetime" => "#{expected_delegation_token_max_lifetime_hour}h" })) assert_equal( @@ -388,12 +395,15 @@ def test_time_key_without_buffer_section renew_kerberos_delegation_token: true, renew_kerberos_delegation_token_interval: expected_hour * 60 * 60, renew_kerberos_delegation_token_interval_hour: expected_hour, + kerberos_delegation_token_max_lifetime: expected_delegation_token_max_lifetime_hour * 60 * 60, + kerberos_delegation_token_max_lifetime_hour: expected_delegation_token_max_lifetime_hour }, { kerberos: d.instance.kerberos, renew_kerberos_delegation_token: d.instance.instance_eval("@renew_kerberos_delegation_token"), renew_kerberos_delegation_token_interval: d.instance.instance_eval("@renew_kerberos_delegation_token_interval"), renew_kerberos_delegation_token_interval_hour: d.instance.instance_eval("@renew_kerberos_delegation_token_interval_hour"), + kerberos_delegation_token_max_lifetime_hour: d.instance.instance_eval("@kerberos_delegation_token_max_lifetime_hour"), }) end From bcd90c32b1af92f7d7be6c15ad3da225645ecab6 Mon Sep 17 00:00:00 2001 From: OhJuhun Date: Wed, 13 Mar 2024 09:47:31 +0900 Subject: [PATCH 2/4] update gemspc:webhdfs 0.10.0 to 0.11.0 Signed-off-by: OhJuhun --- fluent-plugin-webhdfs.gemspec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fluent-plugin-webhdfs.gemspec b/fluent-plugin-webhdfs.gemspec index fd2c128..1d6b309 100644 --- a/fluent-plugin-webhdfs.gemspec +++ b/fluent-plugin-webhdfs.gemspec @@ -23,5 +23,5 @@ Gem::Specification.new do |gem| gem.add_development_dependency "bzip2-ffi" gem.add_development_dependency "zstandard" gem.add_runtime_dependency "fluentd", '>= 0.14.22' - gem.add_runtime_dependency "webhdfs", '>= 0.10.0' + gem.add_runtime_dependency "webhdfs", '>= 0.11.0' end From e8599ae052e1f25c31c30e3dd2fa4a60d6360bfa Mon Sep 17 00:00:00 2001 From: OhJuhun Date: Wed, 13 Mar 2024 11:33:25 +0900 Subject: [PATCH 3/4] fix testcode Signed-off-by: OhJuhun --- test/plugin/test_out_webhdfs.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/test/plugin/test_out_webhdfs.rb b/test/plugin/test_out_webhdfs.rb index 383684c..498d219 100644 --- a/test/plugin/test_out_webhdfs.rb +++ b/test/plugin/test_out_webhdfs.rb @@ -403,6 +403,7 @@ def test_time_key_without_buffer_section renew_kerberos_delegation_token: d.instance.instance_eval("@renew_kerberos_delegation_token"), renew_kerberos_delegation_token_interval: d.instance.instance_eval("@renew_kerberos_delegation_token_interval"), renew_kerberos_delegation_token_interval_hour: d.instance.instance_eval("@renew_kerberos_delegation_token_interval_hour"), + kerberos_delegation_token_max_lifetime: d.instance.instance_eval("@kerberos_delegation_token_max_lifetime"), kerberos_delegation_token_max_lifetime_hour: d.instance.instance_eval("@kerberos_delegation_token_max_lifetime_hour"), }) end From 7c056fea9b6a47959e1a8df855d20f8a86fc7df3 Mon Sep 17 00:00:00 2001 From: OhJuhun Date: Wed, 13 Mar 2024 11:48:46 +0900 Subject: [PATCH 4/4] fix test code Signed-off-by: OhJuhun --- test/plugin/test_out_webhdfs.rb | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/plugin/test_out_webhdfs.rb b/test/plugin/test_out_webhdfs.rb index 498d219..ef43915 100644 --- a/test/plugin/test_out_webhdfs.rb +++ b/test/plugin/test_out_webhdfs.rb @@ -328,7 +328,7 @@ def test_time_key_without_buffer_section }) test "renew_kerberos_delegation_token default" do - mock.proxy(WebHDFS::Client).new("server.local", 14000, "hdfs_user", nil, nil, nil, {}, nil).once + mock.proxy(WebHDFS::Client).new("server.local", 14000, "hdfs_user", nil, nil, nil, {}, nil, nil).once d = create_driver(CONFIG_KERBEROS) @@ -350,7 +350,7 @@ def test_time_key_without_buffer_section test "default renew_kerberos_delegation_token_interval" do expected_hour = 8 expected_delegation_token_max_lifetime_hour = 7 * 24 - mock.proxy(WebHDFS::Client).new("server.local", 14000, "hdfs_user", nil, nil, nil, {}, expected_hour,nil).once + mock.proxy(WebHDFS::Client).new("server.local", 14000, "hdfs_user", nil, nil, nil, {}, expected_hour, expected_delegation_token_max_lifetime_hour).once d = create_driver(CONFIG_KERBEROS + config_element("", "", { "renew_kerberos_delegation_token" => true })) @@ -369,6 +369,7 @@ def test_time_key_without_buffer_section renew_kerberos_delegation_token: d.instance.instance_eval("@renew_kerberos_delegation_token"), renew_kerberos_delegation_token_interval: d.instance.instance_eval("@renew_kerberos_delegation_token_interval"), renew_kerberos_delegation_token_interval_hour: d.instance.instance_eval("@renew_kerberos_delegation_token_interval_hour"), + kerberos_delegation_token_max_lifetime: d.instance.instance_eval("@kerberos_delegation_token_max_lifetime"), kerberos_delegation_token_max_lifetime_hour: d.instance.instance_eval("@kerberos_delegation_token_max_lifetime_hour"), }) end