Skip to content

Commit

Permalink
feat: Ignore empty streams in distributor if all entries fail validat…
Browse files Browse the repository at this point in the history
…ion (#13674)
  • Loading branch information
benclive committed Jul 26, 2024
1 parent 8a3ae22 commit 6c4b062
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,10 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
pushSize += len(entry.Line)
}
stream.Entries = stream.Entries[:n]
if len(stream.Entries) == 0 {
// Empty stream after validating all the entries
continue
}

shardStreamsCfg := d.validator.Limits.ShardStreams(tenantID)
if shardStreamsCfg.Enabled {
Expand Down
20 changes: 20 additions & 0 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,26 @@ func Test_TruncateLogLines(t *testing.T) {
})
}

func Test_DiscardEmptyStreamsAfterValidation(t *testing.T) {
setup := func() (*validation.Limits, *mockIngester) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)

limits.MaxLineSize = 5
return limits, &mockIngester{}
}

t.Run("it discards invalid entries and discards resulting empty streams completely", func(t *testing.T) {
limits, ingester := setup()
distributors, _ := prepare(t, 1, 5, limits, func(addr string) (ring_client.PoolClient, error) { return ingester, nil })

_, err := distributors[0].Push(ctx, makeWriteRequest(1, 10))
require.Equal(t, err, httpgrpc.Errorf(http.StatusBadRequest, fmt.Sprintf(validation.LineTooLongErrorMsg, 5, "{foo=\"bar\", service_name=\"unknown_service\"}", 10)))
topVal := ingester.Peek()
require.Nil(t, topVal)
})
}

func TestStreamShard(t *testing.T) {
// setup base stream.
baseStream := logproto.Stream{}
Expand Down

0 comments on commit 6c4b062

Please sign in to comment.