Skip to content

Commit

Permalink
perf: Gather aggregate per-line and per-tenant metrics for Drain patt…
Browse files Browse the repository at this point in the history
…erns (#13368)
  • Loading branch information
benclive committed Jul 2, 2024
1 parent 23f2006 commit bf1d6e3
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 12 deletions.
4 changes: 4 additions & 0 deletions pkg/pattern/drain/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ func (d *Drain) train(tokens []string, state interface{}, ts int64) *LogCluster
if len(tokens) < 4 {
return nil
}
if d.metrics != nil {
d.metrics.TokensPerLine.Observe(float64(len(tokens)))
d.metrics.StatePerLine.Observe(float64(len(state.([]int))))
}
matchCluster := d.treeSearch(d.rootNode, tokens, d.config.SimTh, false)
// Match no existing log cluster
if matchCluster == nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/pattern/drain/drain_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ func BenchmarkDrain_TrainExtractsPatterns(b *testing.B) {
line := scanner.Text()
lines = append(lines, line)
}
drain := New(DefaultConfig(), nil)

b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
drain := New(DefaultConfig(), nil)
for _, line := range lines {
drain.Train(line, 0)
}
Expand Down
17 changes: 16 additions & 1 deletion pkg/pattern/drain/drain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
drain *Drain
inputFile string
patterns []string
format string
}{
{
drain: New(DefaultConfig(), nil),
inputFile: `testdata/agent-logfmt.txt`,
format: FormatLogfmt,
patterns: []string{
`ts=2024-04-16T15:10:42.<_> level=info msg="finished node evaluation" controller_id=module.http.cloudwatch_pipelines node_id=prometheus.scrape.<_> duration=<_>.<_>`,
`ts=2024-04-16T15:10:43.192290389Z caller=filetargetmanager.go:361 level=info component=logs logs_config=default msg="Adding target" key="/var/log/pods/*19a1cce8-5f04-46e0-a124-292b0dd9b343/testcoordinator/*.log:{batch_kubernetes_io_controller_uid=\"25ec5edf-f78e-468b-b6f3-3b9685f0cc8f\", batch_kubernetes_io_job_name=\"testcoordinator-job-2665838\", container=\"testcoordinator\", controller_uid=\"25ec5edf-f78e-468b-b6f3-3b9685f0cc8f\", job=\"k6-cloud/testcoordinator\", job_name=\"testcoordinator-job-2665838\", name=\"testcoordinator\", namespace=\"k6-cloud\", pod=\"testcoordinator-job-2665838-9g8ds\"}"`,
Expand Down Expand Up @@ -62,6 +64,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
{
drain: New(DefaultConfig(), nil),
inputFile: `testdata/ingester-logfmt.txt`,
format: FormatLogfmt,
patterns: []string{
`ts=2024-04-17T09:52:46.363974185Z caller=http.go:194 level=debug traceID=1b48f5156a61ca69 msg="GET /debug/pprof/delta_mutex (200) 1.161082ms"`,
`ts=2024-04-17T09:52:46.<_> caller=head.go:216 level=debug tenant=987678 msg="profile is empty after delta computation" metricName=memory`,
Expand All @@ -71,6 +74,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
{
drain: New(DefaultConfig(), nil),
inputFile: `testdata/drone-json.txt`,
format: FormatJSON,
patterns: []string{
`{"duration":<_>,"level":"debug","method":"GET","msg":"request completed","referer":"","remote":"10.136.105.40:52702","request":"/metrics","status":200,"time":"<_>:<_>:<_>","user-agent":"GrafanaAgent/v0.40.3 (flow; linux; helm)"}`,
`{"id":"<_>","level":"debug","max-pool":4,"min-pool":0,"msg":"check capacity","pending-builds":0,"running-builds":0,"server-buffer":0,"server-capacity":0,"server-count":0,"time":"<_>:<_>:<_>"}`,
Expand All @@ -83,6 +87,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
{
drain: New(DefaultConfig(), nil),
inputFile: "testdata/distributor-logfmt.txt",
format: FormatLogfmt,
patterns: []string{
`ts=2024-05-02T12:17:22.851228301Z caller=http.go:194 level=debug traceID=1e1fe5ba1756bc38 orgID=1819 msg="POST /pyroscope/ingest?aggregationType=sum&from=1714652230&name=flamegraph.com%7Bapp_kubernetes_io_instance%3Dflamegraph-com%2Capp_kubernetes_io_name%3Dflamegraph-com%2Ccluster%3Dflamegraph.com%2Cinstance%3D10.0.11.146%3A8001%2Cjob%3Dkubernetes-pods%2Cnamespace%3Dflamegraph-com%2Cpod%3Dflamegraph-com-backend-79c858c7bf-jw2hn%2Cpod_template_hash%3D79c858c7bf%2Cpyroscope_tenant%3Dpyroscope%2Ctier%3Dbackend%7D&sampleRate=0&spyName=scrape&units=samples&until=1714652240 (200) 22.345191ms"`,
`ts=2024-05-02T12:17:22.<_> caller=http.go:194 level=debug traceID=<_> orgID=75 msg="POST /ingest?aggregationType=&from=1714652227232613927&name=checkoutservice%7B__session_id__%3D294b9729f5a7de95%2Cnamespace%3Dotel-demo%7D&sampleRate=<_>&spyName=gospy&units=&until=1714652242232506798 (200) <_>.<_>"`,
Expand All @@ -94,6 +99,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
{
drain: New(DefaultConfig(), nil),
inputFile: "testdata/journald.txt",
format: FormatUnknown,
patterns: []string{
` ln --force -s /proc/$(pidof hgrun-pause)/root/bin/hgrun /bin/hgrun;`,
` while [ "$(pidof plugins-pause)" = "" ]; do sleep 0.5; done;`,
Expand Down Expand Up @@ -200,6 +206,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
{
drain: New(DefaultConfig(), nil),
inputFile: "testdata/kafka.txt",
format: FormatUnknown,
patterns: []string{
`[2024-05-07 10:55:40,626] INFO [LocalLog partition=ingest-6, dir=/bitnami/kafka/data] Deleting segment files LogSegment(baseOffset=180391157, size=16991045, lastModifiedTime=1715075754780, largestRecordTimestamp=Some(1715075754774)),LogSegment(baseOffset=180393429, size=16997692, lastModifiedTime=1715075760206, largestRecordTimestamp=Some(1715075760186)),LogSegment(baseOffset=180395889, size=16998200, lastModifiedTime=1715075765542, largestRecordTimestamp=Some(1715075765526)),LogSegment(baseOffset=180398373, size=16977347, lastModifiedTime=1715075770515, largestRecordTimestamp=Some(1715075770504)) (kafka.log.LocalLog$)`,
`[2024-05-07 10:55:53,038] INFO [LocalLog partition=mimir-dev-09-aggregations-offsets-1, dir=/bitnami/kafka/data] Deleting segment files LogSegment(baseOffset=447957, size=948, lastModifiedTime=1715059232052, largestRecordTimestamp=Some(1715059232002)),LogSegment(baseOffset=447969, size=948, lastModifiedTime=1715059424352, largestRecordTimestamp=Some(1715059424301)) (kafka.log.LocalLog$)`,
Expand All @@ -220,6 +227,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
{
drain: New(DefaultConfig(), nil),
inputFile: "testdata/kubernetes.txt",
format: FormatUnknown,
patterns: []string{
`I0507 12:02:27.947830 1 nodeutilization.go:274] "Evicting pods based on priority, if they have same priority, they'll be evicted based on QoS tiers"`,
`I0507 12:02:27.<_> 1 defaultevictor.go:163] "pod does not fit on any other node because of nodeSelector(s), Taint(s), or nodes marked as unschedulable" pod="<_>/<_>"`,
Expand Down Expand Up @@ -269,6 +277,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
{
drain: New(DefaultConfig(), nil),
inputFile: "testdata/vault.txt",
format: FormatUnknown,
patterns: []string{
`2024-05-07T10:56:38.667Z [INFO] expiration: revoked lease: lease_id=auth/gcp/login/h4c031a99aa555040a0dd99864d828e946c6d4e31f4f5178757183def61f9d104`,
`2024-05-07T10:<_>:<_>.<_> [INFO] expiration: revoked lease: lease_id=auth/kubernetes/<_>/login/<_>`,
Expand All @@ -277,6 +286,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
{
drain: New(DefaultConfig(), nil),
inputFile: "testdata/calico.txt",
format: FormatUnknown,
patterns: []string{
`2024-05-08 15:23:56.403 [DEBUG][615489] felix/table.go 699: Finished loading iptables state ipVersion=0x4 table="filter"`,
`2024-05-08 15:23:56.403 [INFO][615489] felix/summary.go 100: Summarising 1 dataplane reconciliation loops over 600ms: avg=119ms longest=119ms (resync-filter-v4)`,
Expand Down Expand Up @@ -358,6 +368,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
{
drain: New(DefaultConfig(), nil),
inputFile: "testdata/grafana-ruler.txt",
format: FormatLogfmt,
patterns: []string{
`level=debug ts=2024-05-29T13:44:15.804597912Z caller=remote_instance_store.go:51 user=297794 slug=leanix msg="calling SaveAlertInstance"`,
`level=debug ts=2024-05-29T13:44:15.<_> caller=remote_instance_store.go:51 user=396586 slug=opengov msg="calling SaveAlertInstance"`,
Expand Down Expand Up @@ -412,10 +423,15 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
require.NoError(t, err)
defer file.Close()

detectedFormat := false
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
tt.drain.Train(line, 0)
if !detectedFormat {
require.Equal(t, tt.format, DetectLogFormat(line))
detectedFormat = true
}
}

var output []string
Expand Down Expand Up @@ -565,7 +581,6 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
for _, line := range tt.inputLines {
passes := matcher.Test([]byte(line))
require.Truef(t, passes, "Line should match extracted pattern: \nPatt[%q] \nLine[%q]", cluster.String(), line)

}
})
}
Expand Down
29 changes: 28 additions & 1 deletion pkg/pattern/drain/metrics.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,35 @@
package drain

import "github.com/prometheus/client_golang/prometheus"
import (
"regexp"

"github.com/prometheus/client_golang/prometheus"
)

const (
FormatLogfmt = "logfmt"
FormatJSON = "json"
FormatUnknown = "unknown"
)

var logfmtRegex = regexp.MustCompile("^(\\w+?=([^\"]\\S*?|\".+?\") )*?(\\w+?=([^\"]\\S*?|\".+?\"))+$")

// DetectLogFormat guesses at how the logs are encoded based on some simple heuristics.
// It only runs on the first log line when a new stream is created, so it could do some more complex parsing or regex.
func DetectLogFormat(line string) string {
if len(line) < 2 {
return FormatUnknown
} else if line[0] == '{' && line[len(line)-1] == '}' {
return FormatJSON
} else if logfmtRegex.MatchString(line) {
return FormatLogfmt
}
return FormatUnknown
}

type Metrics struct {
PatternsEvictedTotal prometheus.Counter
PatternsDetectedTotal prometheus.Counter
TokensPerLine prometheus.Observer
StatePerLine prometheus.Observer
}
5 changes: 4 additions & 1 deletion pkg/pattern/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/pattern/chunk"
"github.com/grafana/loki/v3/pkg/pattern/drain"
"github.com/grafana/loki/v3/pkg/pattern/metric"
"github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
Expand Down Expand Up @@ -208,7 +209,9 @@ func (i *instance) createStream(_ context.Context, pushReqStream logproto.Stream
}
fp := i.getHashForLabels(labels)
sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(labels), fp)
s, err := newStream(fp, sortedLabels, i.metrics, i.chunkMetrics, i.aggregationCfg, i.logger)
firstEntryLine := pushReqStream.Entries[0].Line
s, err := newStream(fp, sortedLabels, i.metrics, i.chunkMetrics, i.aggregationCfg, i.logger, drain.DetectLogFormat(firstEntryLine), i.instanceID)

if err != nil {
return nil, fmt.Errorf("failed to create stream: %w", err)
}
Expand Down
28 changes: 22 additions & 6 deletions pkg/pattern/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (

type ingesterMetrics struct {
flushQueueLength prometheus.Gauge
patternsDiscardedTotal prometheus.Counter
patternsDetectedTotal prometheus.Counter
patternsDiscardedTotal *prometheus.CounterVec
patternsDetectedTotal *prometheus.CounterVec
tokensPerLine *prometheus.HistogramVec
statePerLine *prometheus.HistogramVec
}

func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *ingesterMetrics {
Expand All @@ -19,18 +21,32 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *inges
Name: "flush_queue_length",
Help: "The total number of series pending in the flush queue.",
}),
patternsDiscardedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
patternsDiscardedTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "patterns_evicted_total",
Help: "The total number of patterns evicted from the LRU cache.",
}),
patternsDetectedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
}, []string{"tenant", "format"}),
patternsDetectedTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "patterns_detected_total",
Help: "The total number of patterns detected from incoming log lines.",
}),
}, []string{"tenant", "format"}),
tokensPerLine: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "tokens_per_line",
Help: "The number of tokens an incoming logline is split into for pattern recognition.",
Buckets: []float64{20, 40, 80, 120, 160, 320, 640, 1280},
}, []string{"tenant", "format"}),
statePerLine: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "state_per_line",
Help: "The number of items of additional state returned alongside tokens for pattern recognition.",
Buckets: []float64{20, 40, 80, 120, 160, 320, 640, 1280},
}, []string{"tenant", "format"}),
}
}

Expand Down
8 changes: 6 additions & 2 deletions pkg/pattern/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,19 @@ func newStream(
chunkMetrics *metric.ChunkMetrics,
cfg metric.AggregationConfig,
logger log.Logger,
guessedFormat string,
instanceID string,
) (*stream, error) {
stream := &stream{
fp: fp,
labels: labels,
labelsString: labels.String(),
labelHash: labels.Hash(),
patterns: drain.New(drain.DefaultConfig(), &drain.Metrics{
PatternsEvictedTotal: metrics.patternsDiscardedTotal,
PatternsDetectedTotal: metrics.patternsDetectedTotal,
PatternsEvictedTotal: metrics.patternsDiscardedTotal.WithLabelValues(instanceID, guessedFormat),
PatternsDetectedTotal: metrics.patternsDetectedTotal.WithLabelValues(instanceID, guessedFormat),
TokensPerLine: metrics.tokensPerLine.WithLabelValues(instanceID, guessedFormat),
StatePerLine: metrics.statePerLine.WithLabelValues(instanceID, guessedFormat),
}),
cfg: cfg,
logger: logger,
Expand Down
11 changes: 11 additions & 0 deletions pkg/pattern/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/pattern/drain"
"github.com/grafana/loki/v3/pkg/pattern/iter"
"github.com/grafana/loki/v3/pkg/pattern/metric"

Expand All @@ -28,6 +29,8 @@ func TestAddStream(t *testing.T) {
Enabled: false,
},
log.NewNopLogger(),
drain.FormatUnknown,
"123",
)
require.NoError(t, err)

Expand Down Expand Up @@ -65,6 +68,8 @@ func TestPruneStream(t *testing.T) {
Enabled: false,
},
log.NewNopLogger(),
drain.FormatUnknown,
"123",
)
require.NoError(t, err)

Expand Down Expand Up @@ -113,6 +118,8 @@ func TestSampleIterator(t *testing.T) {
Enabled: true,
},
log.NewNopLogger(),
drain.FormatUnknown,
"123",
)
require.NoError(t, err)

Expand Down Expand Up @@ -158,6 +165,8 @@ func TestSampleIterator(t *testing.T) {
Enabled: true,
},
log.NewNopLogger(),
drain.FormatUnknown,
"123",
)
require.NoError(t, err)

Expand Down Expand Up @@ -244,6 +253,8 @@ func TestSampleIterator(t *testing.T) {
Enabled: true,
},
log.NewNopLogger(),
drain.FormatUnknown,
"123",
)
require.NoError(t, err)

Expand Down

0 comments on commit bf1d6e3

Please sign in to comment.