diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 44761b4061..7e0289f65b 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -52,6 +52,7 @@ def initialize super @paths = [] @tails = {} + @tails_rotate_wait = {} @pf_file = nil @pf = nil @ignore_list = [] @@ -267,6 +268,9 @@ def shutdown @shutdown_start_time = Fluent::Clock.now # during shutdown phase, don't close io. It should be done in close after all threads are stopped. See close. stop_watchers(existence_path, immediate: true, remove_watcher: false) + @tails_rotate_wait.keys.each do |tw| + detach_watcher(tw, @tails_rotate_wait[tw][:ino], false) + end @pf_file.close if @pf_file super @@ -275,6 +279,7 @@ def shutdown def close super # close file handles after all threads stopped (in #close of thread plugin helper) + # It may be because we need to wait IOHanlder.ready_to_shutdown() close_watcher_handles end @@ -516,6 +521,9 @@ def close_watcher_handles tw.close end end + @tails_rotate_wait.keys.each do |tw| + tw.close + end end # refresh_watchers calls @tails.keys so we don't use stop_watcher -> start_watcher sequence for safety. @@ -570,10 +578,6 @@ def update_watcher(tail_watcher, pe, new_inode) detach_watcher_after_rotate_wait(tail_watcher, pe.read_inode) end - # TailWatcher#close is called by another thread at shutdown phase. - # It causes 'can't modify string; temporarily locked' error in IOHandler - # so adding close_io argument to avoid this problem. - # At shutdown, IOHandler's io will be released automatically after detached the event loop def detach_watcher(tw, ino, close_io = true) if @follow_inodes && tw.ino != ino log.warn("detach_watcher could be detaching an unexpected tail_watcher with a different ino.", @@ -604,7 +608,11 @@ def detach_watcher_after_rotate_wait(tw, ino) if @open_on_every_update # Detach now because it's already closed, waiting it doesn't make sense. detach_watcher(tw, ino) - elsif throttling_is_enabled?(tw) + end + + return if @tails_rotate_wait[tw] + + if throttling_is_enabled?(tw) # When the throttling feature is enabled, it might not reach EOF yet. # Should ensure to read all contents before closing it, with keeping throttling. start_time_to_wait = Fluent::Clock.now @@ -612,14 +620,18 @@ def detach_watcher_after_rotate_wait(tw, ino) elapsed = Fluent::Clock.now - start_time_to_wait if tw.eof? && elapsed >= @rotate_wait timer.detach + @tails_rotate_wait.delete(tw) detach_watcher(tw, ino) end end + @tails_rotate_wait[tw] = { ino: ino, timer: timer } else # when the throttling feature isn't enabled, just wait @rotate_wait - timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do + timer = timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do + @tails_rotate_wait.delete(tw) detach_watcher(tw, ino) end + @tails_rotate_wait[tw] = { ino: ino, timer: timer } end end diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 8919866683..58006c0b98 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -3016,6 +3016,92 @@ def test_path_resurrection }, ) end + + def test_next_rotation_occurs_very_fast_while_old_TW_still_waiting_rotate_wait + config = config_element( + "ROOT", + "", + { + "path" => "#{@tmp_dir}/tail.txt*", + "pos_file" => "#{@tmp_dir}/tail.pos", + "tag" => "t1", + "format" => "none", + "read_from_head" => "true", + "follow_inodes" => "true", + "rotate_wait" => "3s", + "refresh_interval" => "1h", + # stat_watcher often calls `TailWatcher::on_notify` faster than creating a new log file, + # so disable it in order to reproduce the same condition stably. + "enable_stat_watcher" => "false", + } + ) + d = create_driver(config, false) + + tail_watchers = [] + stub.proxy(d.instance).setup_watcher do |tw| + tail_watchers.append(tw) + mock.proxy(tw).close.once # Note: Currently, there is no harm in duplicate calls. + tw + end + + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file1 log1"} + + d.run(expect_records: 6, timeout: 15) do + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file1 log2"} + + sleep 1.5 # Need to be larger than 1s (the interval of watch_timer) + + FileUtils.move("#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt" + "1") + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file2 log1"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file2 log2"} + + sleep 1.5 # Need to be larger than 1s (the interval of watch_timer) + + # Rotate again (Old TailWatcher waiting rotate_wait also calls update_watcher) + [1, 0].each do |i| + FileUtils.move("#{@tmp_dir}/tail.txt#{i}", "#{@tmp_dir}/tail.txt#{i + 1}") + end + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file3 log1"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file3 log2"} + + # Wait rotate_wait to confirm that TailWatcher.close is not called in duplicate. + # (Note: Currently, there is no harm in duplicate calls) + sleep 4 + end + + inode_0 = tail_watchers[0]&.ino + inode_1 = tail_watchers[1]&.ino + inode_2 = tail_watchers[2]&.ino + record_values = d.events.collect { |event| event[2]["message"] }.sort + position_entries = [] + Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f| + f.readlines(chomp: true).each do |line| + values = line.split("\t") + position_entries.append([values[0], values[1], values[2].to_i(16)]) + end + end + + assert_equal( + { + record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2", "file3 log1", "file3 log2"], + tail_watcher_paths: ["#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt0"], + tail_watcher_inodes: [inode_0, inode_1, inode_2], + tail_watcher_io_handler_opened_statuses: [false, false, false], + position_entries: [ + ["#{@tmp_dir}/tail.txt0", "0000000000000016", inode_0], + ["#{@tmp_dir}/tail.txt0", "0000000000000016", inode_1], + ["#{@tmp_dir}/tail.txt0", "0000000000000016", inode_2], + ], + }, + { + record_values: record_values, + tail_watcher_paths: tail_watchers.collect { |tw| tw.path }, + tail_watcher_inodes: tail_watchers.collect { |tw| tw.ino }, + tail_watcher_io_handler_opened_statuses: tail_watchers.collect { |tw| tw.instance_variable_get(:@io_handler)&.opened? || false }, + position_entries: position_entries + }, + ) + end end sub_test_case "Update watchers for rotation without follow_inodes" do @@ -3084,9 +3170,6 @@ def test_refreshTW_during_rotation sleep 3 Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file3 log2"} - - # Wait `rotate_wait` for file2 to make sure to close all IO handlers - sleep 3 end inode_0 = tail_watchers[0]&.ino @@ -3121,5 +3204,85 @@ def test_refreshTW_during_rotation }, ) end + + def test_next_rotation_occurs_very_fast_while_old_TW_still_waiting_rotate_wait + config = config_element( + "ROOT", + "", + { + "path" => "#{@tmp_dir}/tail.txt0", + "pos_file" => "#{@tmp_dir}/tail.pos", + "tag" => "t1", + "format" => "none", + "read_from_head" => "true", + "rotate_wait" => "3s", + "refresh_interval" => "1h", + } + ) + d = create_driver(config, false) + + tail_watchers = [] + stub.proxy(d.instance).setup_watcher do |tw| + tail_watchers.append(tw) + mock.proxy(tw).close.once # Note: Currently, there is no harm in duplicate calls. + tw + end + + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file1 log1"} + + d.run(expect_records: 6, timeout: 15) do + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file1 log2"} + + sleep 1.5 # Need to be larger than 1s (the interval of watch_timer) + + FileUtils.move("#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt" + "1") + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file2 log1"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file2 log2"} + + sleep 1.5 # Need to be larger than 1s (the interval of watch_timer) + + # Rotate again (Old TailWatcher waiting rotate_wait also calls update_watcher) + [1, 0].each do |i| + FileUtils.move("#{@tmp_dir}/tail.txt#{i}", "#{@tmp_dir}/tail.txt#{i + 1}") + end + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file3 log1"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file3 log2"} + + # Wait rotate_wait to confirm that TailWatcher.close is not called in duplicate. + # (Note: Currently, there is no harm in duplicate calls) + sleep 4 + end + + inode_0 = tail_watchers[0]&.ino + inode_1 = tail_watchers[1]&.ino + inode_2 = tail_watchers[2]&.ino + record_values = d.events.collect { |event| event[2]["message"] }.sort + position_entries = [] + Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f| + f.readlines(chomp: true).each do |line| + values = line.split("\t") + position_entries.append([values[0], values[1], values[2].to_i(16)]) + end + end + + assert_equal( + { + record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2", "file3 log1", "file3 log2"], + tail_watcher_paths: ["#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt0"], + tail_watcher_inodes: [inode_0, inode_1, inode_2], + tail_watcher_io_handler_opened_statuses: [false, false, false], + position_entries: [ + ["#{@tmp_dir}/tail.txt0", "0000000000000016", inode_2], + ], + }, + { + record_values: record_values, + tail_watcher_paths: tail_watchers.collect { |tw| tw.path }, + tail_watcher_inodes: tail_watchers.collect { |tw| tw.ino }, + tail_watcher_io_handler_opened_statuses: tail_watchers.collect { |tw| tw.instance_variable_get(:@io_handler)&.opened? || false }, + position_entries: position_entries + }, + ) + end end end