Skip to content

Commit

Permalink
feat: Limit to block ingestion until configured date (#13958)
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts committed Aug 27, 2024
1 parent 5cedb19 commit b5ac6a0
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 0 deletions.
11 changes: 11 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -4057,6 +4057,17 @@ otlp_config:
# Configuration for log attributes to store them as Structured Metadata or
# drop them altogether
[log_attributes: <list of attributes_configs>]

# Block ingestion until the configured date. The time should be in RFC3339
# format.
# CLI flag: -limits.block-ingestion-until
[block_ingestion_until: <time> | default = 0]

# HTTP status code to return when ingestion is blocked. If 200, the ingestion
# will be blocked without returning an error to the client. By Default, a custom
# status code (260) is returned to the client along with an error message.
# CLI flag: -limits.block-ingestion-status-code
[block_ingestion_status_code: <int> | default = 260]
```
### local_storage_config
Expand Down
17 changes: 17 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,23 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}

now := time.Now()

if block, until, retStatusCode := d.validator.ShouldBlockIngestion(validationContext, now); block {
validation.DiscardedSamples.WithLabelValues(validation.BlockedIngestion, tenantID).Add(float64(validatedLineCount))
validation.DiscardedBytes.WithLabelValues(validation.BlockedIngestion, tenantID).Add(float64(validatedLineSize))

err = fmt.Errorf(validation.BlockedIngestionErrorMsg, tenantID, until.Format(time.RFC3339), retStatusCode)
d.writeFailuresManager.Log(tenantID, err)

// If the status code is 200, return success.
// Note that we still log the error and increment the metrics.
if retStatusCode == http.StatusOK {
return &logproto.PushResponse{}, nil
}

return nil, httpgrpc.Errorf(retStatusCode, err.Error())
}

if !d.ingestionRateLimiter.AllowN(now, tenantID, validatedLineSize) {
// Return a 429 to indicate to the client they are being rate limited
validation.DiscardedSamples.WithLabelValues(validation.RateLimited, tenantID).Add(float64(validatedLineCount))
Expand Down
54 changes: 54 additions & 0 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1171,6 +1171,60 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
}
}

func TestDistributor_PushIngestionBlocked(t *testing.T) {
for _, tc := range []struct {
name string
blockUntil time.Time
blockStatusCode int
expectError bool
expectedStatusCode int
}{
{
name: "not configured",
expectedStatusCode: http.StatusOK,
},
{
name: "not blocked",
blockUntil: time.Now().Add(-1 * time.Hour),
expectedStatusCode: http.StatusOK,
},
{
name: "blocked",
blockUntil: time.Now().Add(1 * time.Hour),
blockStatusCode: 456,
expectError: true,
expectedStatusCode: 456,
},
{
name: "blocked with status code 200",
blockUntil: time.Now().Add(1 * time.Hour),
blockStatusCode: http.StatusOK,
expectError: false,
expectedStatusCode: http.StatusOK,
},
} {
t.Run(tc.name, func(t *testing.T) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.BlockIngestionUntil = flagext.Time(tc.blockUntil)
limits.BlockIngestionStatusCode = tc.blockStatusCode

distributors, _ := prepare(t, 1, 5, limits, nil)
request := makeWriteRequest(1, 1024)
response, err := distributors[0].Push(ctx, request)

if tc.expectError {
expectedErr := fmt.Sprintf(validation.BlockedIngestionErrorMsg, "test", tc.blockUntil.Format(time.RFC3339), tc.blockStatusCode)
require.ErrorContains(t, err, expectedErr)
require.Nil(t, response)
} else {
require.NoError(t, err)
require.Equal(t, success, response)
}
})
}
}

func prepare(t *testing.T, numDistributors, numIngesters int, limits *validation.Limits, factory func(addr string) (ring_client.PoolClient, error)) ([]*Distributor, []mockIngester) {
t.Helper()

Expand Down
3 changes: 3 additions & 0 deletions pkg/distributor/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,7 @@ type Limits interface {
MaxStructuredMetadataSize(userID string) int
MaxStructuredMetadataCount(userID string) int
OTLPConfig(userID string) push.OTLPConfig

BlockIngestionUntil(userID string) time.Time
BlockIngestionStatusCode(userID string) int
}
14 changes: 14 additions & 0 deletions pkg/distributor/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ type validationContext struct {
maxStructuredMetadataSize int
maxStructuredMetadataCount int

blockIngestionUntil time.Time
blockIngestionStatusCode int

userID string
}

Expand All @@ -70,6 +73,8 @@ func (v Validator) getValidationContextForTime(now time.Time, userID string) val
allowStructuredMetadata: v.AllowStructuredMetadata(userID),
maxStructuredMetadataSize: v.MaxStructuredMetadataSize(userID),
maxStructuredMetadataCount: v.MaxStructuredMetadataCount(userID),
blockIngestionUntil: v.BlockIngestionUntil(userID),
blockIngestionStatusCode: v.BlockIngestionStatusCode(userID),
}
}

Expand Down Expand Up @@ -192,6 +197,15 @@ func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, strea
return nil
}

// ShouldBlockIngestion returns whether ingestion should be blocked, until when and the status code.
func (v Validator) ShouldBlockIngestion(ctx validationContext, now time.Time) (bool, time.Time, int) {
if ctx.blockIngestionUntil.IsZero() {
return false, time.Time{}, 0
}

return now.Before(ctx.blockIngestionUntil), ctx.blockIngestionUntil, ctx.blockIngestionStatusCode
}

func updateMetrics(reason, userID string, stream logproto.Stream) {
validation.DiscardedSamples.WithLabelValues(reason, userID).Inc()
bytes := 0
Expand Down
16 changes: 16 additions & 0 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ const (
defaultMaxStructuredMetadataCount = 128
defaultBloomCompactorMaxBlockSize = "200MB"
defaultBloomCompactorMaxBloomSize = "128MB"

defaultBlockedIngestionStatusCode = 260 // 260 is a custom status code to indicate blocked ingestion
)

// Limits describe all the limits for users; can be used to describe global default
Expand Down Expand Up @@ -222,6 +224,9 @@ type Limits struct {
MaxStructuredMetadataEntriesCount int `yaml:"max_structured_metadata_entries_count" json:"max_structured_metadata_entries_count" doc:"description=Maximum number of structured metadata entries per log line."`
OTLPConfig push.OTLPConfig `yaml:"otlp_config" json:"otlp_config" doc:"description=OTLP log ingestion configurations"`
GlobalOTLPConfig push.GlobalOTLPConfig `yaml:"-" json:"-"`

BlockIngestionUntil dskit_flagext.Time `yaml:"block_ingestion_until" json:"block_ingestion_until"`
BlockIngestionStatusCode int `yaml:"block_ingestion_status_code" json:"block_ingestion_status_code"`
}

type StreamRetention struct {
Expand Down Expand Up @@ -411,6 +416,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.Var(&l.MaxStructuredMetadataSize, "limits.max-structured-metadata-size", "Maximum size accepted for structured metadata per entry. Default: 64 kb. Any log line exceeding this limit will be discarded. There is no limit when unset or set to 0.")
f.IntVar(&l.MaxStructuredMetadataEntriesCount, "limits.max-structured-metadata-entries-count", defaultMaxStructuredMetadataCount, "Maximum number of structured metadata entries per log line. Default: 128. Any log line exceeding this limit will be discarded. There is no limit when unset or set to 0.")
f.BoolVar(&l.VolumeEnabled, "limits.volume-enabled", true, "Enable log volume endpoint.")

f.Var(&l.BlockIngestionUntil, "limits.block-ingestion-until", "Block ingestion until the configured date. The time should be in RFC3339 format.")
f.IntVar(&l.BlockIngestionStatusCode, "limits.block-ingestion-status-code", defaultBlockedIngestionStatusCode, "HTTP status code to return when ingestion is blocked. If 200, the ingestion will be blocked without returning an error to the client. By Default, a custom status code (260) is returned to the client along with an error message.")
}

// SetGlobalOTLPConfig set GlobalOTLPConfig which is used while unmarshaling per-tenant otlp config to use the default list of resource attributes picked as index labels.
Expand Down Expand Up @@ -1051,6 +1059,14 @@ func (o *Overrides) OTLPConfig(userID string) push.OTLPConfig {
return o.getOverridesForUser(userID).OTLPConfig
}

func (o *Overrides) BlockIngestionUntil(userID string) time.Time {
return time.Time(o.getOverridesForUser(userID).BlockIngestionUntil)
}

func (o *Overrides) BlockIngestionStatusCode(userID string) int {
return o.getOverridesForUser(userID).BlockIngestionStatusCode
}

func (o *Overrides) getOverridesForUser(userID string) *Limits {
if o.tenantLimits != nil {
l := o.tenantLimits.TenantLimits(userID)
Expand Down
2 changes: 2 additions & 0 deletions pkg/validation/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ const (
StructuredMetadataTooLargeErrorMsg = "stream '%s' has structured metadata too large: '%d' bytes, limit: '%d' bytes. Please see `limits_config.max_structured_metadata_size` or contact your Loki administrator to increase it."
StructuredMetadataTooMany = "structured_metadata_too_many"
StructuredMetadataTooManyErrorMsg = "stream '%s' has too many structured metadata labels: '%d', limit: '%d'. Please see `limits_config.max_structured_metadata_entries_count` or contact your Loki administrator to increase it."
BlockedIngestion = "blocked_ingestion"
BlockedIngestionErrorMsg = "ingestion blocked for user %s until '%s' with status code '%d'"
)

type ErrStreamRateLimit struct {
Expand Down

0 comments on commit b5ac6a0

Please sign in to comment.