Skip to content

Commit

Permalink
Merge pull request #1729 from AM-iain/issue1726
Browse files Browse the repository at this point in the history
Issue 1726: Fix for in_tcp log corruption under load.
  • Loading branch information
repeatedly authored Nov 3, 2017
2 parents 75fef8f + 875cbc3 commit 6745366
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 5 deletions.
9 changes: 4 additions & 5 deletions lib/fluent/plugin/in_tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,12 @@ def multi_workers_ready?
def start
super

@buffer = ''
server_create(:in_tcp_server, @port, bind: @bind) do |data, conn|
@buffer << data
conn.buffer << data
begin
pos = 0
while i = @buffer.index(@delimiter, pos)
msg = @buffer[pos...i]
while i = conn.buffer.index(@delimiter, pos)
msg = conn.buffer[pos...i]
pos = i + @delimiter.length

@parser.parse(msg) do |time, record|
Expand All @@ -77,7 +76,7 @@ def start
router.emit(tag, time, record)
end
end
@buffer.slice!(0, pos) if pos > 0
conn.buffer.slice!(0, pos) if pos > 0
end
end
end
Expand Down
3 changes: 3 additions & 0 deletions lib/fluent/plugin_helper/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -425,9 +425,12 @@ def on(event, &callback)
end

class TCPCallbackSocket < CallbackSocket
attr_accessor :buffer

def initialize(sock)
super("tcp", sock, [:data, :write_complete, :close])
@peeraddr = @sock.peeraddr
@buffer = ''
end

def write(data)
Expand Down

0 comments on commit 6745366

Please sign in to comment.