diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index f90331777377..0ce4982fb376 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -1118,9 +1118,6 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto. detectedFields := parseDetectedFields(ctx, req.FieldLimit, streams) - //TODO: detected field needs to contain the sketch - // make sure response to frontend is GRPC - //only want cardinality in JSON fields := make([]*logproto.DetectedField, len(detectedFields)) fieldCount := 0 for k, v := range detectedFields { @@ -1141,7 +1138,6 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto. fieldCount++ } - //TODO: detected fields response needs to include the sketch return &logproto.DetectedFieldsResponse{ Fields: fields, FieldLimit: req.GetFieldLimit(), @@ -1218,7 +1214,6 @@ func parseDetectedFields(ctx context.Context, limit uint32, streams logqlmodel.S fieldCount := uint32(0) for _, stream := range streams { - detectType := true level.Debug(spanlogger.FromContext(ctx)).Log( "detected_fields", "true", "msg", fmt.Sprintf("looking for detected fields in stream %d with %d lines", stream.Hash, len(stream.Entries))) @@ -1241,6 +1236,7 @@ func parseDetectedFields(ctx context.Context, limit uint32, streams logqlmodel.S df.parsers = append(df.parsers, *parser) } + detectType := true for _, v := range vals { parsedFields := detectedFields[k] if detectType { diff --git a/pkg/querier/querier_mock_test.go b/pkg/querier/querier_mock_test.go index 20c3b9f1b77c..07ff0d7cf1bb 100644 --- a/pkg/querier/querier_mock_test.go +++ b/pkg/querier/querier_mock_test.go @@ -9,6 +9,8 @@ import ( "github.com/grafana/loki/v3/pkg/logql/log" + "github.com/grafana/loki/pkg/push" + "github.com/grafana/loki/v3/pkg/loghttp" "github.com/grafana/dskit/grpcclient" @@ -118,7 +120,6 @@ func (c *querierClientMock) GetDetectedLabels(ctx context.Context, in *logproto. return (*logproto.LabelToValuesResponse)(nil), args.Error(1) } return res.(*logproto.LabelToValuesResponse), args.Error(1) - } func (c *querierClientMock) GetVolume(ctx context.Context, in *logproto.VolumeRequest, opts ...grpc.CallOption) (*logproto.VolumeResponse, error) { @@ -517,6 +518,20 @@ func mockStreamIterator(from int, quantity int) iter.EntryIterator { return iter.NewStreamIterator(mockStream(from, quantity)) } +// mockLogfmtStreamIterator returns an iterator with 1 stream and quantity entries, +// where entries timestamp and line string are constructed as sequential numbers +// starting at from, and the line is in logfmt format with the fields message, count and fake +func mockLogfmtStreamIterator(from int, quantity int) iter.EntryIterator { + return iter.NewStreamIterator(mockLogfmtStream(from, quantity)) +} + +// mockLogfmtStreamIterator returns an iterator with 1 stream and quantity entries, +// where entries timestamp and line string are constructed as sequential numbers +// starting at from, and the line is in logfmt format with the fields message, count and fake +func mockLogfmtStreamIteratorWithStructuredMetadata(from int, quantity int) iter.EntryIterator { + return iter.NewStreamIterator(mockLogfmtStreamWithStructuredMetadata(from, quantity)) +} + // mockSampleIterator returns an iterator with 1 stream and quantity entries, // where entries timestamp and line string are constructed as sequential numbers // starting at from @@ -546,6 +561,71 @@ func mockStreamWithLabels(from int, quantity int, labels string) logproto.Stream } } +func mockLogfmtStream(from int, quantity int) logproto.Stream { + return mockLogfmtStreamWithLabels(from, quantity, `{type="test"}`) +} + +func mockLogfmtStreamWithLabels(_ int, quantity int, labels string) logproto.Stream { + entries := make([]logproto.Entry, 0, quantity) + + // used for detected fields queries which are always BACKWARD + for i := quantity; i > 0; i-- { + entries = append(entries, logproto.Entry{ + Timestamp: time.Unix(int64(i), 0), + Line: fmt.Sprintf( + `message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t`, + i, + i, + (i * 10), + (i * 256), + float32(i*10.0), + (i%2 == 0)), + }) + } + + return logproto.Stream{ + Entries: entries, + Labels: labels, + } +} + +func mockLogfmtStreamWithStructuredMetadata(from int, quantity int) logproto.Stream { + return mockLogfmtStreamWithLabelsAndStructuredMetadata(from, quantity, `{type="test"}`) +} + +func mockLogfmtStreamWithLabelsAndStructuredMetadata( + from int, + quantity int, + labels string, +) logproto.Stream { + var entries []logproto.Entry + metadata := push.LabelsAdapter{ + { + Name: "constant", + Value: "constant", + }, + } + + for i := from; i < from+quantity; i++ { + metadata = append(metadata, push.LabelAdapter{ + Name: "variable", + Value: fmt.Sprintf("value%d", i), + }) + } + + for i := quantity; i > 0; i-- { + entries = append(entries, logproto.Entry{ + Timestamp: time.Unix(int64(i), 0), + Line: fmt.Sprintf(`message="line %d" count=%d fake=true`, i, i), + StructuredMetadata: metadata, + }) + } + return logproto.Stream{ + Labels: labels, + Entries: entries, + } +} + type querierMock struct { util.ExtendedMock } diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index a787616efeee..450e2eb0d67f 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -324,9 +324,11 @@ func TestQuerier_SeriesAPI(t *testing.T) { {Key: "a", Value: "1"}, {Key: "b", Value: "2"}, }}, - {Labels: []logproto.SeriesIdentifier_LabelsEntry{ - {Key: "a", Value: "1"}, - {Key: "b", Value: "3"}}, + { + Labels: []logproto.SeriesIdentifier_LabelsEntry{ + {Key: "a", Value: "1"}, + {Key: "b", Value: "3"}, + }, }, {Labels: []logproto.SeriesIdentifier_LabelsEntry{ {Key: "a", Value: "1"}, @@ -994,7 +996,6 @@ func TestQuerier_RequestingIngesters(t *testing.T) { for _, tc := range tests { t.Run(tc.desc, func(t *testing.T) { - conf := mockQuerierConfig() conf.QueryIngestersWithin = time.Minute * 30 if tc.setIngesterQueryStoreMaxLookback { @@ -1175,7 +1176,6 @@ func setupIngesterQuerierMocks(conf Config, limits *validation.Overrides) (*quer mockReadRingWithOneActiveIngester(), &mockDeleteGettter{}, store, limits) - if err != nil { return nil, nil, nil, err } @@ -1191,6 +1191,7 @@ type fakeTimeLimits struct { func (f fakeTimeLimits) MaxQueryLookback(_ context.Context, _ string) time.Duration { return f.maxQueryLookback } + func (f fakeTimeLimits) MaxQueryLength(_ context.Context, _ string) time.Duration { return f.maxQueryLength } @@ -1697,3 +1698,164 @@ func BenchmarkQuerierDetectedLabels(b *testing.B) { assert.NoError(b, err) } } + +func TestQuerier_DetectedFields(t *testing.T) { + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + ctx := user.InjectOrgID(context.Background(), "test") + + conf := mockQuerierConfig() + conf.IngesterQueryStoreMaxLookback = 0 + + request := logproto.DetectedFieldsRequest{ + Start: time.Now().Add(-1 * time.Minute), + End: time.Now(), + Query: `{type="test"}`, + LineLimit: 1000, + FieldLimit: 1000, + } + + t.Run("returns detected fields from queried logs", func(t *testing.T) { + store := newStoreMock() + store.On("SelectLogs", mock.Anything, mock.Anything). + Return(mockLogfmtStreamIterator(1, 5), nil) + + queryClient := newQueryClientMock() + queryClient.On("Recv"). + Return(mockQueryResponse([]logproto.Stream{mockLogfmtStream(1, 5)}), nil) + + ingesterClient := newQuerierClientMock() + ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything). + Return(queryClient, nil) + + querier, err := newQuerier( + conf, + mockIngesterClientConfig(), + newIngesterClientMockFactory(ingesterClient), + mockReadRingWithOneActiveIngester(), + &mockDeleteGettter{}, + store, limits) + require.NoError(t, err) + + resp, err := querier.DetectedFields(ctx, &request) + require.NoError(t, err) + + detectedFields := resp.Fields + // log lines come from querier_mock_test.go + // message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t + assert.Len(t, detectedFields, 7) + expectedCardinality := map[string]uint64{ + "message": 5, + "count": 5, + "fake": 1, + "bytes": 5, + "duration": 5, + "percent": 5, + "even": 2, + } + for _, d := range detectedFields { + card := expectedCardinality[d.Label] + assert.Equal(t, card, d.Cardinality, "Expected cardinality mismatch for: %s", d.Label) + } + }) + + t.Run("correctly identifies different field types", func(t *testing.T) { + store := newStoreMock() + store.On("SelectLogs", mock.Anything, mock.Anything). + Return(mockLogfmtStreamIterator(1, 2), nil) + + queryClient := newQueryClientMock() + queryClient.On("Recv"). + Return(mockQueryResponse([]logproto.Stream{mockLogfmtStream(1, 2)}), nil) + + ingesterClient := newQuerierClientMock() + ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything). + Return(queryClient, nil) + + querier, err := newQuerier( + conf, + mockIngesterClientConfig(), + newIngesterClientMockFactory(ingesterClient), + mockReadRingWithOneActiveIngester(), + &mockDeleteGettter{}, + store, limits) + require.NoError(t, err) + + resp, err := querier.DetectedFields(ctx, &request) + require.NoError(t, err) + + detectedFields := resp.Fields + // log lines come from querier_mock_test.go + // message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t + assert.Len(t, detectedFields, 7) + + var messageField, countField, bytesField, durationField, floatField, evenField *logproto.DetectedField + for _, field := range detectedFields { + switch field.Label { + case "message": + messageField = field + case "count": + countField = field + case "bytes": + bytesField = field + case "duration": + durationField = field + case "percent": + floatField = field + case "even": + evenField = field + } + } + + assert.Equal(t, logproto.DetectedFieldString, messageField.Type) + assert.Equal(t, logproto.DetectedFieldInt, countField.Type) + assert.Equal(t, logproto.DetectedFieldBytes, bytesField.Type) + assert.Equal(t, logproto.DetectedFieldDuration, durationField.Type) + assert.Equal(t, logproto.DetectedFieldFloat, floatField.Type) + assert.Equal(t, logproto.DetectedFieldBoolean, evenField.Type) + }) +} + +func BenchmarkQuerierDetectedFields(b *testing.B) { + limits, _ := validation.NewOverrides(defaultLimitsTestConfig(), nil) + ctx := user.InjectOrgID(context.Background(), "test") + + conf := mockQuerierConfig() + conf.IngesterQueryStoreMaxLookback = 0 + + request := logproto.DetectedFieldsRequest{ + Start: time.Now().Add(-1 * time.Minute), + End: time.Now(), + Query: `{type="test"}`, + LineLimit: 1000, + FieldLimit: 1000, + } + + store := newStoreMock() + store.On("SelectLogs", mock.Anything, mock.Anything). + Return(mockLogfmtStreamIterator(1, 2), nil) + + queryClient := newQueryClientMock() + queryClient.On("Recv"). + Return(mockQueryResponse([]logproto.Stream{mockLogfmtStream(1, 2)}), nil) + + ingesterClient := newQuerierClientMock() + ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything). + Return(queryClient, nil) + + querier, _ := newQuerier( + conf, + mockIngesterClientConfig(), + newIngesterClientMockFactory(ingesterClient), + mockReadRingWithOneActiveIngester(), + &mockDeleteGettter{}, + store, limits) + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + _, err := querier.DetectedFields(ctx, &request) + assert.NoError(b, err) + } +}