feat(kafka): Add Ingestion from Kafka in Ingesters #14192
base: main
Conversation
LGTM
```go
		continue
	}

	if err := r.Commit(ctx, currOffset); err == nil {
```
Edit 2: my train of thought here. tl;dr: no action needed.

If we're moving state to the ingester and persisting via the WAL, I think we need to commit after each batch is pulled & accepted, not on a timer. Otherwise, restarting will replay the WAL and then reprocess offsets that are already in the WAL, resulting in duplicate lines in storage (if they're accepted into different chunks; otherwise cut() will dedupe them).

Edit 1: looks like you handle this in partitionCommitter.Stop(), which reduces the likelihood of a problem here. There's still some gap where duplicates can occur (the process dying before it can call the Stop() method), but:

a) this would also be the case between a WAL write and the subsequent offset Commit()

b) running on a timer like this should create fewer offset records in the offsets topic compared to updating them after each write.

All in all, I think this is fine 👍, considering this problem is going to exist as long as we try to move state from one WAL (Kafka) to another (ingester) and then update the former. Since we can't make that atomic, this does a good enough job minimizing the blast radius.
What this PR does / why we need it:
This adds a new flag to allow ingesting from Kafka. It's heavily inspired by Mimir's Kafka ingestion.
This is meant to be used as a new set of ingester replicas.
The idea is simple: we keep the same ingestion path in the ingester but allow it to consume from Kafka.
Ingesters now share partition ownership through the partition ring.
A new downscale partition endpoint is added for downscaling: it keeps the partition alive until the ingester query window (2h) has passed. The new endpoint is used by the new rollout operator.
Which issue(s) this PR fixes:
Fixes https://github.com/grafana/loki-private/issues/1115
Special notes for your reviewer:
Checklist
- Reviewed the `CONTRIBUTING.md` guide (**required**)
- `feat` PRs are unlikely to be accepted unless a case can be made for the feature actually being a bug fix to existing behavior.
- Changes that require user attention or interaction to upgrade are documented in `docs/sources/setup/upgrade/_index.md`
- For Helm chart changes, bump the version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. Example PR
- If the change is deprecating or deleting a configuration option, update the `deprecated-config.yaml` and `deleted-config.yaml` files respectively in the `tools/deprecated-config-checker` directory. Example PR