Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Loki Logstash Plugin #1822

Merged
merged 30 commits into from
Jul 17, 2020
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
3159e46
Logstash plugin
adityacs Mar 26, 2020
0994c44
include_labels
adityacs Apr 10, 2020
727bdcb
include_labels
adityacs Apr 10, 2020
88587c1
Removes binary.
cyriltovena Apr 17, 2020
9c9b072
Improve documentation and remove the push path.
cyriltovena Apr 17, 2020
d45c9db
Move to cmd.
cyriltovena Jul 13, 2020
a21605a
Add more precision for jruby.
cyriltovena Jul 13, 2020
87895e9
Update docs/clients/logstash/README.md
cyriltovena Jul 13, 2020
913dbbb
p
cyriltovena Jul 13, 2020
9b9a250
ignore
cyriltovena Jul 13, 2020
1a4de4a
remove ignore file/
cyriltovena Jul 13, 2020
659ca6b
More precision for installing jruby
cyriltovena Jul 13, 2020
c2b33e3
Rename without Grafana
cyriltovena Jul 13, 2020
cbef9a4
A lot of refactoring and testing.
cyriltovena Jul 14, 2020
d10f042
change delay logic
adityacs Jul 15, 2020
26875cb
Fully tested version.
cyriltovena Jul 15, 2020
5a1af98
Merge branch 'loki_logstash_plugin' of github.com:adityacs/loki into …
cyriltovena Jul 15, 2020
1366ef8
Forgot to save merge.
cyriltovena Jul 15, 2020
99f504d
working version.
cyriltovena Jul 15, 2020
4776178
Makefile + easier docker build.
cyriltovena Jul 16, 2020
425d375
adds ci to build logstash image.
cyriltovena Jul 16, 2020
0d2acee
Fix build for logstash.
cyriltovena Jul 16, 2020
37508d1
Adds example with helm charts.
cyriltovena Jul 16, 2020
a8cde47
Fix target to send 10 logs with logstash.
cyriltovena Jul 16, 2020
0a266ec
Improved documentation.
cyriltovena Jul 16, 2020
4ae8cc7
add missing helm add repo for external repo
cyriltovena Jul 16, 2020
c87581a
Review comment.
cyriltovena Jul 16, 2020
3e6a5cf
Fixes loki service in Promtail.
cyriltovena Jul 17, 2020
29ee215
Merge remote-tracking branch 'upstream/master' into loki_logstash_plugin
cyriltovena Jul 17, 2020
e9fb63a
Update loki-stack version
cyriltovena Jul 17, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/logstash/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
*.gem
.ruby-version
.bundle
logstash
path
!lib
7 changes: 7 additions & 0 deletions cmd/logstash/.rakeTasks
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<Settings><!--This file was automatically generated by Ruby plugin.
You are allowed to:
1. Remove rake task
2. Add existing rake tasks
To add existing rake tasks automatically delete this file and reload the project.
--><RakeGroup description="" fullCmd="" taksId="rake" /></Settings>
26 changes: 26 additions & 0 deletions cmd/logstash/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
FROM logstash:7.6.2

ARG PLUGIN_VERSION=1.0.0
USER logstash

COPY --chown=logstash:logstash . /home/logstash/
WORKDIR /home/logstash/

ENV PATH /usr/share/logstash/vendor/jruby/bin:/usr/share/logstash/vendor/bundle/jruby/2.5.0/bin:$PATH
ENV LOGSTASH_SOURCE="1"
ENV LOGSTASH_PATH="/usr/share/logstash"
ENV GEM_PATH /usr/share/logstash/vendor/bundle/jruby/2.5.0
ENV GEM_HOME /usr/share/logstash/vendor/bundle/jruby/2.5.0

RUN gem install bundler:2.1.4

RUN bundle install --path=/usr/share/logstash/vendor/bundle && \
bundle exec rake vendor && \
bundle exec rspec

RUN gem build logstash-output-loki.gemspec && \
/usr/share/logstash/bin/logstash-plugin install logstash-output-loki-${PLUGIN_VERSION}.gem

EXPOSE 5044

CMD ["/usr/share/logstash/bin/logstash", "-f", "loki.conf"]
14 changes: 14 additions & 0 deletions cmd/logstash/Gemfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
source 'https://rubygems.org'

gemspec

# logstash_path = ENV["LOGSTASH_PATH"] || "./logstash"
# use_logstash_source = ENV["LOGSTASH_SOURCE"] && ENV["LOGSTASH_SOURCE"].to_s == "1"
#
# if Dir.exist?(logstash_path) && use_logstash_source
#
# end


