*: add UseAutoScaler config to disable AutoScaler #40966

Merged: 43 commits, Feb 2, 2023

Commits (43)
06dd82c
*: support fetch topo from AutoScaler
guo-shaoge Jan 12, 2023
c7b60f3
fix batch_coprocessor
guo-shaoge Jan 12, 2023
2b1badd
Merge branch 'master' of github.com:pingcap/tidb into autoscaler
guo-shaoge Jan 12, 2023
50439a7
fix rpcCtx.Meta nil
guo-shaoge Jan 13, 2023
ba260d0
tmp safe AWSTopoFetcher
guo-shaoge Jan 15, 2023
f3cdf84
Merge branch 'master' of github.com:pingcap/tidb into autoscaler
guo-shaoge Jan 15, 2023
95ad661
add AWSTopoFetcher
guo-shaoge Jan 17, 2023
bdb6b51
batch update same with(cse: 88bfe6e6ef4062b517e04d47bbc09751cc0bca0d)
guo-shaoge Jan 19, 2023
51916c9
fix review
guo-shaoge Jan 19, 2023
dac3d54
fix typo
guo-shaoge Jan 19, 2023
114af33
fix bazel_lint
guo-shaoge Jan 19, 2023
d280e17
refine topo fetcher err msg(cse:70feb371993e38ed92556ce602793b5a53e32…
guo-shaoge Jan 20, 2023
7c132a3
fix fmt
guo-shaoge Jan 20, 2023
61700de
remove using reflect
guo-shaoge Jan 28, 2023
1734140
update TestASType related code
guo-shaoge Jan 28, 2023
dbcad3a
update batch of trivial fix
guo-shaoge Jan 28, 2023
a092fdd
Merge branch 'master' of github.com:pingcap/tidb into autoscaler
guo-shaoge Jan 28, 2023
d6617ab
Merge branch 'master' of github.com:pingcap/tidb into autoscaler
guo-shaoge Jan 31, 2023
45fa4f7
Merge branch 'master' into autoscaler
bestwoody Jan 31, 2023
7da697d
*: use_autoscaler config for AutoSaler rollback
guo-shaoge Jan 31, 2023
338229b
change config name: cluster-name -> autoscaler-cluster-id
guo-shaoge Jan 31, 2023
00f6aa6
Merge branch 'autoscaler' of github.com:guo-shaoge/tidb into autoscaler
guo-shaoge Jan 31, 2023
dfaf69c
update config.toml.example
guo-shaoge Jan 31, 2023
fabf2ac
Merge branch 'master' of github.com:pingcap/tidb into autoscaler
guo-shaoge Feb 1, 2023
92fed8f
Merge branch 'master' into autoscaler
guo-shaoge Feb 1, 2023
ed0cf4c
fix unit-test
guo-shaoge Feb 1, 2023
856cabd
Merge branch 'master' into autoscaler
guo-shaoge Feb 1, 2023
582ba44
fix comment
guo-shaoge Feb 1, 2023
3fe60b5
Merge branch 'master' into autoscaler
ti-chi-bot Feb 1, 2023
e345bd9
Merge branch 'autoscaler' of github.com:guo-shaoge/tidb into autoscaler
guo-shaoge Feb 1, 2023
e93ff8a
Merge branch 'master' into autoscaler
guo-shaoge Feb 1, 2023
1396d1a
Merge branch 'master' into autoscaler
ti-chi-bot Feb 1, 2023
d55736a
Merge branch 'autoscaler' into switcher_autoscaler
guo-shaoge Feb 2, 2023
54d7b4d
Merge branch 'master' of github.com:pingcap/tidb into switcher_autosc…
guo-shaoge Feb 2, 2023
9fd5e1c
fix gotErr
guo-shaoge Feb 2, 2023
f3f540c
fix gotErr store
guo-shaoge Feb 2, 2023
7222a02
Merge branch 'master' of github.com:pingcap/tidb into switcher_autosc…
guo-shaoge Feb 2, 2023
419ed5e
fix case
guo-shaoge Feb 2, 2023
831aeee
Merge branch 'master' into switcher_autoscaler
guo-shaoge Feb 2, 2023
701ba55
trivial fix
guo-shaoge Feb 2, 2023
cd657f8
Merge branch 'switcher_autoscaler' of github.com:guo-shaoge/tidb into…
guo-shaoge Feb 2, 2023
d6ca02a
Merge branch 'master' into switcher_autoscaler
bestwoody Feb 2, 2023
b05c7d9
Merge branch 'master' into switcher_autoscaler
bestwoody Feb 2, 2023
5 changes: 4 additions & 1 deletion config/config.go
@@ -295,6 +295,8 @@ type Config struct {
TiFlashComputeAutoScalerAddr string `toml:"autoscaler-addr" json:"autoscaler-addr"`
IsTiFlashComputeFixedPool bool `toml:"is-tiflashcompute-fixed-pool" json:"is-tiflashcompute-fixed-pool"`
AutoScalerClusterID string `toml:"autoscaler-cluster-id" json:"autoscaler-cluster-id"`
// todo: remove this after AutoScaler is stable.
UseAutoScaler bool `toml:"use-autoscaler" json:"use-autoscaler"`

// TiDBMaxReuseChunk indicates max cached chunk num
TiDBMaxReuseChunk uint32 `toml:"tidb-max-reuse-chunk" json:"tidb-max-reuse-chunk"`
@@ -1012,6 +1014,7 @@ var defaultConf = Config{
TiFlashComputeAutoScalerAddr: tiflashcompute.DefAWSAutoScalerAddr,
IsTiFlashComputeFixedPool: false,
AutoScalerClusterID: "",
UseAutoScaler: true,
TiDBMaxReuseChunk: 64,
TiDBMaxReuseColumn: 256,
TiDBEnableExitCheck: false,
@@ -1348,7 +1351,7 @@ func (c *Config) Valid() error {
}

// Check tiflash_compute topo fetch is valid.
if c.DisaggregatedTiFlash {
if c.DisaggregatedTiFlash && c.UseAutoScaler {
if !tiflashcompute.IsValidAutoScalerConfig(c.TiFlashComputeAutoScalerType) {
return fmt.Errorf("invalid AutoScaler type, expect %s, %s or %s, got %s",
tiflashcompute.MockASStr, tiflashcompute.AWSASStr, tiflashcompute.GCPASStr, c.TiFlashComputeAutoScalerType)
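Taken with the call sites further down, the two booleans form a three-way switch: topology comes from the AutoScaler when both disaggregated-tiflash and use-autoscaler are true, from PD when only disaggregated-tiflash is set, and the regular non-disaggregated path is used otherwise. A minimal standalone sketch of that decision logic (the struct and function here are illustrative simplifications, not the actual tidb config package):

```go
package main

import "fmt"

// Simplified stand-in for the relevant fields of config.Config; not the real type.
type tiflashComputeConfig struct {
	DisaggregatedTiFlash bool
	UseAutoScaler        bool
}

// topoSource mirrors the gating this PR adds: AutoScaler only when both flags
// are on, PD fallback when use-autoscaler is disabled.
func topoSource(c tiflashComputeConfig) string {
	switch {
	case c.DisaggregatedTiFlash && c.UseAutoScaler:
		return "AutoScaler"
	case c.DisaggregatedTiFlash:
		return "PD"
	default:
		return "none (non-disaggregated TiFlash)"
	}
}

func main() {
	fmt.Println(topoSource(tiflashComputeConfig{DisaggregatedTiFlash: true, UseAutoScaler: true}))  // AutoScaler
	fmt.Println(topoSource(tiflashComputeConfig{DisaggregatedTiFlash: true, UseAutoScaler: false})) // PD
	fmt.Println(topoSource(tiflashComputeConfig{}))                                                 // none
}
```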
4 changes: 4 additions & 0 deletions config/config.toml.example
@@ -129,6 +129,10 @@ autoscaler-addr = "tiflash-autoscale-lb.tiflash-autoscale.svc.cluster.local:8081
# Only meaningful when disaggregated-tiflash is true.
autoscaler-cluster-id = ""

# use-autoscaler indicates whether to use AutoScaler or PD for tiflash_compute nodes; only meaningful when disaggregated-tiflash is true.
# This option will be removed after AutoScaler is stable.
use-autoscaler = true

[log]
# Log level: debug, info, warn, error, fatal.
level = "info"
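For tests or local experiments, the same switch can be flipped at runtime, following the pattern the new test below uses. A short sketch, assuming the github.com/pingcap/tidb/config import path in use at the time of this PR:

```go
package main

import (
	"fmt"

	"github.com/pingcap/tidb/config"
)

func main() {
	// Route tiflash_compute topology fetching to PD instead of AutoScaler.
	config.UpdateGlobal(func(conf *config.Config) {
		conf.DisaggregatedTiFlash = true
		conf.UseAutoScaler = false
	})
	// Restore the defaults when done, as the test below does with defer.
	defer config.UpdateGlobal(func(conf *config.Config) {
		conf.DisaggregatedTiFlash = false
		conf.UseAutoScaler = true
	})

	cfg := config.GetGlobalConfig()
	fmt.Println(cfg.DisaggregatedTiFlash, cfg.UseAutoScaler) // true false
}
```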
31 changes: 31 additions & 0 deletions executor/tiflashtest/tiflash_test.go
@@ -1319,6 +1319,37 @@ func TestDisaggregatedTiFlash(t *testing.T) {
require.Contains(t, err.Error(), "[util:1815]Internal : get tiflash_compute topology failed")
}

// todo: remove this after AutoScaler is stable.
func TestDisaggregatedTiFlashNonAutoScaler(t *testing.T) {
config.UpdateGlobal(func(conf *config.Config) {
conf.DisaggregatedTiFlash = true
conf.UseAutoScaler = false
})
defer config.UpdateGlobal(func(conf *config.Config) {
conf.DisaggregatedTiFlash = false
conf.UseAutoScaler = true
})

// Set globalTopoFetcher to nil to make sure topo cannot be fetched from AutoScaler.
err := tiflashcompute.InitGlobalTopoFetcher(tiflashcompute.InvalidASStr, "", "", false)
require.Contains(t, err.Error(), "unexpected topo fetch type. expect: mock or aws or gcp, got invalid")

store := testkit.CreateMockStore(t, withMockTiFlash(2))
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(c1 int)")
tk.MustExec("alter table t set tiflash replica 1")
tb := external.GetTableByName(t, tk, "test", "t")
err = domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")

err = tk.ExecToErr("select * from t;")
// This error message means we use PD instead of AutoScaler.
require.Contains(t, err.Error(), "tiflash_compute node is unavailable")
}

func TestDisaggregatedTiFlashQuery(t *testing.T) {
config.UpdateGlobal(func(conf *config.Config) {
conf.DisaggregatedTiFlash = true
8 changes: 8 additions & 0 deletions session/session.go
@@ -3339,6 +3339,14 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
return nil, err
}

if config.GetGlobalConfig().DisaggregatedTiFlash && !config.GetGlobalConfig().UseAutoScaler {
// Invalidate the client-go tiflash_compute store cache if necessary.
err = dom.WatchTiFlashComputeNodeChange()
if err != nil {
return nil, err
}
}

if err = extensionimpl.Bootstrap(context.Background(), dom); err != nil {
return nil, err
}
101 changes: 99 additions & 2 deletions store/copr/batch_coprocessor.go
@@ -494,7 +494,10 @@ func buildBatchCopTasksForNonPartitionedTable(
balanceWithContinuity bool,
balanceContinuousRegionCount int64) ([]*batchCopTask, error) {
if config.GetGlobalConfig().DisaggregatedTiFlash {
return buildBatchCopTasksConsistentHash(ctx, bo, store, []*KeyRanges{ranges}, storeType, ttl)
if config.GetGlobalConfig().UseAutoScaler {
return buildBatchCopTasksConsistentHash(ctx, bo, store, []*KeyRanges{ranges}, storeType, ttl)
}
return buildBatchCopTasksConsistentHashForPD(bo, store, []*KeyRanges{ranges}, storeType, ttl)
}
return buildBatchCopTasksCore(bo, store, []*KeyRanges{ranges}, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
}
@@ -511,7 +514,12 @@ func buildBatchCopTasksForPartitionedTable(
balanceContinuousRegionCount int64,
partitionIDs []int64) (batchTasks []*batchCopTask, err error) {
if config.GetGlobalConfig().DisaggregatedTiFlash {
batchTasks, err = buildBatchCopTasksConsistentHash(ctx, bo, store, rangesForEachPhysicalTable, storeType, ttl)
if config.GetGlobalConfig().UseAutoScaler {
batchTasks, err = buildBatchCopTasksConsistentHash(ctx, bo, store, rangesForEachPhysicalTable, storeType, ttl)
} else {
// todo: remove this after AutoScaler is stable.
batchTasks, err = buildBatchCopTasksConsistentHashForPD(bo, store, rangesForEachPhysicalTable, storeType, ttl)
}
} else {
batchTasks, err = buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
}
@@ -1169,3 +1177,92 @@ func (b *batchCopIterator) handleCollectExecutionInfo(bo *Backoffer, resp *batch
}
resp.detail.CalleeAddress = task.storeAddr
}

@guo-shaoge (Collaborator, Author) commented on Feb 2, 2023:

Just copied buildBatchCopTasksConsistentHash from the commit before this PR: #40729

// Only called when UseAutoScaler is false.
func buildBatchCopTasksConsistentHashForPD(bo *backoff.Backoffer,
kvStore *kvStore,
rangesForEachPhysicalTable []*KeyRanges,
storeType kv.StoreType,
ttl time.Duration) (res []*batchCopTask, err error) {
const cmdType = tikvrpc.CmdBatchCop
var retryNum int
cache := kvStore.GetRegionCache()

for {
retryNum++
var rangesLen int
tasks := make([]*copTask, 0)
regionIDs := make([]tikv.RegionVerID, 0)

for i, ranges := range rangesForEachPhysicalTable {
rangesLen += ranges.Len()
locations, err := cache.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit)
if err != nil {
return nil, errors.Trace(err)
}
for _, lo := range locations {
tasks = append(tasks, &copTask{
region: lo.Location.Region,
ranges: lo.Ranges,
cmdType: cmdType,
storeType: storeType,
partitionIndex: int64(i),
})
regionIDs = append(regionIDs, lo.Location.Region)
}
}

stores, err := cache.GetTiFlashComputeStores(bo.TiKVBackoffer())
if err != nil {
return nil, err
}
stores = filterAliveStores(bo.GetCtx(), stores, ttl, kvStore)
if len(stores) == 0 {
return nil, errors.New("tiflash_compute node is unavailable")
}

rpcCtxs, err := cache.GetTiFlashComputeRPCContextByConsistentHash(bo.TiKVBackoffer(), regionIDs, stores)
if err != nil {
return nil, err
}
if rpcCtxs == nil {
logutil.BgLogger().Info("buildBatchCopTasksConsistentHash retry because rpcCtx is nil", zap.Int("retryNum", retryNum))
err := bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer"))
if err != nil {
return nil, errors.Trace(err)
}
continue
}
if len(rpcCtxs) != len(tasks) {
return nil, errors.Errorf("length should be equal, len(rpcCtxs): %d, len(tasks): %d", len(rpcCtxs), len(tasks))
}
taskMap := make(map[string]*batchCopTask)
for i, rpcCtx := range rpcCtxs {
regionInfo := RegionInfo{
// tasks and rpcCtxs correspond to each other.
Region: tasks[i].region,
Meta: rpcCtx.Meta,
Ranges: tasks[i].ranges,
AllStores: []uint64{rpcCtx.Store.StoreID()},
PartitionIndex: tasks[i].partitionIndex,
}
if batchTask, ok := taskMap[rpcCtx.Addr]; ok {
batchTask.regionInfos = append(batchTask.regionInfos, regionInfo)
} else {
batchTask := &batchCopTask{
storeAddr: rpcCtx.Addr,
cmdType: cmdType,
ctx: rpcCtx,
regionInfos: []RegionInfo{regionInfo},
}
taskMap[rpcCtx.Addr] = batchTask
res = append(res, batchTask)
}
}
logutil.BgLogger().Info("buildBatchCopTasksConsistentHash done", zap.Any("len(tasks)", len(taskMap)), zap.Any("len(tiflash_compute)", len(stores)))
break
}

failpointCheckForConsistentHash(res)
return res, nil
}
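The copied function reduces to three steps: split key ranges into per-region tasks, map each region to an alive tiflash_compute store by consistent hash, and bucket region infos per store address so each store gets exactly one batchCopTask. A stripped-down sketch of the grouping step only, with illustrative types in place of the copr and client-go ones (pickStore is a toy stand-in for GetTiFlashComputeRPCContextByConsistentHash):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// Simplified stand-ins for the real copr/client-go types.
type regionTask struct {
	regionID uint64
	ranges   string
}

type batchTask struct {
	storeAddr string
	regions   []regionTask
}

// pickStore deterministically maps a region to one of the alive compute stores;
// a toy replacement for the consistent-hash lookup in the region cache.
func pickStore(regionID uint64, stores []string) string {
	h := fnv.New64a()
	fmt.Fprintf(h, "%d", regionID)
	return stores[h.Sum64()%uint64(len(stores))]
}

// groupByStore mirrors the taskMap loop in buildBatchCopTasksConsistentHashForPD:
// all regions that hash to the same store address end up in one batch task.
func groupByStore(tasks []regionTask, stores []string) []*batchTask {
	taskMap := make(map[string]*batchTask)
	var res []*batchTask
	for _, t := range tasks {
		addr := pickStore(t.regionID, stores)
		if bt, ok := taskMap[addr]; ok {
			bt.regions = append(bt.regions, t)
			continue
		}
		bt := &batchTask{storeAddr: addr, regions: []regionTask{t}}
		taskMap[addr] = bt
		res = append(res, bt)
	}
	return res
}

func main() {
	stores := []string{"tiflash-compute-0:3930", "tiflash-compute-1:3930"}
	tasks := []regionTask{{1, "[a,b)"}, {2, "[b,c)"}, {3, "[c,d)"}}
	for _, bt := range groupByStore(tasks, stores) {
		fmt.Printf("%s -> %d regions\n", bt.storeAddr, len(bt.regions))
	}
}
```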
18 changes: 18 additions & 0 deletions store/copr/mpp.go
@@ -250,6 +250,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req
var rpcResp *tikvrpc.Response
var err error
var retry bool
invalidPDCache := config.GetGlobalConfig().DisaggregatedTiFlash && !config.GetGlobalConfig().UseAutoScaler

// If copTasks is not empty, we should send request according to region distribution.
// Or else it's the task without region, which always happens in high layer task without table.
@@ -262,6 +263,9 @@
// That's a hard job but we can try it in the future.
if sender.GetRPCError() != nil {
logutil.BgLogger().Warn("mpp dispatch meet io error", zap.String("error", sender.GetRPCError().Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId), zap.Int64("mpp-version", taskMeta.MppVersion))
if invalidPDCache {
m.store.GetRegionCache().InvalidateTiFlashComputeStores()
}
// if needTriggerFallback is true, we return timeout to trigger tikv's fallback
if m.needTriggerFallback {
err = derr.ErrTiFlashServerTimeout
@@ -274,6 +278,9 @@
if errors.Cause(err) == context.Canceled || status.Code(errors.Cause(err)) == codes.Canceled {
retry = false
} else if err != nil {
if invalidPDCache {
m.store.GetRegionCache().InvalidateTiFlashComputeStores()
}
if bo.Backoff(tikv.BoTiFlashRPC(), err) == nil {
retry = true
}
@@ -351,18 +358,26 @@ func (m *mppIterator) cancelMppTasks() {
}

// send cancel cmd to all stores where tasks run
invalidPDCache := config.GetGlobalConfig().DisaggregatedTiFlash && !config.GetGlobalConfig().UseAutoScaler
wg := util.WaitGroupWrapper{}
gotErr := atomic.Bool{}
for addr := range usedStoreAddrs {
storeAddr := addr
wg.Run(func() {
_, err := m.store.GetTiKVClient().SendRequest(context.Background(), storeAddr, wrappedReq, tikv.ReadTimeoutShort)
logutil.BgLogger().Debug("cancel task", zap.Uint64("query id ", m.startTs), zap.String("on addr", storeAddr), zap.Int64("mpp-version", m.mppVersion.ToInt64()))
if err != nil {
logutil.BgLogger().Error("cancel task error", zap.Error(err), zap.Uint64("query id", m.startTs), zap.String("on addr", storeAddr), zap.Int64("mpp-version", m.mppVersion.ToInt64()))
if invalidPDCache {
gotErr.CompareAndSwap(false, true)
}
}
})
}
wg.Wait()
if invalidPDCache && gotErr.Load() {
m.store.GetRegionCache().InvalidateTiFlashComputeStores()
}
}

func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchRequest, taskMeta *mpp.TaskMeta) {
@@ -389,6 +404,9 @@

if err != nil {
logutil.BgLogger().Warn("establish mpp connection meet error and cannot retry", zap.String("error", err.Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId), zap.Int64("mpp-version", taskMeta.MppVersion))
if config.GetGlobalConfig().DisaggregatedTiFlash && !config.GetGlobalConfig().UseAutoScaler {
m.store.GetRegionCache().InvalidateTiFlashComputeStores()
}
// if needTriggerFallback is true, we return timeout to trigger tikv's fallback
if m.needTriggerFallback {
m.sendError(derr.ErrTiFlashServerTimeout)
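The cancelMppTasks hunk above fans cancel RPCs out to every store that ran a task, records any failure in an atomic.Bool, and invalidates the PD-backed tiflash_compute store cache once after all goroutines join. The same pattern in isolation, using a plain sync.WaitGroup instead of TiDB's util.WaitGroupWrapper; sendCancel and invalidateCache are placeholders, not the real client calls:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// Placeholder for m.store.GetTiKVClient().SendRequest(...) in the diff.
func sendCancel(addr string) error {
	if addr == "tiflash-compute-1:3930" {
		return errors.New("connection refused")
	}
	return nil
}

// Placeholder for m.store.GetRegionCache().InvalidateTiFlashComputeStores().
func invalidateCache() { fmt.Println("invalidating tiflash_compute store cache") }

func cancelOnAllStores(addrs []string, invalidPDCache bool) {
	var wg sync.WaitGroup
	gotErr := atomic.Bool{}
	for _, addr := range addrs {
		addr := addr // capture the loop variable, as the diff does with storeAddr
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := sendCancel(addr); err != nil && invalidPDCache {
				// Only remember that something failed; invalidate once, after Wait.
				gotErr.CompareAndSwap(false, true)
			}
		}()
	}
	wg.Wait()
	if invalidPDCache && gotErr.Load() {
		invalidateCache()
	}
}

func main() {
	cancelOnAllStores([]string{"tiflash-compute-0:3930", "tiflash-compute-1:3930"}, true)
}
```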
2 changes: 1 addition & 1 deletion tidb-server/main.go
@@ -204,7 +204,7 @@ func main() {
err := cpuprofile.StartCPUProfiler()
terror.MustNil(err)

if config.GetGlobalConfig().DisaggregatedTiFlash {
if config.GetGlobalConfig().DisaggregatedTiFlash && config.GetGlobalConfig().UseAutoScaler {
clusterID, err := config.GetAutoScalerClusterID()
terror.MustNil(err)

3 changes: 3 additions & 0 deletions util/tiflashcompute/topo_fetcher.go
@@ -44,6 +44,8 @@ const (
GCPASStr = "gcp"
// TestASStr is string value for test AutoScaler.
TestASStr = "test"
// InvalidASStr is string value for invalid AutoScaler.
InvalidASStr = "invalid"
)

const (
@@ -127,6 +129,7 @@ func InitGlobalTopoFetcher(typ string, addr string, clusterID string, isFixedPoo
case TestASType:
globalTopoFetcher = NewTestAutoScalerFetcher()
default:
globalTopoFetcher = nil
err = errors.Errorf("unexpected topo fetch type. expect: %s or %s or %s, got %s",
MockASStr, AWSASStr, GCPASStr, typ)
}
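The globalTopoFetcher = nil assignment in the default branch is what the rollback path relies on: after a failed init, later lookups should fail fast rather than reuse a fetcher left over from an earlier configuration. A standalone sketch of that reset-and-fail-fast pattern (the interface and function names are illustrative stand-ins, not the actual util/tiflashcompute API):

```go
package main

import (
	"errors"
	"fmt"
)

// Illustrative stand-ins; the real package defines its own fetcher types.
type topoFetcher interface {
	FetchTopo() ([]string, error)
}

type mockFetcher struct{}

func (mockFetcher) FetchTopo() ([]string, error) { return []string{"127.0.0.1:3930"}, nil }

var globalFetcher topoFetcher

// initFetcher mirrors the shape of InitGlobalTopoFetcher: reset the global on an
// unexpected type so later callers fail fast instead of reusing a stale fetcher.
func initFetcher(typ string) error {
	switch typ {
	case "mock":
		globalFetcher = mockFetcher{}
		return nil
	default:
		globalFetcher = nil
		return fmt.Errorf("unexpected topo fetch type. expect: mock or aws or gcp, got %s", typ)
	}
}

func fetchTopo() ([]string, error) {
	if globalFetcher == nil {
		return nil, errors.New("global topo fetcher is not initialized")
	}
	return globalFetcher.FetchTopo()
}

func main() {
	_ = initFetcher("invalid")
	if _, err := fetchTopo(); err != nil {
		fmt.Println(err) // fails fast rather than using a previously configured fetcher
	}
}
```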