Log Backup: decouple log backup resolve locks from GCWorker. #45904

Merged
merged 56 commits on Sep 12, 2023
Changes from all commits
Commits
56 commits
c99737f
update checkpoint
3pointer Aug 7, 2023
6ac2d25
refactor GCWorker for lockResolver
3pointer Aug 8, 2023
418bd47
decouple logbackup in GCWorker
3pointer Aug 8, 2023
1bf06cd
use GCLockResolver in advancer
3pointer Aug 8, 2023
a0742dc
avoid cycle import
3pointer Aug 8, 2023
885425f
expose tikvStore for lockResolver
3pointer Aug 8, 2023
e0df995
update test
3pointer Aug 8, 2023
a209893
address comment
3pointer Aug 8, 2023
ae12e5c
fix bazel
3pointer Aug 9, 2023
5e4ba62
remove LocateKey/SendReq in GCLockResolver interface
3pointer Aug 9, 2023
713b16f
fix gc worker unit test
3pointer Aug 9, 2023
8d821a7
add ut for resolve lock
3pointer Aug 9, 2023
e76a325
fix
3pointer Aug 10, 2023
2adf397
precise resolve lock ranges
3pointer Aug 11, 2023
bf8b1d5
update log
3pointer Aug 14, 2023
1fe03aa
update go mod for client-go
3pointer Aug 16, 2023
7e02b09
update test
3pointer Aug 16, 2023
8dc9ce3
remove useless code
3pointer Aug 17, 2023
29e65d9
update test
3pointer Aug 17, 2023
ba1a4d1
fix test
3pointer Aug 17, 2023
c1628f7
update check time
3pointer Aug 21, 2023
99d8d10
resolve conflicts
3pointer Aug 21, 2023
e58f0fa
adapt new lockResolver
3pointer Aug 28, 2023
74b4459
Merge branch 'master' into decouple_pitr_for_gc
3pointer Aug 28, 2023
f22ece2
make bazel_prepare
3pointer Aug 28, 2023
3a0db58
update go mod
3pointer Aug 29, 2023
effa59b
Merge branch 'decouple_pitr_for_gc' of https://github.com/3pointer/ti…
3pointer Aug 29, 2023
9e6fcea
update
3pointer Aug 29, 2023
0e4c9d3
fix bazel build
3pointer Aug 29, 2023
b7c2912
fix race test
3pointer Aug 29, 2023
38c95e4
fix bazel
3pointer Aug 29, 2023
012bdec
fix unit test
3pointer Aug 29, 2023
3b9e7e6
Merge branch 'decouple_pitr_for_gc' of https://github.com/3pointer/ti…
3pointer Aug 29, 2023
d32ed0e
fix unit test
3pointer Aug 29, 2023
0d69d7b
fix unit test
3pointer Aug 30, 2023
22ddfc5
update
3pointer Aug 30, 2023
88e8092
fix test
3pointer Aug 30, 2023
75fb8fd
fix gc job test
3pointer Sep 1, 2023
e903cc0
Merge branch 'master' into decouple_pitr_for_gc
3pointer Sep 1, 2023
3a60000
Merge branch 'master' into decouple_pitr_for_gc
3pointer Sep 1, 2023
5d7c382
Merge branch 'master' into decouple_pitr_for_gc
3pointer Sep 4, 2023
8d94a94
address comments
3pointer Sep 6, 2023
6f551b6
Merge branch 'master' into decouple_pitr_for_gc
3pointer Sep 6, 2023
d92df68
Merge branch 'master' into decouple_pitr_for_gc
3pointer Sep 6, 2023
9802295
address comment
3pointer Sep 6, 2023
ce3726f
address comment
3pointer Sep 6, 2023
e581709
address comment
3pointer Sep 6, 2023
d69e652
address comment
3pointer Sep 7, 2023
a545c59
address comment
3pointer Sep 7, 2023
c41accc
address comment
3pointer Sep 7, 2023
ba6c52a
fix ut
3pointer Sep 7, 2023
78ace34
fix unstable test
3pointer Sep 8, 2023
be0c535
address comment
3pointer Sep 11, 2023
b55796f
Merge branch 'master' into decouple_pitr_for_gc
3pointer Sep 11, 2023
2ac41e7
Merge branch 'master' into decouple_pitr_for_gc
3pointer Sep 12, 2023
5ae1830
address comments
3pointer Sep 12, 2023
5 changes: 5 additions & 0 deletions br/pkg/conn/conn.go
@@ -245,6 +245,11 @@ func (mgr *Mgr) GetTLSConfig() *tls.Config {
return mgr.StoreManager.TLSConfig()
}

// GetStore gets the tikvStore.
func (mgr *Mgr) GetStore() tikv.Storage {
return mgr.tikvStore
}

// GetLockResolver gets the LockResolver.
func (mgr *Mgr) GetLockResolver() *txnlock.LockResolver {
return mgr.tikvStore.GetLockResolver()
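As an illustration (not part of this change), here is a minimal sketch of how a caller might combine the newly exposed Mgr.GetStore() with client-go's range-task runner to resolve locks on a key range, mirroring the advancer code in br/pkg/streamhelper/advancer.go later in this PR. The helper name resolveLocksOnRange and the tikv.RegionLockResolver parameter type are assumptions, not part of the PR:

// Sketch only: resolveLocksOnRange is a hypothetical helper that shows how the
// newly exposed Mgr.GetStore() can feed client-go's range-task runner.
package example

import (
	"context"
	"math"

	"github.com/pingcap/tidb/br/pkg/conn"
	tikvstore "github.com/tikv/client-go/v2/kv"
	"github.com/tikv/client-go/v2/tikv"
	"github.com/tikv/client-go/v2/txnkv/rangetask"
)

func resolveLocksOnRange(ctx context.Context, mgr *conn.Mgr, resolver tikv.RegionLockResolver, start, end []byte) error {
	// Scan every lock in each sub-range and resolve it by checking txn status,
	// the same way the advancer's handler does.
	handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
		return tikv.ResolveLocksForRange(ctx, resolver, math.MaxUint64,
			r.StartKey, r.EndKey, tikv.NewGcResolveLockMaxBackoffer, tikv.GCScanLockLimit)
	}
	// The runner splits [start, end) by region; concurrency 1 is enough for a small range.
	runner := rangetask.NewRangeTaskRunner("example-resolve-locks", mgr.GetStore(), 1, handler)
	return runner.RunOnRange(ctx, start, end)
}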
11 changes: 10 additions & 1 deletion br/pkg/streamhelper/BUILD.bazel
@@ -41,6 +41,9 @@ go_library(
"@com_github_pingcap_log//:log",
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//txnkv/rangetask",
"@com_github_tikv_client_go_v2//txnkv/txnlock",
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_google_grpc//:grpc",
@@ -65,7 +68,7 @@
],
flaky = True,
race = "on",
shard_count = 18,
shard_count = 19,
deps = [
":streamhelper",
"//br/pkg/errors",
@@ -82,18 +85,24 @@ go_test(
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/errorpb",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_pingcap_kvproto//pkg/logbackuppb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//txnkv/txnlock",
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_server_v3//embed",
"@io_etcd_go_etcd_server_v3//mvcc",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//metadata",
"@org_golang_google_grpc//status",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
],
192 changes: 163 additions & 29 deletions br/pkg/streamhelper/advancer.go
@@ -3,12 +3,16 @@
package streamhelper

import (
"bytes"
"context"
"math"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/logutil"
@@ -17,7 +21,10 @@ import (
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/rangetask"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
@@ -60,7 +67,9 @@ type CheckpointAdvancer struct {

// the cached last checkpoint.
// if there is no progress, this cache helps us avoid sending useless requests.
lastCheckpoint uint64
lastCheckpoint *checkpoint
lastCheckpointMu sync.Mutex
inResolvingLock atomic.Bool

checkpoints *spans.ValueSortedFull
checkpointsMu sync.Mutex
@@ -69,6 +78,53 @@ type CheckpointAdvancer struct {
subscriberMu sync.Mutex
}

// checkpoint represents a TS associated with a specific key range.
// It is only used in advancer.go.
type checkpoint struct {
StartKey []byte
EndKey []byte
TS uint64

// It would be better to use a PD timestamp in the future; for now,
// using local time to decide when to resolve locks is acceptable.
resolveLockTime time.Time
}

func newCheckpointWithTS(ts uint64) *checkpoint {
return &checkpoint{
TS: ts,
resolveLockTime: time.Now(),
}
}

func NewCheckpointWithSpan(s spans.Valued) *checkpoint {
return &checkpoint{
StartKey: s.Key.StartKey,
EndKey: s.Key.EndKey,
TS: s.Value,
resolveLockTime: time.Now(),
}
}

func (c *checkpoint) safeTS() uint64 {
return c.TS - 1
}

func (c *checkpoint) equal(o *checkpoint) bool {
return bytes.Equal(c.StartKey, o.StartKey) &&
bytes.Equal(c.EndKey, o.EndKey) && c.TS == o.TS
}

// If a checkpoint stays unchanged for too long (3 min),
// we should try to resolve locks for its range
// to keep the RPO within 5 min.
func (c *checkpoint) needResolveLocks() bool {
failpoint.Inject("NeedResolveLocks", func(val failpoint.Value) {
failpoint.Return(val.(bool))
})
return time.Since(c.resolveLockTime) > 3*time.Minute
}

// NewCheckpointAdvancer creates a checkpoint advancer with the env.
func NewCheckpointAdvancer(env Env) *CheckpointAdvancer {
return &CheckpointAdvancer{
@@ -92,6 +148,13 @@ func (c *CheckpointAdvancer) UpdateConfigWith(f func(*config.Config)) {
c.UpdateConfig(cfg)
}

// UpdateLastCheckpoint modifies the cached checkpoint during ticking.
func (c *CheckpointAdvancer) UpdateLastCheckpoint(p *checkpoint) {
c.lastCheckpointMu.Lock()
c.lastCheckpoint = p
c.lastCheckpointMu.Unlock()
}

// Config returns the current config.
func (c *CheckpointAdvancer) Config() config.Config {
return c.cfg
@@ -172,15 +235,24 @@ func tsoBefore(n time.Duration) uint64 {
return oracle.ComposeTS(now.UnixMilli()-n.Milliseconds(), 0)
}

func tsoAfter(ts uint64, n time.Duration) uint64 {
return oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(n))
}

func (c *CheckpointAdvancer) WithCheckpoints(f func(*spans.ValueSortedFull)) {
c.checkpointsMu.Lock()
defer c.checkpointsMu.Unlock()

f(c.checkpoints)
}

// NewCheckpoints is only used in tests.
func (c *CheckpointAdvancer) NewCheckpoints(cps *spans.ValueSortedFull) {
c.checkpoints = cps
}

func (c *CheckpointAdvancer) CalculateGlobalCheckpointLight(ctx context.Context,
threshold time.Duration) (uint64, error) {
threshold time.Duration) (spans.Valued, error) {
var targets []spans.Valued
var minValue spans.Valued
c.WithCheckpoints(func(vsf *spans.ValueSortedFull) {
@@ -194,13 +266,13 @@ func (c *CheckpointAdvancer) CalculateGlobalCheckpointLight(ctx context.Context,
zap.Stringer("min", minValue), zap.Int("for-polling", len(targets)),
zap.String("min-ts", oracle.GetTimeFromTS(minValue.Value).Format(time.RFC3339)))
if len(targets) == 0 {
return minValue.Value, nil
return minValue, nil
}
err := c.tryAdvance(ctx, len(targets), func(i int) kv.KeyRange { return targets[i].Key })
if err != nil {
return 0, err
return minValue, err
}
return minValue.Value, nil
return minValue, nil
}

func (c *CheckpointAdvancer) consumeAllTask(ctx context.Context, ch <-chan TaskEvent) error {
@@ -293,7 +365,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
c.task = e.Info
c.taskRange = spans.Collapse(len(e.Ranges), func(i int) kv.KeyRange { return e.Ranges[i] })
c.checkpoints = spans.Sorted(spans.NewFullWith(e.Ranges, 0))
c.lastCheckpoint = e.Info.StartTs
c.lastCheckpoint = newCheckpointWithTS(e.Info.StartTs)
p, err := c.env.BlockGCUntil(ctx, c.task.StartTs)
if err != nil {
log.Warn("failed to upload service GC safepoint, skipping.", logutil.ShortError(err))
@@ -323,33 +395,36 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
return nil
}

func (c *CheckpointAdvancer) setCheckpoint(cp uint64) bool {
if cp < c.lastCheckpoint {
func (c *CheckpointAdvancer) setCheckpoint(ctx context.Context, s spans.Valued) bool {
cp := NewCheckpointWithSpan(s)
if cp.TS < c.lastCheckpoint.TS {
log.Warn("failed to update global checkpoint: stale",
zap.Uint64("old", c.lastCheckpoint), zap.Uint64("new", cp))
zap.Uint64("old", c.lastCheckpoint.TS), zap.Uint64("new", cp.TS))
return false
}
if cp <= c.lastCheckpoint {
// Locks may still need to be resolved for a different range at the same TS,
// so compare both the range and the TS here.
if cp.equal(c.lastCheckpoint) {
return false
}
c.lastCheckpoint = cp
metrics.LastCheckpoint.WithLabelValues(c.task.GetName()).Set(float64(c.lastCheckpoint))
c.UpdateLastCheckpoint(cp)
metrics.LastCheckpoint.WithLabelValues(c.task.GetName()).Set(float64(c.lastCheckpoint.TS))
return true
}

// advanceCheckpointBy advances the checkpoint by a checkpoint getter function.
func (c *CheckpointAdvancer) advanceCheckpointBy(ctx context.Context,
getCheckpoint func(context.Context) (uint64, error)) error {
getCheckpoint func(context.Context) (spans.Valued, error)) error {
start := time.Now()
cp, err := getCheckpoint(ctx)
if err != nil {
return err
}

if c.setCheckpoint(cp) {
if c.setCheckpoint(ctx, cp) {
log.Info("uploading checkpoint for task",
zap.Stringer("checkpoint", oracle.GetTimeFromTS(cp)),
zap.Uint64("checkpoint", cp),
zap.Stringer("checkpoint", oracle.GetTimeFromTS(cp.Value)),
zap.Uint64("checkpoint", cp.Value),
zap.String("task", c.task.Name),
zap.Stringer("take", time.Since(start)))
}
@@ -403,43 +478,59 @@ func (c *CheckpointAdvancer) subscribeTick(ctx context.Context) error {

func (c *CheckpointAdvancer) importantTick(ctx context.Context) error {
c.checkpointsMu.Lock()
c.setCheckpoint(c.checkpoints.MinValue())
c.setCheckpoint(ctx, c.checkpoints.Min())
c.checkpointsMu.Unlock()
if err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, c.lastCheckpoint); err != nil {
if err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, c.lastCheckpoint.TS); err != nil {
return errors.Annotate(err, "failed to upload global checkpoint")
}
p, err := c.env.BlockGCUntil(ctx, c.lastCheckpoint-1)
p, err := c.env.BlockGCUntil(ctx, c.lastCheckpoint.safeTS())
if err != nil {
return errors.Annotatef(err,
"failed to update service GC safe point, current checkpoint is %d, target checkpoint is %d",
c.lastCheckpoint-1, p)
c.lastCheckpoint.safeTS(), p)
}
if p <= c.lastCheckpoint-1 {
if p <= c.lastCheckpoint.safeTS() {
log.Info("updated log backup GC safe point.",
zap.Uint64("checkpoint", p), zap.Uint64("target", c.lastCheckpoint-1))
zap.Uint64("checkpoint", p), zap.Uint64("target", c.lastCheckpoint.safeTS()))
}
if p > c.lastCheckpoint-1 {
if p > c.lastCheckpoint.safeTS() {
log.Warn("update log backup GC safe point failed: stale.",
zap.Uint64("checkpoint", p), zap.Uint64("target", c.lastCheckpoint-1))
zap.Uint64("checkpoint", p), zap.Uint64("target", c.lastCheckpoint.safeTS()))
}
return nil
}

func (c *CheckpointAdvancer) optionalTick(cx context.Context) error {
// If lastCheckpoint has not advanced for too long,
// assume the cluster has expired locks for whatever reason.
var targets []spans.Valued
if c.lastCheckpoint != nil && c.lastCheckpoint.needResolveLocks() && c.inResolvingLock.CompareAndSwap(false, true) {
c.WithCheckpoints(func(vsf *spans.ValueSortedFull) {
// When collecting locks here, assume they do not belong to the same txn,
// but that their start TSs are within roughly 1 minute; try to resolve them in one pass.
vsf.TraverseValuesLessThan(tsoAfter(c.lastCheckpoint.TS, time.Minute), func(v spans.Valued) bool {
targets = append(targets, v)
return true
})
})
if len(targets) != 0 {
log.Info("Advancer starts to resolve locks", zap.Int("targets", len(targets)))
// use new context here to avoid timeout
ctx := context.Background()
c.asyncResolveLocksForRanges(ctx, targets)
}
c.inResolvingLock.Store(false)
}
threshold := c.Config().GetDefaultStartPollThreshold()
if err := c.subscribeTick(cx); err != nil {
log.Warn("Subscriber meet error, would polling the checkpoint.", zap.String("category", "log backup advancer"),
logutil.ShortError(err))
threshold = c.Config().GetSubscriberErrorStartPollThreshold()
}

err := c.advanceCheckpointBy(cx, func(cx context.Context) (uint64, error) {
return c.advanceCheckpointBy(cx, func(cx context.Context) (spans.Valued, error) {
return c.CalculateGlobalCheckpointLight(cx, threshold)
})
if err != nil {
return err
}
return nil
}

func (c *CheckpointAdvancer) tick(ctx context.Context) error {
@@ -468,3 +559,46 @@ func (c *CheckpointAdvancer) tick(ctx context.Context) error {

return errs
}

func (c *CheckpointAdvancer) asyncResolveLocksForRanges(ctx context.Context, targets []spans.Valued) {
// run in another goroutine
// do not block main tick here
go func() {
handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
// Scan all locks in the range and try to resolve them by checking txn status.
return tikv.ResolveLocksForRange(
ctx, c.env, math.MaxUint64, r.StartKey, r.EndKey, tikv.NewGcResolveLockMaxBackoffer, tikv.GCScanLockLimit)
}
workerPool := utils.NewWorkerPool(uint(config.DefaultMaxConcurrencyAdvance), "advancer resolve locks")
var wg sync.WaitGroup
for _, r := range targets {
targetRange := r
wg.Add(1)
workerPool.Apply(func() {
defer wg.Done()
// Run resolve-lock over the TiKV cluster for this range.
// It uses startKey/endKey to scan regions via PD,
// and regionCache already has a codecPDClient, so decoded keys can be used directly.
// The range almost always covers a single region, so set the concurrency to 1.
runner := rangetask.NewRangeTaskRunner("advancer-resolve-locks-runner",
c.env.GetStore(), 1, handler)
err := runner.RunOnRange(ctx, targetRange.Key.StartKey, targetRange.Key.EndKey)
if err != nil {
// wait for next tick
log.Warn("resolve locks failed, wait for next tick", zap.String("category", "advancer"),
zap.String("uuid", "log backup advancer"),
zap.Error(err))
}
})
}
wg.Wait()
log.Info("finish resolve locks for checkpoint", zap.String("category", "advancer"),
zap.String("uuid", "log backup advancer"),
logutil.Key("StartKey", c.lastCheckpoint.StartKey),
logutil.Key("EndKey", c.lastCheckpoint.EndKey),
zap.Int("targets", len(targets)))
c.lastCheckpointMu.Lock()
c.lastCheckpoint.resolveLockTime = time.Now()
c.lastCheckpointMu.Unlock()
}()
}
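As an illustration (not part of this change), here is a minimal standalone sketch of the staleness rule used above: a checkpoint that has not moved for more than 3 minutes triggers lock resolution, and the spans considered are those whose checkpoint lies below lastCheckpoint.TS plus one minute. The checkpoint struct and helpers mirror the ones added in this PR; the main function is hypothetical:

// Sketch only: reproduces the advancer's staleness rule in isolation.
package main

import (
	"fmt"
	"time"

	"github.com/tikv/client-go/v2/oracle"
)

type checkpoint struct {
	TS              uint64
	resolveLockTime time.Time
}

// A checkpoint that has not advanced for more than 3 minutes should trigger
// lock resolution so that the log backup RPO stays within 5 minutes.
func (c *checkpoint) needResolveLocks() bool {
	return time.Since(c.resolveLockTime) > 3*time.Minute
}

// tsoAfter shifts a TSO timestamp forward by a wall-clock duration.
func tsoAfter(ts uint64, n time.Duration) uint64 {
	return oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(n))
}

func main() {
	cp := &checkpoint{
		TS:              oracle.GoTimeToTS(time.Now()),
		resolveLockTime: time.Now().Add(-4 * time.Minute), // stuck for 4 minutes
	}
	if cp.needResolveLocks() {
		// Spans whose checkpoint value is below this bound are resolved together.
		bound := tsoAfter(cp.TS, time.Minute)
		fmt.Printf("resolve locks for spans with checkpoint < %d\n", bound)
	}
}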