gem 'logstash-core', :path => "./logstash/logstash-core"
gem 'logstash-core-plugin-api', :path => "./logstash/logstash-core-plugin-api"
75 changes: 75 additions & 0 deletions cmd/logstash/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Loki Logstash Output Plugin

Logstash plugin to send logstash aggregated logs to Loki.

## Install dependencies

First you need to setup JRuby environment to build this plugin. Refer https://github.com/rbenv/rbenv for setting up your rbenv environment.

After setting up `rbenv`. Install JRuby

```bash
rbenv install jruby-9.2.10.0
rbenv local jruby-9.2.10.0
```

Check that the environment is configured

```bash
ruby --version
jruby 9.2.10
```

You should use make sure you are running jruby and not ruby. If the command below still shows ruby and not jruby, check that PATH contains `$HOME/.rbenv/shims` and `$HOME/.rbenv/bin`. Also verify that you have this in your bash profile:

```bash
export PATH="$HOME/.rbenv/bin:$PATH"
eval "$(rbenv init -)"
```

Then install bundler
`gem install bundler:2.1.4`

Follow those instructions to [install logstash](https://www.elastic.co/guide/en/logstash/current/installing-logstash.html) before moving to the next section.

## Install dependencies and Build plugin

### Install required packages

```bash
git clone git@github.com:elastic/logstash.git
cd logstash
git checkout tags/v7.6.2
export LOGSTASH_PATH=`pwd`
Comment on lines +40 to +43
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice 👍

export LOGSTASH_SOURCE="1"
export GEM_PATH=$LOGSTASH_PATH/vendor/bundle/jruby/2.5.0
export GEM_HOME=$LOGSTASH_PATH/vendor/bundle/jruby/2.5.0
./gradlew assemble
cd ..
ruby -S bundle install --path=$LOGSTASH_PATH/vendor/bundle/
ruby -S bundle exec rake vendor
```

### Build the plugin

`gem build logstash-output-loki.gemspec`

### Test

`ruby -S bundle exec rspec`

Alternatively if you don't want to install JRuby. Enter inside logstash-loki container.

```bash
docker build -t logstash-loki ./
docker run -v `pwd`/spec:/home/logstash/spec -it --rm --entrypoint /bin/sh logstash-loki
bundle exec rspec
```

## Install plugin to local logstash

`bin/logstash-plugin install --no-verify --local logstash-output-loki-1.0.0.gem`

## Send sample event and check plugin is working

`bin/logstash -f loki.conf`
1 change: 1 addition & 0 deletions cmd/logstash/Rakefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
require "logstash/devutils/rake"
233 changes: 233 additions & 0 deletions cmd/logstash/lib/logstash/outputs/loki.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
# encoding: utf-8
require "logstash/outputs/base"
require "logstash/namespace"
require 'net/http'
require 'concurrent-edge'
require 'time'
require 'uri'
require 'json'

class LogStash::Outputs::Loki < LogStash::Outputs::Base
require 'logstash/outputs/loki/batch'
require 'logstash/outputs/loki/entry'

config_name "loki"

## 'A single instance of the Output will be shared among the pipeline worker threads'
concurrency :single

## 'Loki URL'
config :url, :validate => :string, :required => true

## 'BasicAuth credentials'
config :username, :validate => :string, :required => false
config :password, :validate => :string, secret: true, :required => false

## 'Client certificate'
config :cert, :validate => :path, :required => false
config :key, :validate => :path, :required => false

## 'TLS'
config :ca_cert, :validate => :path, :required => false

## 'Loki Tenant ID'
config :tenant_id, :validate => :string, :required => false

## 'Maximum batch size to accrue before pushing to loki. Defaults to 102400 bytes'
config :batch_size, :validate => :number, :default => 102400, :required => false

## 'Interval in seconds to wait before pushing a batch of records to loki. Defaults to 1 second'
config :batch_wait, :validate => :number, :default => 1, :required => false

## 'Log line field to pick from logstash. Defaults to "message"'
config :message_field, :validate => :string, :default => "message", :required => false

## 'Backoff configuration. Initial backoff time between retries. Default 1s'
config :min_delay, :validate => :number, :default => 1, :required => false

## 'Backoff configuration. Maximum backoff time between retries. Default 300s'
config :max_delay, :validate => :number, :default => 300, :required => false

## 'Backoff configuration. Maximum number of retries to do'
config :retries, :validate => :number, :default => 10, :required => false

public
def register
@uri = URI.parse(@url)
unless @uri.is_a?(URI::HTTP) || @uri.is_a?(URI::HTTPS)
raise LogStash::ConfigurationError, "url parameter must be valid HTTP, currently '#{@url}'"
end

if @min_delay > @max_delay
raise LogStash::ConfigurationError, "Min delay should be less than Max delay, currently 'Min delay is #{@min_delay} and Max delay is #{@max_delay}'"
end

@logger.info("Loki output plugin", :class => self.class.name)

# initialize channels
@Channel = Concurrent::Channel
@entries = @Channel.new

# create nil batch object.
@batch = nil

# validate certs
if ssl_cert?
load_ssl
validate_ssl_key
end

@Channel.go{run()}
end

def ssl_cert?
!@key.nil? && !@cert.nil?
end

def load_ssl
@cert = OpenSSL::X509::Certificate.new(File.read(@cert)) if @cert
@key = OpenSSL::PKey.read(File.read(@key)) if @key
end

def validate_ssl_key
if !@key.is_a?(OpenSSL::PKey::RSA) && !@key.is_a?(OpenSSL::PKey::DSA)
raise LogStash::ConfigurationError, "Unsupported private key type '#{@key.class}''"
end
end

def ssl_opts(uri)
opts = {
use_ssl: uri.scheme == 'https'
}

if !@cert.nil? && !@key.nil?
opts = opts.merge(
verify_mode: OpenSSL::SSL::VERIFY_PEER,
cert: @cert,
key: @key
)
end

unless @ca_cert.nil?
opts = opts.merge(
ca_file: @ca_cert
)
end
opts
end

def run()
min_wait_checkfrequency = 1/100 #1 millisecond
max_wait_checkfrequency = @batch_wait / 10
if max_wait_checkfrequency < min_wait_checkfrequency
max_wait_checkfrequency = min_wait_checkfrequency
end

@max_wait_check = Concurrent::Channel.tick(max_wait_checkfrequency)
loop do
Concurrent::Channel.select do |s|
s.take(@entries) { |e|
if add_entry_to_batch(e)
@logger.debug("Max batch_size is reached. Sending batch to loki")
send(@tenant_id, @batch)
@batch = Batch.new(e)
end
}
s.take(@max_wait_check) {
# Send batch if max wait time has been reached
if is_batch_expired
@logger.debug("Max batch_wait time is reached. Sending batch to loki")
send(@tenant_id, @batch)
@batch = nil
end
}
end
end
end

def add_entry_to_batch(e)
line = e.entry['line']
# we don't want to send empty lines.
return false if line.to_s.strip.empty?

if @batch.nil?
@batch = Batch.new(e)
return false
end

if @batch.size_bytes_after(line) > @batch_size
return true
end
@batch.add(e)
return false
end

def is_batch_expired
return !@batch.nil? && @batch.age() >= @batch_wait
end

## Receives logstash events
public
def receive(event)
@entries << Entry.new(event,@message_field)
end

def close
@logger.info("Closing loki output plugin. Flushing all pending batches")
@entries.close
send(@tenant_id, @batch) if !@batch.nil?
@max_wait_check.close if !@max_wait_check.nil?
end

def send(tenant_id, batch)
res = loki_http_request(tenant_id, batch.to_json, @min_delay, @max_delay, @retries)

if res.is_a?(Net::HTTPSuccess)
@logger.debug("Successfully pushed data to loki")
return
else
@logger.error("failed to write post to ", :uri => @uri, :code => res.code, :body => res.body, :message => res.message) if !res.nil?
@logger.debug("Payload object ", :payload => payload)
end
end

def loki_http_request(tenant_id, payload, min_delay, max_delay, retries)
req = Net::HTTP::Post.new(
@uri.request_uri
)
req.add_field('Content-Type', 'application/json')
req.add_field('X-Scope-OrgID', tenant_id) if tenant_id
req.basic_auth(@username, @password) if @username
req.body = payload

opts = ssl_opts(@uri)

@logger.debug("sending #{req.body.length} bytes to loki")
retry_count = 0
delay = min_delay
begin
res = Net::HTTP.start(@uri.host, @uri.port, **opts) { |http| http.request(req) }
rescue Net::HTTPTooManyRequests, Net::HTTPServerError, Errno::ECONNREFUSED => e
unless retry_count < retries
@logger.error("Error while sending data to loki. Tried #{retry_count} times\n. :error => #{e}")
return res
end

retry_count += 1
@logger.warn("Trying to send again. Attempt number: #{retry_count}. Retrying in #{delay}s")
sleep delay

if (delay * 2 - delay) > max_delay
delay = delay
else
delay = delay * 2
end

retry
rescue StandardError => e
@logger.error("Error while connecting to loki server ", :error_inspect => e.inspect, :error => e)
return res
end
return res
end
end
Loading