From 06dd82ca7ae7dc1e999deb140004ed972b02ade9 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Thu, 12 Jan 2023 19:31:05 +0800 Subject: [PATCH 01/18] *: support fetch topo from AutoScaler Signed-off-by: guo-shaoge --- config/config.go | 26 ++- config/config.toml.example | 10 ++ planner/core/logical_plan_builder.go | 8 +- session/session.go | 14 +- store/copr/batch_coprocessor.go | 22 +-- store/copr/mpp.go | 1 + tidb-server/main.go | 7 + util/tiflashcompute/topo_fetcher.go | 233 +++++++++++++++++++++++++++ 8 files changed, 285 insertions(+), 36 deletions(-) create mode 100644 util/tiflashcompute/topo_fetcher.go diff --git a/config/config.go b/config/config.go index bc25b8c9b9ec3..bfe9017f56aa5 100644 --- a/config/config.go +++ b/config/config.go @@ -35,6 +35,7 @@ import ( logbackupconf "github.com/pingcap/tidb/br/pkg/streamhelper/config" "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/tiflashcompute" "github.com/pingcap/tidb/util/tikvutil" "github.com/pingcap/tidb/util/versioninfo" tikvcfg "github.com/tikv/client-go/v2/config" @@ -277,12 +278,14 @@ type Config struct { OOMUseTmpStorage bool `toml:"oom-use-tmp-storage" json:"oom-use-tmp-storage"` // These items are deprecated because they are turned into instance system variables. - CheckMb4ValueInUTF8 AtomicBool `toml:"check-mb4-value-in-utf8" json:"check-mb4-value-in-utf8"` - EnableCollectExecutionInfo bool `toml:"enable-collect-execution-info" json:"enable-collect-execution-info"` - Plugin Plugin `toml:"plugin" json:"plugin"` - MaxServerConnections uint32 `toml:"max-server-connections" json:"max-server-connections"` - RunDDL bool `toml:"run-ddl" json:"run-ddl"` - DisaggregatedTiFlash bool `toml:"disaggregated-tiflash" json:"disaggregated-tiflash"` + CheckMb4ValueInUTF8 AtomicBool `toml:"check-mb4-value-in-utf8" json:"check-mb4-value-in-utf8"` + EnableCollectExecutionInfo bool `toml:"enable-collect-execution-info" json:"enable-collect-execution-info"` + Plugin Plugin `toml:"plugin" json:"plugin"` + MaxServerConnections uint32 `toml:"max-server-connections" json:"max-server-connections"` + RunDDL bool `toml:"run-ddl" json:"run-ddl"` + DisaggregatedTiFlash bool `toml:"disaggregated-tiflash" json:"disaggregated-tiflash"` + TiFlashComputeAutoScalerType string `toml:"autoscaler-type" json:"autoscaler-type"` + TiFlashComputeAutoScalerAddr string `toml:"autoscaler-addr" json:"autoscaler-addr"` // TiDBMaxReuseChunk indicates max cached chunk num TiDBMaxReuseChunk uint32 `toml:"tidb-max-reuse-chunk" json:"tidb-max-reuse-chunk"` // TiDBMaxReuseColumn indicates max cached column num @@ -1307,6 +1310,17 @@ func (c *Config) Valid() error { return fmt.Errorf("stats-load-queue-size should be [%d, %d]", DefStatsLoadQueueSizeLimit, DefMaxOfStatsLoadQueueSizeLimit) } + // check tiflash_compute topo fetch is valid. 
+ if c.DisaggregatedTiFlash { + if tiflashcompute.GetAutoScalerType(c.TiFlashComputeAutoScalerType) == tiflashcompute.InvalidASType { + return fmt.Errorf("invalid AutoScaler type, expect %s, %s or %s, got %s", + tiflashcompute.MockASStr, tiflashcompute.AWSASStr, tiflashcompute.GCPASStr, c.TiFlashComputeAutoScalerType) + } + if c.TiFlashComputeAutoScalerAddr == "" { + return fmt.Errorf("autoscaler-addr cannot be empty when disaggregated-tiflash mode is true") + } + } + // test log level l := zap.NewAtomicLevel() return l.UnmarshalText([]byte(c.Log.Level)) diff --git a/config/config.toml.example b/config/config.toml.example index 588379f204602..cd46a4ef446d9 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -114,6 +114,16 @@ enable-enum-length-limit = true # command can be forwarded to the right TiDB instance to execute. enable-global-kill = true +# disaggregated-tiflash indicates whether TiDB is in disaggregated tiflash mode, if true, MPP will runs on tiflash_compute nodes. +disaggregated-tiflash = false + +# autoscaler-type indicates which type of AutoScaler will be used. Possible values are: mock, aws, gcp. +# Only meaningful when disaggregated-tiflash is true +# autoscaler-type = "mock" + +# autoscaler-addr is the host of AutoScaler, Only meaningful when disaggregated-tiflash is true. +# autoscaler-addr = "" + [log] # Log level: debug, info, warn, error, fatal. level = "info" diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 592bb55f79619..81eea3f2e784f 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -50,7 +50,6 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/statistics/handle" - "github.com/pingcap/tidb/store/driver/backoff" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/table/temptable" @@ -67,7 +66,6 @@ import ( "github.com/pingcap/tidb/util/plancodec" "github.com/pingcap/tidb/util/set" "github.com/pingcap/tidb/util/size" - "github.com/tikv/client-go/v2/tikv" ) const ( @@ -713,11 +711,7 @@ func (ds *DataSource) setPreferredStoreType(hintInfo *tableHintInfo) { } func isTiFlashComputeNodeAvailable(ctx sessionctx.Context) bool { - bo := backoff.NewBackofferWithVars(context.Background(), 5000, nil) - stores, err := ctx.GetStore().(tikv.Storage).GetRegionCache().GetTiFlashComputeStores(bo.TiKVBackoffer()) - if err != nil || len(stores) == 0 { - return false - } + // todo: just raise warning return true } diff --git a/session/session.go b/session/session.go index 2d95bf3bc73d6..985a1ad023dfa 100644 --- a/session/session.go +++ b/session/session.go @@ -3338,13 +3338,13 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { return nil, err } - if config.GetGlobalConfig().DisaggregatedTiFlash { - // Invalid client-go tiflash_compute store cache if necessary. - err = dom.WatchTiFlashComputeNodeChange() - if err != nil { - return nil, err - } - } + // if config.GetGlobalConfig().DisaggregatedTiFlash { + // // Invalid client-go tiflash_compute store cache if necessary. 
+ // err = dom.WatchTiFlashComputeNodeChange() + // if err != nil { + // return nil, err + // } + // } if err = extensionimpl.Bootstrap(context.Background(), dom); err != nil { return nil, err diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index b316d10acaf6e..7b5b0653a4af7 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/tidb/store/driver/backoff" derr "github.com/pingcap/tidb/store/driver/error" "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/tiflashcompute" "github.com/stathat/consistent" "github.com/tikv/client-go/v2/metrics" "github.com/tikv/client-go/v2/tikv" @@ -545,37 +546,26 @@ func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer, store *kvStore, ran if err != nil { return nil, err } - cache := store.GetRegionCache() - stores, err := cache.GetTiFlashComputeStores(bo.TiKVBackoffer()) + // TODO: For now, force fetch topo from AutoScaler. + computeAddrs, err := tiflashcompute.GetGlobalTopoFetcher().FetchAndGetTopo() if err != nil { return nil, err } - if len(stores) == 0 { + if len(computeAddrs) == 0 { return nil, errors.New("No available tiflash_compute node") } hasher := consistent.New() - for _, store := range stores { - hasher.Add(store.GetAddr()) + for _, addr := range computeAddrs { + hasher.Add(addr) } for _, task := range batchTasks { addr, err := hasher.Get(task.storeAddr) if err != nil { return nil, err } - var store *tikv.Store - for _, s := range stores { - if s.GetAddr() == addr { - store = s - break - } - } - if store == nil { - return nil, errors.New("cannot find tiflash_compute store: " + addr) - } task.storeAddr = addr - task.ctx.Store = store task.ctx.Addr = addr } logutil.BgLogger().Info("build batchCop tasks for disaggregated tiflash using ConsistentHash done.", zap.Int("len(tasks)", len(batchTasks))) diff --git a/store/copr/mpp.go b/store/copr/mpp.go index c3225c40d1455..8314001ef8ad4 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -255,6 +255,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req sender := NewRegionBatchRequestSender(m.store.GetRegionCache(), m.store.GetTiKVClient(), m.enableCollectExecutionInfo) rpcResp, retry, _, err = sender.SendReqToAddr(bo, originalTask.ctx, originalTask.regionInfos, wrappedReq, tikv.ReadTimeoutMedium) if err != nil && disaggregatedTiFlash { + // todo: fix when use topoFetcher m.store.GetRegionCache().InvalidateTiFlashComputeStoresIfGRPCError(err) } // No matter what the rpc error is, we won't retry the mpp dispatch tasks. diff --git a/tidb-server/main.go b/tidb-server/main.go index 97ad1917105ca..9312e51b83a69 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -74,6 +74,7 @@ import ( "github.com/pingcap/tidb/util/sys/linux" storageSys "github.com/pingcap/tidb/util/sys/storage" "github.com/pingcap/tidb/util/systimemon" + "github.com/pingcap/tidb/util/tiflashcompute" "github.com/pingcap/tidb/util/topsql" "github.com/pingcap/tidb/util/versioninfo" "github.com/prometheus/client_golang/prometheus" @@ -200,6 +201,12 @@ func main() { err := cpuprofile.StartCPUProfiler() terror.MustNil(err) + if config.GetGlobalConfig().DisaggregatedTiFlash { + err = tiflashcompute.InitGlobalTopoFetcher( + config.GetGlobalConfig().TiFlashComputeAutoScalerType, + config.GetGlobalConfig().TiFlashComputeAutoScalerAddr) + terror.MustNil(err) + } // Enable failpoints in tikv/client-go if the test API is enabled. 
// It appears in the main function to be set before any use of client-go to prevent data race. diff --git a/util/tiflashcompute/topo_fetcher.go b/util/tiflashcompute/topo_fetcher.go new file mode 100644 index 0000000000000..28d1adeb8a427 --- /dev/null +++ b/util/tiflashcompute/topo_fetcher.go @@ -0,0 +1,233 @@ +// Copyright 2015 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tiflashcompute + +import ( + "github.com/pingcap/errors" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" + "io/ioutil" + "net/http" + "net/url" + "strconv" + "strings" + "sync" +) + +var globalTopoFetcher TopoFetcher +var _ TopoFetcher = &MockTopoFetcher{} + +const ( + // MockASStr is String value for mock AutoScaler. + MockASStr = "mock" + // AWSASStr is String value for mock AutoScaler. + AWSASStr = "aws" + // GCPASStr is String value for mock AutoScaler. + GCPASStr = "gcp" +) + +const ( + // MockASType is int value for mock AutoScaler. + MockASType int = iota + // AWSASType is int value for mock AutoScaler. + AWSASType + // GCPASType is int value for mock AutoScaler. + GCPASType + // InvalidASType is int value for invalid check. + InvalidASType +) + +// TopoFetcher is interface for fetching topo from AutoScaler. +// There are three kinds of AutoScaler for now: +// 1. MockAutoScaler: Normally for test, can run in local environment. +// 2. AWSAutoScaler: AutoScaler runs on AWS. +// 3. GCPAutoScaler: AutoScaler runs on GCP. +type TopoFetcher interface { + // Return tiflash compute topo cache, if topo is empty, will fetch topo from AutoScaler. + // If topo is empty after fetch, will return error. + AssureAndGetTopo() ([]string, error) + + // Always fetch topo from AutoScaler, then return topo. + // If topo is empty, will not return error. You can call AssureAndGetTopo() to make sure topo is not empty. + FetchAndGetTopo() ([]string, error) +} + +// GetAutoScalerType return topo fetcher type. +func GetAutoScalerType(typ string) int { + switch typ { + case MockASStr: + return MockASType + case AWSASStr: + return AWSASType + case GCPASStr: + return GCPASType + default: + return InvalidASType + } +} + +// InitGlobalTopoFetcher init globalTopoFetcher if is in disaggregated-tiflash mode. It's not thread-safe. +func InitGlobalTopoFetcher(typ string, addr string) error { + if globalTopoFetcher != nil { + return errors.New("globalTopoFetcher alread inited") + } + + ft := GetAutoScalerType(typ) + switch ft { + case MockASType: + globalTopoFetcher = NewMockAutoScalerFetcher(addr) + return nil + case AWSASType, GCPASType: + return errors.Errorf("topo fetch not implemented yet(%s)", typ) + } + return errors.Errorf("unexpected topo fetch type. expect: %s or %s or %s, got %s", + MockASStr, AWSASStr, GCPASStr, typ) +} + +// GetGlobalTopoFetcher return global topo fetcher, not thread safe. +func GetGlobalTopoFetcher() TopoFetcher { + return globalTopoFetcher +} + +// MockTopoFetcher will fetch topo from MockAutoScaler. +// MockScaler can run in local environment. 
+type MockTopoFetcher struct { + mu struct { + sync.RWMutex + topo []string + topoTS int64 + } + // Mock AutoScaler addr. + addr string +} + +// NewMockAutoScalerFetcher create a new MockTopoFetcher. +func NewMockAutoScalerFetcher(addr string) *MockTopoFetcher { + f := &MockTopoFetcher{} + f.mu.topo = make([]string, 0, 8) + f.mu.topoTS = -1 + f.addr = addr + return f +} + +// AssureAndGetTopo implements TopoFetcher interface. +func (f *MockTopoFetcher) AssureAndGetTopo() ([]string, error) { + curTopo := f.getTopo() + + if len(curTopo) == 0 { + logutil.BgLogger().Info("tiflash compute topo is empty, updating") + err := f.assureTopo(1) + if err != nil { + return nil, err + } + } + + curTopo = f.getTopo() + logutil.BgLogger().Debug("AssureAndGetTopo", zap.Any("topo", curTopo)) + if len(curTopo) == 0 { + return curTopo, errors.New("topo is still empty after updated from mock AutoScaler") + } + return curTopo, nil +} + +// FetchAndGetTopo implements TopoFetcher interface. +func (f *MockTopoFetcher) FetchAndGetTopo() ([]string, error) { + err := f.fetchTopo() + if err != nil { + return nil, err + } + + curTopo := f.getTopo() + logutil.BgLogger().Debug("FetchAndGetTopo", zap.Any("topo", curTopo)) + return curTopo, nil +} + +// getTopo return the cached topo. +func (f *MockTopoFetcher) getTopo() []string { + f.mu.RLock() + defer f.mu.RUnlock() + return f.mu.topo +} + +// assureTopo will make sure topo is greater than nodeNum. +func (f *MockTopoFetcher) assureTopo(nodeNum int) error { + para := url.Values{} + para.Add("node_num", strconv.Itoa(nodeNum)) + u := url.URL{ + Scheme: "http", + Host: f.addr, + Path: "/assume-and-get-topo", + RawQuery: para.Encode(), + } + url := u.String() + logutil.BgLogger().Info("assureTopo", zap.Any("url", url)) + + newTopo, err := httpGetAndParseResp(url) + if err != nil { + return err + } + + f.mu.Lock() + defer f.mu.Unlock() + f.mu.topo = newTopo + return nil +} + +// fetchTopo will fetch newest topo from mock autoscaler. +func (f *MockTopoFetcher) fetchTopo() error { + u := url.URL{ + Scheme: "http", + Host: f.addr, + Path: "/fetch_topo", + } + url := u.String() + logutil.BgLogger().Info("fetchTopo", zap.Any("url", url)) + + newTopo, err := httpGetAndParseResp(url) + if err != nil { + return err + } + + f.mu.Lock() + defer f.mu.Unlock() + f.mu.topo = newTopo + return nil +} + +// httpGetAndParseResp send http get request and parse topo to []string. +func httpGetAndParseResp(url string) ([]string, error) { + resp, err := http.Get(url) + if err != nil { + return nil, errors.Trace(err) + } + defer resp.Body.Close() + + b, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, errors.Trace(err) + } + bStr := string(b) + if resp.StatusCode != http.StatusOK { + return nil, errors.Errorf("http get mock AutoScaler failed. url: %s, status code: %s, http resp body: %s", url, http.StatusText(resp.StatusCode), bStr) + } + + // For MockAutoScaler, topo is like: ip:port;ip:port. 
+ newTopo := strings.Split(bStr, ";") + if len(bStr) == 0 || len(newTopo) == 0 { + return nil, errors.New("topo list is empty") + } + logutil.BgLogger().Debug("assureTopo succeed", zap.Any("new topo", newTopo)) + return newTopo, nil +} From c7b60f3ee3511a82de61af9d2a5ccc4455e0b039 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Thu, 12 Jan 2023 20:51:39 +0800 Subject: [PATCH 02/18] fix batch_coprocessor Signed-off-by: guo-shaoge --- planner/core/fragment.go | 2 + store/copr/batch_coprocessor.go | 239 +++++++++++++++++++++++++------- 2 files changed, 188 insertions(+), 53 deletions(-) diff --git a/planner/core/fragment.go b/planner/core/fragment.go index 7e86696ccc4d6..310304f3a7216 100644 --- a/planner/core/fragment.go +++ b/planner/core/fragment.go @@ -135,6 +135,8 @@ type mppAddr struct { addr string } +var _ kv.MPPTaskMeta = &mppAddr{} + func (m *mppAddr) GetAddress() string { return m.addr } diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 7b5b0653a4af7..b7cae510564be 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -21,6 +21,7 @@ import ( "io" "math" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -324,40 +325,15 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks [] storeTaskMap[taskStoreID] = batchTask } } else { - logutil.BgLogger().Info("detecting available mpp stores") - // decide the available stores stores := cache.RegionCache.GetTiFlashStores() - var wg sync.WaitGroup - var mu sync.Mutex - wg.Add(len(stores)) - for i := range stores { - go func(idx int) { - defer wg.Done() - s := stores[idx] - - // check if store is failed already. - ok := GlobalMPPFailedStoreProber.IsRecovery(ctx, s.GetAddr(), ttl) - if !ok { - return - } - - tikvClient := kvStore.GetTiKVClient() - ok = detectMPPStore(ctx, tikvClient, s.GetAddr(), DetectTimeoutLimit) - if !ok { - GlobalMPPFailedStoreProber.Add(ctx, s.GetAddr(), tikvClient) - return - } - - mu.Lock() - defer mu.Unlock() - storeTaskMap[s.StoreID()] = &batchCopTask{ - storeAddr: s.GetAddr(), - cmdType: originalTasks[0].cmdType, - ctx: &tikv.RPCContext{Addr: s.GetAddr(), Store: s}, - } - }(i) + aliveStores := filterAliveStores(ctx, stores, ttl, kvStore) + for _, s := range aliveStores { + storeTaskMap[s.StoreID()] = &batchCopTask{ + storeAddr: s.GetAddr(), + cmdType: originalTasks[0].cmdType, + ctx: &tikv.RPCContext{Addr: s.GetAddr(), Store: s}, + } } - wg.Wait() } var candidateRegionInfos []RegionInfo @@ -514,7 +490,7 @@ func buildBatchCopTasksForNonPartitionedTable(bo *backoff.Backoffer, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) { if config.GetGlobalConfig().DisaggregatedTiFlash { - return buildBatchCopTasksConsistentHash(bo, store, []*KeyRanges{ranges}, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount) + return buildBatchCopTasksConsistentHash(bo, store, []*KeyRanges{ranges}, storeType, ttl) } return buildBatchCopTasksCore(bo, store, []*KeyRanges{ranges}, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount) } @@ -529,7 +505,7 @@ func buildBatchCopTasksForPartitionedTable(bo *backoff.Backoffer, balanceContinuousRegionCount int64, partitionIDs []int64) (batchTasks []*batchCopTask, err error) { if config.GetGlobalConfig().DisaggregatedTiFlash { - batchTasks, err = buildBatchCopTasksConsistentHash(bo, store, rangesForEachPhysicalTable, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount) + batchTasks, err = 
buildBatchCopTasksConsistentHash(bo, store, rangesForEachPhysicalTable, storeType, ttl) } else { batchTasks, err = buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount) } @@ -541,38 +517,195 @@ func buildBatchCopTasksForPartitionedTable(bo *backoff.Backoffer, return batchTasks, nil } -func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer, store *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, isMPP bool, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) { - batchTasks, err := buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount) - if err != nil { - return nil, err +func filterAliveStoresStr(ctx context.Context, storesStr []string, ttl time.Duration, kvStore *kvStore) (aliveStores []string) { + aliveIdx := filterAliveStoresHelper(ctx, storesStr, ttl, kvStore) + for _, idx := range aliveIdx { + aliveStores = append(aliveStores, storesStr[idx]) } - // TODO: For now, force fetch topo from AutoScaler. - computeAddrs, err := tiflashcompute.GetGlobalTopoFetcher().FetchAndGetTopo() - if err != nil { - return nil, err + return aliveStores +} + +func filterAliveStores(ctx context.Context, stores []*tikv.Store, ttl time.Duration, kvStore *kvStore) (aliveStores []*tikv.Store) { + storesStr := make([]string, 0, len(stores)) + for _, s := range stores { + storesStr = append(storesStr, s.GetAddr()) + } + + aliveIdx := filterAliveStoresHelper(ctx, storesStr, ttl, kvStore) + for _, idx := range aliveIdx { + aliveStores = append(aliveStores, stores[idx]) } - if len(computeAddrs) == 0 { - return nil, errors.New("No available tiflash_compute node") + return aliveStores +} + +func filterAliveStoresHelper(ctx context.Context, stores []string, ttl time.Duration, kvStore *kvStore) (aliveIdx []int) { + var wg sync.WaitGroup + var mu sync.Mutex + wg.Add(len(stores)) + for i := range stores { + go func(idx int) { + defer wg.Done() + s := stores[idx] + + // Check if store is failed already. 
+ if ok := GlobalMPPFailedStoreProber.IsRecovery(ctx, s, ttl); !ok { + return + } + + tikvClient := kvStore.GetTiKVClient() + if ok := detectMPPStore(ctx, tikvClient, s, DetectTimeoutLimit); !ok { + GlobalMPPFailedStoreProber.Add(ctx, s, tikvClient) + return + } + + mu.Lock() + defer mu.Unlock() + aliveIdx = append(aliveIdx, i) + }(i) } + wg.Wait() + logutil.BgLogger().Info("detecting available mpp stores", zap.Any("total", len(stores)), zap.Any("alive", len(aliveIdx))) + return aliveIdx +} + +func getTiFlashComputeRPCContextByConsistentHash(ids []tikv.RegionVerID, storesStr []string) (res []*tikv.RPCContext, err error) { hasher := consistent.New() - for _, addr := range computeAddrs { + for _, addr := range storesStr { hasher.Add(addr) } - for _, task := range batchTasks { - addr, err := hasher.Get(task.storeAddr) + + for _, id := range ids { + addr, err := hasher.Get(strconv.Itoa(int(id.GetID()))) if err != nil { return nil, err } - task.storeAddr = addr - task.ctx.Addr = addr + rpcCtx := &tikv.RPCContext{ + Region: id, + Addr: addr, + } + + res = append(res, rpcCtx) } - logutil.BgLogger().Info("build batchCop tasks for disaggregated tiflash using ConsistentHash done.", zap.Int("len(tasks)", len(batchTasks))) - for _, task := range batchTasks { - logutil.BgLogger().Debug("batchTasks detailed info", zap.String("addr", task.storeAddr), zap.Int("RegionInfo number", len(task.regionInfos))) + return res, nil +} + +// 1. Split range by region location to build copTasks. +// 2. For each copTask build its rpcCtx , the target tiflash_compute node will be chosen using consistent hash. +// 3. All copTasks that will be sent to one tiflash_compute node are put in one batchCopTask. +func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer, + kvStore *kvStore, + rangesForEachPhysicalTable []*KeyRanges, + storeType kv.StoreType, + ttl time.Duration) (res []*batchCopTask, err error) { + const cmdType = tikvrpc.CmdBatchCop + var retryNum int + cache := kvStore.GetRegionCache() + + for { + retryNum++ + var rangesLen int + tasks := make([]*copTask, 0) + regionIDs := make([]tikv.RegionVerID, 0) + + for i, ranges := range rangesForEachPhysicalTable { + rangesLen += ranges.Len() + locations, err := cache.SplitKeyRangesByLocations(bo, ranges) + if err != nil { + return nil, errors.Trace(err) + } + for _, lo := range locations { + tasks = append(tasks, &copTask{ + region: lo.Location.Region, + ranges: lo.Ranges, + cmdType: cmdType, + storeType: storeType, + partitionIndex: int64(i), + }) + regionIDs = append(regionIDs, lo.Location.Region) + } + } + + storesStr, err := tiflashcompute.GetGlobalTopoFetcher().FetchAndGetTopo() + if err != nil { + return nil, err + } + stores := filterAliveStoresStr(bo.GetCtx(), storesStr, ttl, kvStore) + if len(stores) == 0 { + return nil, errors.New("tiflash_compute node is unavailable") + } + + // todo: put to here; construct rpcCtx by self. 
+ rpcCtxs, err := getTiFlashComputeRPCContextByConsistentHash(regionIDs, storesStr) + if err != nil { + return nil, err + } + if rpcCtxs == nil { + logutil.BgLogger().Info("buildBatchCopTasksConsistentHash retry because rcpCtx is nil", zap.Int("retryNum", retryNum)) + err := bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer")) + if err != nil { + return nil, errors.Trace(err) + } + continue + } + if len(rpcCtxs) != len(tasks) { + return nil, errors.Errorf("length should be equal, len(rpcCtxs): %d, len(tasks): %d", len(rpcCtxs), len(tasks)) + } + taskMap := make(map[string]*batchCopTask) + for i, rpcCtx := range rpcCtxs { + regionInfo := RegionInfo{ + // tasks and rpcCtxs are correspond to each other. + Region: tasks[i].region, + Meta: rpcCtx.Meta, + Ranges: tasks[i].ranges, + AllStores: []uint64{rpcCtx.Store.StoreID()}, + PartitionIndex: tasks[i].partitionIndex, + } + if batchTask, ok := taskMap[rpcCtx.Addr]; ok { + batchTask.regionInfos = append(batchTask.regionInfos, regionInfo) + } else { + batchTask := &batchCopTask{ + storeAddr: rpcCtx.Addr, + cmdType: cmdType, + ctx: rpcCtx, + regionInfos: []RegionInfo{regionInfo}, + } + taskMap[rpcCtx.Addr] = batchTask + res = append(res, batchTask) + } + } + logutil.BgLogger().Info("buildBatchCopTasksConsistentHash done", zap.Any("len(tasks)", len(taskMap)), zap.Any("len(tiflash_compute)", len(stores))) + break } - return batchTasks, nil + + failpointCheckForConsistentHash(res) + return res, nil +} + +func failpointCheckForConsistentHash(tasks []*batchCopTask) { + failpoint.Inject("checkOnlyDispatchToTiFlashComputeNodes", func(val failpoint.Value) { + logutil.BgLogger().Debug("in checkOnlyDispatchToTiFlashComputeNodes") + + // This failpoint will be tested in test-infra case, because we needs setup a cluster. + // All tiflash_compute nodes addrs are stored in val, separated by semicolon. + str := val.(string) + addrs := strings.Split(str, ";") + if len(addrs) < 1 { + err := fmt.Sprintf("unexpected length of tiflash_compute node addrs: %v, %s", len(addrs), str) + panic(err) + } + addrMap := make(map[string]struct{}) + for _, addr := range addrs { + addrMap[addr] = struct{}{} + } + for _, batchTask := range tasks { + if _, ok := addrMap[batchTask.storeAddr]; !ok { + err := errors.Errorf("batchCopTask send to node which is not tiflash_compute: %v(tiflash_compute nodes: %s)", batchTask.storeAddr, str) + panic(err) + } + } + }) } // When `partitionIDs != nil`, it means that buildBatchCopTasksCore is constructing a batch cop tasks for PartitionTableScan. 
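
The routing introduced in this patch can be illustrated in isolation. Below is a minimal, standalone sketch (not part of the diff) of how getTiFlashComputeRPCContextByConsistentHash maps region IDs onto the fetched tiflash_compute addresses using github.com/stathat/consistent, the same library the patch imports: as long as the topology is unchanged, every task for a given region hashes to the same node. The addresses and region IDs here are placeholders for illustration only; the real code feeds tikv.RegionVerID.GetID() and the topology returned by the TopoFetcher into the ring.

package main

import (
	"fmt"
	"strconv"

	"github.com/stathat/consistent"
)

func main() {
	// Pretend these came back from TopoFetcher.FetchAndGetTopo().
	computeNodes := []string{"10.0.0.1:3930", "10.0.0.2:3930", "10.0.0.3:3930"}

	hasher := consistent.New()
	for _, addr := range computeNodes {
		hasher.Add(addr)
	}

	// Region IDs stand in for tikv.RegionVerID.GetID(); tasks for the same
	// region always hash to the same tiflash_compute address.
	for _, regionID := range []uint64{2, 18, 103, 104} {
		addr, err := hasher.Get(strconv.Itoa(int(regionID)))
		if err != nil {
			panic(err)
		}
		fmt.Printf("region %d -> %s\n", regionID, addr)
	}
}

Because the hash ring only contains addresses returned by the TopoFetcher, scaling the compute pool in or out remaps a bounded subset of regions instead of reshuffling every task.
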
From 50439a7bc83aafed95386e1a6a6355353469f541 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Fri, 13 Jan 2023 12:08:10 +0800 Subject: [PATCH 03/18] fix rpcCtx.Meta nil Signed-off-by: guo-shaoge --- store/copr/batch_coprocessor.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index b7cae510564be..43ccb290601f7 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -641,6 +641,7 @@ func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer, if err != nil { return nil, err } + // todo: no need if rpcCtxs == nil { logutil.BgLogger().Info("buildBatchCopTasksConsistentHash retry because rcpCtx is nil", zap.Int("retryNum", retryNum)) err := bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer")) @@ -657,9 +658,10 @@ func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer, regionInfo := RegionInfo{ // tasks and rpcCtxs are correspond to each other. Region: tasks[i].region, - Meta: rpcCtx.Meta, + // todo: no need + // Meta: rpcCtx.Meta, Ranges: tasks[i].ranges, - AllStores: []uint64{rpcCtx.Store.StoreID()}, + // AllStores: []uint64{rpcCtx.Store.StoreID()}, PartitionIndex: tasks[i].partitionIndex, } if batchTask, ok := taskMap[rpcCtx.Addr]; ok { From ba260d03dc5d2748227c41283028e2ceb5db7c91 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Sun, 15 Jan 2023 12:34:28 +0800 Subject: [PATCH 04/18] tmp safe AWSTopoFetcher Signed-off-by: guo-shaoge --- util/tiflashcompute/topo_fetcher.go | 29 +++++++++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/util/tiflashcompute/topo_fetcher.go b/util/tiflashcompute/topo_fetcher.go index 28d1adeb8a427..3a661ee6d9d1e 100644 --- a/util/tiflashcompute/topo_fetcher.go +++ b/util/tiflashcompute/topo_fetcher.go @@ -107,7 +107,6 @@ type MockTopoFetcher struct { mu struct { sync.RWMutex topo []string - topoTS int64 } // Mock AutoScaler addr. addr string @@ -117,7 +116,6 @@ type MockTopoFetcher struct { func NewMockAutoScalerFetcher(addr string) *MockTopoFetcher { f := &MockTopoFetcher{} f.mu.topo = make([]string, 0, 8) - f.mu.topoTS = -1 f.addr = addr return f } @@ -231,3 +229,30 @@ func httpGetAndParseResp(url string) ([]string, error) { logutil.BgLogger().Debug("assureTopo succeed", zap.Any("new topo", newTopo)) return newTopo, nil } + +type AWSTopoFetcher struct { + mu struct { + sync.RWMutex + topo []string + topoTS int64 + } + // AWS AutoScaler addr. 
+ addr string + IsFixedPool bool +} + +func NewAWSAutoScalerFetcher(addr string) *AWSTopoFetcher { + f := &AWSTopoFetcher{} + f.mu.topo = make([]string, 0, 8) + f.mu.topoTS = -1 + f.addr = addr + return f +} + +func (f *AWSTopoFetcher) AssureAndGetTopo() ([]string, error) { + return nil, nil +} + +func (f *AWSTopoFetcher) FetchAndGetTopo() ([]string, error) { + return nil, nil +} From 95ad661ca523b5b39a213e1f5503b5913c2b65a0 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Tue, 17 Jan 2023 11:04:51 +0800 Subject: [PATCH 05/18] add AWSTopoFetcher Signed-off-by: guo-shaoge --- config/config.go | 13 +- config/config.toml.example | 7 +- tidb-server/main.go | 12 +- util/tiflashcompute/topo_fetcher.go | 192 +++++++++++++++++++++++++--- 4 files changed, 198 insertions(+), 26 deletions(-) diff --git a/config/config.go b/config/config.go index 13d1b0c4fe1b3..34c9c1e6c924b 100644 --- a/config/config.go +++ b/config/config.go @@ -288,9 +288,13 @@ type Config struct { Plugin Plugin `toml:"plugin" json:"plugin"` MaxServerConnections uint32 `toml:"max-server-connections" json:"max-server-connections"` RunDDL bool `toml:"run-ddl" json:"run-ddl"` - DisaggregatedTiFlash bool `toml:"disaggregated-tiflash" json:"disaggregated-tiflash"` - TiFlashComputeAutoScalerType string `toml:"autoscaler-type" json:"autoscaler-type"` - TiFlashComputeAutoScalerAddr string `toml:"autoscaler-addr" json:"autoscaler-addr"` + + // These config is related to disaggregated-tiflash mode. + DisaggregatedTiFlash bool `toml:"disaggregated-tiflash" json:"disaggregated-tiflash"` + TiFlashComputeAutoScalerType string `toml:"autoscaler-type" json:"autoscaler-type"` + TiFlashComputeAutoScalerAddr string `toml:"autoscaler-addr" json:"autoscaler-addr"` + IsTiFlashComputeFixedPool bool `toml:"is-tiflashcompute-fixed-pool" json:"is-tiflashcompute-fixed-pool"` + // TiDBMaxReuseChunk indicates max cached chunk num TiDBMaxReuseChunk uint32 `toml:"tidb-max-reuse-chunk" json:"tidb-max-reuse-chunk"` // TiDBMaxReuseColumn indicates max cached column num @@ -1001,6 +1005,9 @@ var defaultConf = Config{ EnableGlobalKill: true, TrxSummary: DefaultTrxSummary(), DisaggregatedTiFlash: false, + TiFlashComputeAutoScalerType: tiflashcompute.DefASStr, + TiFlashComputeAutoScalerAddr: tiflashcompute.DefAWSAutoScalerAddr, + IsTiFlashComputeFixedPool: false, TiDBMaxReuseChunk: 64, TiDBMaxReuseColumn: 256, } diff --git a/config/config.toml.example b/config/config.toml.example index cd46a4ef446d9..032c7578c0a29 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -119,10 +119,13 @@ disaggregated-tiflash = false # autoscaler-type indicates which type of AutoScaler will be used. Possible values are: mock, aws, gcp. # Only meaningful when disaggregated-tiflash is true -# autoscaler-type = "mock" +# autoscaler-type = "aws" # autoscaler-addr is the host of AutoScaler, Only meaningful when disaggregated-tiflash is true. -# autoscaler-addr = "" +# autoscaler-addr = "tiflash-autoscale-lb.tiflash-autoscale.svc.cluster.local:8081" + +# is-tiflashcompute-fixed-pool controls whether autoscaler use fixed shared pool(free user) or not. +# is-tiflashcompute-fixed-pool: false [log] # Log level: debug, info, warn, error, fatal. 
diff --git a/tidb-server/main.go b/tidb-server/main.go index 7287c334de88f..17650b141e79e 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -203,15 +203,19 @@ func main() { err := cpuprofile.StartCPUProfiler() terror.MustNil(err) + if config.GetGlobalConfig().DisaggregatedTiFlash { err = tiflashcompute.InitGlobalTopoFetcher( config.GetGlobalConfig().TiFlashComputeAutoScalerType, - config.GetGlobalConfig().TiFlashComputeAutoScalerAddr) - terror.MustNil(err) + config.GetGlobalConfig().TiFlashComputeAutoScalerAddr, + // todo: fix + "mock_cluste_id", + config.GetGlobalConfig().IsTiFlashComputeFixedPool) + terror.MustNil(err) } - // Enable failpoints in tikv/client-go if the test API is enabled. - // It appears in the main function to be set before any use of client-go to prevent data race. + // Enable failpoints in tikv/client-go if the test API is enabled. + // It appears in the main function to be set before any use of client-go to prevent data race. if _, err := failpoint.Status("github.com/pingcap/tidb/server/enableTestAPI"); err == nil { warnMsg := "tikv/client-go failpoint is enabled, this should NOT happen in the production environment" logutil.BgLogger().Warn(warnMsg) diff --git a/util/tiflashcompute/topo_fetcher.go b/util/tiflashcompute/topo_fetcher.go index 3a661ee6d9d1e..d0f1073476046 100644 --- a/util/tiflashcompute/topo_fetcher.go +++ b/util/tiflashcompute/topo_fetcher.go @@ -15,19 +15,22 @@ package tiflashcompute import ( - "github.com/pingcap/errors" - "github.com/pingcap/tidb/util/logutil" - "go.uber.org/zap" + "encoding/json" "io/ioutil" "net/http" "net/url" "strconv" "strings" "sync" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" ) var globalTopoFetcher TopoFetcher var _ TopoFetcher = &MockTopoFetcher{} +var _ TopoFetcher = &AWSTopoFetcher{} const ( // MockASStr is String value for mock AutoScaler. @@ -49,6 +52,16 @@ const ( InvalidASType ) +const ( + // DefAWSAutoScalerAddr is the default address for AWS AutoScaler. + DefAWSAutoScalerAddr = "tiflash-autoscale-lb.tiflash-autoscale.svc.cluster.local:8081" + // DefASStr default AutoScaler. + DefASStr = AWSASStr + + awsFixedPoolHTTPPath = "sharedfixedpool" + awsFetchHTTPPath = "resume-and-get-topology" +) + // TopoFetcher is interface for fetching topo from AutoScaler. // There are three kinds of AutoScaler for now: // 1. MockAutoScaler: Normally for test, can run in local environment. @@ -79,17 +92,19 @@ func GetAutoScalerType(typ string) int { } // InitGlobalTopoFetcher init globalTopoFetcher if is in disaggregated-tiflash mode. It's not thread-safe. -func InitGlobalTopoFetcher(typ string, addr string) error { - if globalTopoFetcher != nil { - return errors.New("globalTopoFetcher alread inited") - } +func InitGlobalTopoFetcher(typ string, addr string, clusterID string, isFixedPool bool) error { + logutil.BgLogger().Info("globalTopoFetcher inited", zap.Any("type", typ), zap.Any("addr", addr), + zap.Any("clusterID", clusterID), zap.Any("isFixedPool", isFixedPool)) ft := GetAutoScalerType(typ) switch ft { case MockASType: globalTopoFetcher = NewMockAutoScalerFetcher(addr) return nil - case AWSASType, GCPASType: + case AWSASType: + globalTopoFetcher = NewAWSAutoScalerFetcher(addr, clusterID, isFixedPool) + return nil + case GCPASType: return errors.Errorf("topo fetch not implemented yet(%s)", typ) } return errors.Errorf("unexpected topo fetch type. 
expect: %s or %s or %s, got %s", @@ -106,7 +121,7 @@ func GetGlobalTopoFetcher() TopoFetcher { type MockTopoFetcher struct { mu struct { sync.RWMutex - topo []string + topo []string } // Mock AutoScaler addr. addr string @@ -226,33 +241,176 @@ func httpGetAndParseResp(url string) ([]string, error) { if len(bStr) == 0 || len(newTopo) == 0 { return nil, errors.New("topo list is empty") } - logutil.BgLogger().Debug("assureTopo succeed", zap.Any("new topo", newTopo)) + logutil.BgLogger().Debug("httpGetAndParseResp succeed", zap.Any("new topo", newTopo)) return newTopo, nil } +// AWSTopoFetcher will fetch topo from AWSAutoScaler. type AWSTopoFetcher struct { mu struct { sync.RWMutex - topo []string + topo []string topoTS int64 } // AWS AutoScaler addr. - addr string - IsFixedPool bool + // These should be init when TiDB start, all single threaded, no need to lock. + addr string + clusterID string + isFixedPool bool +} + +type resumeAndGetTopologyResult struct { + HasError int `json:"hasError"` + ErrorInfo string `json:"errorInfo"` + State string `json:"state"` + Topology []string `json:"topology"` + Timestamp string `json:"timestamp"` } -func NewAWSAutoScalerFetcher(addr string) *AWSTopoFetcher { +// NewAWSAutoScalerFetcher create a new AWSTopoFetcher. +func NewAWSAutoScalerFetcher(addr string, clusterID string, isFixed bool) *AWSTopoFetcher { f := &AWSTopoFetcher{} f.mu.topo = make([]string, 0, 8) f.mu.topoTS = -1 f.addr = addr + f.clusterID = clusterID + f.isFixedPool = isFixed return f } +// AssureAndGetTopo implements TopoFetcher interface. func (f *AWSTopoFetcher) AssureAndGetTopo() ([]string, error) { - return nil, nil + return nil, errors.New("AWSTopoFetcher AssureAndGetTopo not implemented") } -func (f *AWSTopoFetcher) FetchAndGetTopo() ([]string, error) { - return nil, nil +// FetchAndGetTopo implements TopoFetcher interface. +func (f *AWSTopoFetcher) FetchAndGetTopo() (curTopo []string, err error) { + defer func() { + logutil.BgLogger().Info("AWSTopoFetcher FetchAndGetTopo done", zap.Any("curTopo", curTopo)) + }() + + if f.isFixedPool { + // todo: delete this when AssureAndGetTopo() is done. + curTopo, _ = f.getTopo() + if len(curTopo) != 0 { + return curTopo, nil + } + + if err = f.fetchFixedPoolTopo(); err != nil { + return nil, err + } + curTopo, _ = f.getTopo() + return curTopo, nil + } + + if err = f.fetchTopo(); err != nil { + return nil, err + } + + curTopo, _ = f.getTopo() + return curTopo, nil +} + +func awsHTTPGetAndParseResp(url string) (*resumeAndGetTopologyResult, error) { + resp, err := http.Get(url) + if err != nil { + return nil, errors.Trace(err) + } + defer resp.Body.Close() + + b, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, errors.Trace(err) + } + bStr := string(b) + if resp.StatusCode != http.StatusOK { + return nil, errors.Errorf("http get mock AutoScaler failed. 
url: %s, status code: %s, http resp body: %s", url, http.StatusText(resp.StatusCode), bStr) + } + + res := &resumeAndGetTopologyResult{} + if err = json.Unmarshal(b, &res); err != nil { + return nil, errors.Trace(err) + } + + logutil.BgLogger().Debug("awsHTTPGetAndParseResp succeed", zap.Any("resp", res)) + return res, nil +} + +func (f *AWSTopoFetcher) tryUpdateTopo(newTopo *resumeAndGetTopologyResult) (updated bool, err error) { + cachedTopo, cachedTS := f.getTopo() + newTS, err := strconv.ParseInt(newTopo.Timestamp, 10, 64) + defer func() { + logutil.BgLogger().Info("try update topo", zap.Any("updated", updated), zap.Any("err", err), + zap.Any("cached TS", cachedTS), zap.Any("cached Topo", cachedTopo), + zap.Any("fetch TS", newTopo.Timestamp), zap.Any("converted TS", newTS), zap.Any("fetch topo", newTopo.Topology)) + }() + if err != nil { + return updated, errors.Errorf("parse newTopo.timestamp failed when update topo: %s", err.Error()) + } + + if cachedTS >= newTS { + return + } + + f.mu.Lock() + defer f.mu.Unlock() + cachedTS = f.mu.topoTS + if cachedTS > newTS { + return + } + updated = true + f.mu.topo = newTopo.Topology + f.mu.topoTS = newTS + return +} + +func (f *AWSTopoFetcher) fetchFixedPoolTopo() error { + u := url.URL{ + Scheme: "http", + Host: f.addr, + Path: awsFixedPoolHTTPPath, + } + url := u.String() + logutil.BgLogger().Info("fetchFixedPoolTopo", zap.Any("url", url)) + + newTopo, err := awsHTTPGetAndParseResp(url) + if err != nil { + return err + } + + _, err = f.tryUpdateTopo(newTopo) + if err != nil { + return err + } + return nil +} + +func (f *AWSTopoFetcher) fetchTopo() error { + para := url.Values{} + para.Add("tidbclusterid", f.clusterID) + u := url.URL{ + Scheme: "http", + Host: f.addr, + Path: awsFetchHTTPPath, + RawQuery: para.Encode(), + } + url := u.String() + logutil.BgLogger().Info("fetchTopo", zap.Any("url", url)) + + newTopo, err := awsHTTPGetAndParseResp(url) + if err != nil { + return err + } + + _, err = f.tryUpdateTopo(newTopo) + if err != nil { + return err + } + return nil +} + +func (f *AWSTopoFetcher) getTopo() ([]string, int64) { + f.mu.RLock() + defer f.mu.RUnlock() + return f.mu.topo, f.mu.topoTS } From bdb6b513d57427fb2a4fe98d8525a28f41403800 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Thu, 19 Jan 2023 17:45:51 +0800 Subject: [PATCH 06/18] batch update same with(cse: 88bfe6e6ef4062b517e04d47bbc09751cc0bca0d) Signed-off-by: guo-shaoge --- config/BUILD.bazel | 1 + config/config.go | 43 +++++++- config/config.toml.example | 12 ++- executor/tiflashtest/BUILD.bazel | 1 + executor/tiflashtest/tiflash_test.go | 11 ++- planner/core/BUILD.bazel | 1 - planner/core/logical_plan_builder.go | 13 --- planner/core/planbuilder.go | 24 +---- session/session.go | 8 -- store/copr/BUILD.bazel | 4 + store/copr/batch_coprocessor.go | 140 ++++++++++++++------------- store/copr/batch_coprocessor_test.go | 22 +++++ store/copr/batch_request_sender.go | 1 + store/copr/mpp.go | 22 +---- tidb-server/BUILD.bazel | 1 + tidb-server/main.go | 13 ++- util/tiflashcompute/BUILD.bazel | 13 +++ util/tiflashcompute/topo_fetcher.go | 50 +++++++--- 18 files changed, 220 insertions(+), 160 deletions(-) create mode 100644 util/tiflashcompute/BUILD.bazel diff --git a/config/BUILD.bazel b/config/BUILD.bazel index e06843bec3df2..069133cba41bb 100644 --- a/config/BUILD.bazel +++ b/config/BUILD.bazel @@ -13,6 +13,7 @@ go_library( "//br/pkg/streamhelper/config", "//parser/terror", "//util/logutil", + "//util/tiflashcompute", "//util/tikvutil", "//util/versioninfo", 
"@com_github_burntsushi_toml//:toml", diff --git a/config/config.go b/config/config.go index 34c9c1e6c924b..c8a1dbd2f1c68 100644 --- a/config/config.go +++ b/config/config.go @@ -15,6 +15,7 @@ package config import ( + "reflect" "bytes" "encoding/base64" "encoding/json" @@ -94,6 +95,8 @@ const ( DefAuthTokenRefreshInterval = time.Hour // EnvVarKeyspaceName is the system env name for keyspace name. EnvVarKeyspaceName = "KEYSPACE_NAME" + // ConfigKeyspaceFieldName is the struct field name Config.KeyspaceName. + ConfigKeyspaceFieldName = "KeyspaceName" ) // Valid config maps @@ -283,17 +286,18 @@ type Config struct { OOMUseTmpStorage bool `toml:"oom-use-tmp-storage" json:"oom-use-tmp-storage"` // These items are deprecated because they are turned into instance system variables. - CheckMb4ValueInUTF8 AtomicBool `toml:"check-mb4-value-in-utf8" json:"check-mb4-value-in-utf8"` - EnableCollectExecutionInfo bool `toml:"enable-collect-execution-info" json:"enable-collect-execution-info"` - Plugin Plugin `toml:"plugin" json:"plugin"` - MaxServerConnections uint32 `toml:"max-server-connections" json:"max-server-connections"` - RunDDL bool `toml:"run-ddl" json:"run-ddl"` + CheckMb4ValueInUTF8 AtomicBool `toml:"check-mb4-value-in-utf8" json:"check-mb4-value-in-utf8"` + EnableCollectExecutionInfo bool `toml:"enable-collect-execution-info" json:"enable-collect-execution-info"` + Plugin Plugin `toml:"plugin" json:"plugin"` + MaxServerConnections uint32 `toml:"max-server-connections" json:"max-server-connections"` + RunDDL bool `toml:"run-ddl" json:"run-ddl"` // These config is related to disaggregated-tiflash mode. DisaggregatedTiFlash bool `toml:"disaggregated-tiflash" json:"disaggregated-tiflash"` TiFlashComputeAutoScalerType string `toml:"autoscaler-type" json:"autoscaler-type"` TiFlashComputeAutoScalerAddr string `toml:"autoscaler-addr" json:"autoscaler-addr"` IsTiFlashComputeFixedPool bool `toml:"is-tiflashcompute-fixed-pool" json:"is-tiflashcompute-fixed-pool"` + ClusterName string `toml:"cluster-name" json:"cluster-name"` // TiDBMaxReuseChunk indicates max cached chunk num TiDBMaxReuseChunk uint32 `toml:"tidb-max-reuse-chunk" json:"tidb-max-reuse-chunk"` @@ -1008,6 +1012,7 @@ var defaultConf = Config{ TiFlashComputeAutoScalerType: tiflashcompute.DefASStr, TiFlashComputeAutoScalerAddr: tiflashcompute.DefAWSAutoScalerAddr, IsTiFlashComputeFixedPool: false, + ClusterName: "", TiDBMaxReuseChunk: 64, TiDBMaxReuseColumn: 256, } @@ -1038,6 +1043,34 @@ func StoreGlobalConfig(config *Config) { tikvcfg.StoreGlobalConfig(&cfg) } +// GetClusterName() returns clusterName, which is KeyspaceName or ClusterName. +func GetClusterName() (string, error) { + c := GetGlobalConfig() + var keyspaceName string + clusterName := c.ClusterName + + // TODO: Delete using reflect when keyspace code is merged. + v := reflect.ValueOf(c).FieldByName(ConfigKeyspaceFieldName) + if v.IsValid() { + if v.Kind() != reflect.String { + terror.MustNil(errors.New("config.KeyspaceName should be String type")) + } + keyspaceName = v.String() + } + if keyspaceName != "" && clusterName != "" { + return "", errors.Errorf("config.KeyspaceName(%s) and config.ClusterName(%s) are not empty both", keyspaceName, clusterName) + } + if keyspaceName == "" && clusterName == "" { + return "", errors.Errorf("config.KeyspaceName and config.ClusterName are both empty") + } + + res := keyspaceName + if res == "" { + res = clusterName + } + return res, nil +} + // removedConfig contains items that are no longer supported. 
// they might still be in the config struct to support import, // but are not actively used. diff --git a/config/config.toml.example b/config/config.toml.example index 032c7578c0a29..d3642b6351bb1 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -119,13 +119,19 @@ disaggregated-tiflash = false # autoscaler-type indicates which type of AutoScaler will be used. Possible values are: mock, aws, gcp. # Only meaningful when disaggregated-tiflash is true -# autoscaler-type = "aws" +autoscaler-type = "aws" # autoscaler-addr is the host of AutoScaler, Only meaningful when disaggregated-tiflash is true. -# autoscaler-addr = "tiflash-autoscale-lb.tiflash-autoscale.svc.cluster.local:8081" +autoscaler-addr = "tiflash-autoscale-lb.tiflash-autoscale.svc.cluster.local:8081" + +# cluster-name is the name of cluster. Only meaningfule for Decicated Tier. +cluster-name = "" # is-tiflashcompute-fixed-pool controls whether autoscaler use fixed shared pool(free user) or not. -# is-tiflashcompute-fixed-pool: false +is-tiflashcompute-fixed-pool = false + +# skip-gc-drop-table is used to control whether to skip keyspace gc drop table range in gc worker. +skip-gc-drop-table = true [log] # Log level: debug, info, warn, error, fatal. diff --git a/executor/tiflashtest/BUILD.bazel b/executor/tiflashtest/BUILD.bazel index c7678e569522d..a9e7a5ee027c4 100644 --- a/executor/tiflashtest/BUILD.bazel +++ b/executor/tiflashtest/BUILD.bazel @@ -22,6 +22,7 @@ go_test( "//testkit", "//testkit/external", "//util/israce", + "//util/tiflashcompute", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/metapb", diff --git a/executor/tiflashtest/tiflash_test.go b/executor/tiflashtest/tiflash_test.go index e8cd94d889188..641532509e322 100644 --- a/executor/tiflashtest/tiflash_test.go +++ b/executor/tiflashtest/tiflash_test.go @@ -37,6 +37,7 @@ import ( "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/testkit/external" "github.com/pingcap/tidb/util/israce" + "github.com/pingcap/tidb/util/tiflashcompute" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/testutils" ) @@ -1258,6 +1259,9 @@ func TestDisaggregatedTiFlash(t *testing.T) { config.UpdateGlobal(func(conf *config.Config) { conf.DisaggregatedTiFlash = true }) + err := tiflashcompute.InitGlobalTopoFetcher(tiflashcompute.TestASStr, "", "", false) + require.NoError(t, err) + store := testkit.CreateMockStore(t, withMockTiFlash(2)) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -1265,12 +1269,12 @@ func TestDisaggregatedTiFlash(t *testing.T) { tk.MustExec("create table t(c1 int)") tk.MustExec("alter table t set tiflash replica 1") tb := external.GetTableByName(t, tk, "test", "t") - err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true) + err = domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true) require.NoError(t, err) tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"") err = tk.ExecToErr("select * from t;") - require.Contains(t, err.Error(), "Please check tiflash_compute node is available") + require.Contains(t, err.Error(), "Cannot find proper topo from AutoScaler") config.UpdateGlobal(func(conf *config.Config) { conf.DisaggregatedTiFlash = false @@ -1304,9 +1308,6 @@ func TestDisaggregatedTiFlashQuery(t *testing.T) { require.NoError(t, err) tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"") - needCheckTiFlashComputeNode := "false" 
- failpoint.Enable("github.com/pingcap/tidb/planner/core/testDisaggregatedTiFlashQuery", fmt.Sprintf("return(%s)", needCheckTiFlashComputeNode)) - defer failpoint.Disable("github.com/pingcap/tidb/planner/core/testDisaggregatedTiFlashQuery") tk.MustExec("explain select max( tbl_1.col_1 ) as r0 , sum( tbl_1.col_1 ) as r1 , sum( tbl_1.col_8 ) as r2 from tbl_1 where tbl_1.col_8 != 68 or tbl_1.col_3 between null and 939 order by r0,r1,r2;") tk.MustExec("set @@tidb_partition_prune_mode = 'static';") diff --git a/planner/core/BUILD.bazel b/planner/core/BUILD.bazel index 3afbdf3b8a0bc..12c2730c59364 100644 --- a/planner/core/BUILD.bazel +++ b/planner/core/BUILD.bazel @@ -107,7 +107,6 @@ go_library( "//sessiontxn/staleread", "//statistics", "//statistics/handle", - "//store/driver/backoff", "//table", "//table/tables", "//table/temptable", diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index a2f31cf423cd4..7cac4b3a9789a 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" @@ -690,13 +689,6 @@ func (ds *DataSource) setPreferredStoreType(hintInfo *tableHintInfo) { ds.preferStoreType = 0 return } - if config.GetGlobalConfig().DisaggregatedTiFlash && !isTiFlashComputeNodeAvailable(ds.ctx) { - // TiFlash is in disaggregated mode, need to make sure tiflash_compute node is available. - errMsg := "No available tiflash_compute node" - warning := ErrInternal.GenWithStack(errMsg) - ds.ctx.GetSessionVars().StmtCtx.AppendWarning(warning) - return - } for _, path := range ds.possibleAccessPaths { if path.StoreType == kv.TiFlash { ds.preferStoreType |= preferTiFlash @@ -714,11 +706,6 @@ func (ds *DataSource) setPreferredStoreType(hintInfo *tableHintInfo) { } } -func isTiFlashComputeNodeAvailable(ctx sessionctx.Context) bool { - // todo: just raise warning - return true -} - func resetNotNullFlag(schema *expression.Schema, start, end int) { for i := start; i < end; i++ { col := *schema.Columns[i] diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 649a60415b359..f3acd165e3eae 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -26,7 +26,6 @@ import ( "unsafe" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" "github.com/pingcap/tidb/bindinfo" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" @@ -1453,8 +1452,6 @@ func filterPathByIsolationRead(ctx sessionctx.Context, paths []*util.AccessPath, isolationReadEngines := ctx.GetSessionVars().GetIsolationReadEngines() availableEngine := map[kv.StoreType]struct{}{} var availableEngineStr string - var outputComputeNodeErrMsg bool - noTiFlashComputeNode := config.GetGlobalConfig().DisaggregatedTiFlash && !isTiFlashComputeNodeAvailable(ctx) for i := len(paths) - 1; i >= 0; i-- { // availableEngineStr is for warning message. if _, ok := availableEngine[paths[i].StoreType]; !ok { @@ -1464,20 +1461,7 @@ func filterPathByIsolationRead(ctx sessionctx.Context, paths []*util.AccessPath, } availableEngineStr += paths[i].StoreType.Name() } - _, exists := isolationReadEngines[paths[i].StoreType] - // Prune this path if: - // 1. path.StoreType doesn't exists in isolationReadEngines or - // 2. TiFlash is disaggregated and the number of tiflash_compute node is zero. 
- shouldPruneTiFlashCompute := noTiFlashComputeNode && exists && paths[i].StoreType == kv.TiFlash - failpoint.Inject("testDisaggregatedTiFlashQuery", func(val failpoint.Value) { - // Ignore check if tiflash_compute node number. - // After we support disaggregated tiflash in test framework, can delete this failpoint. - shouldPruneTiFlashCompute = val.(bool) - }) - if shouldPruneTiFlashCompute { - outputComputeNodeErrMsg = true - } - if (!exists && paths[i].StoreType != kv.TiDB) || shouldPruneTiFlashCompute { + if _, ok := isolationReadEngines[paths[i].StoreType]; !ok && paths[i].StoreType != kv.TiDB { paths = append(paths[:i], paths[i+1:]...) } } @@ -1486,11 +1470,7 @@ func filterPathByIsolationRead(ctx sessionctx.Context, paths []*util.AccessPath, if len(paths) == 0 { helpMsg := "" if engineVals == "tiflash" { - if outputComputeNodeErrMsg { - helpMsg = ". Please check tiflash_compute node is available" - } else { - helpMsg = ". Please check tiflash replica or ensure the query is readonly" - } + helpMsg = ". Please check tiflash replica or ensure the query is readonly" } err = ErrInternal.GenWithStackByArgs(fmt.Sprintf("No access path for table '%s' is found with '%v' = '%v', valid values can be '%s'%s.", tblName.String(), variable.TiDBIsolationReadEngines, engineVals, availableEngineStr, helpMsg)) diff --git a/session/session.go b/session/session.go index e15eb15048e8b..930d4861130bd 100644 --- a/session/session.go +++ b/session/session.go @@ -3340,14 +3340,6 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { return nil, err } - // if config.GetGlobalConfig().DisaggregatedTiFlash { - // // Invalid client-go tiflash_compute store cache if necessary. - // err = dom.WatchTiFlashComputeNodeChange() - // if err != nil { - // return nil, err - // } - // } - if err = extensionimpl.Bootstrap(context.Background(), dom); err != nil { return nil, err } diff --git a/store/copr/BUILD.bazel b/store/copr/BUILD.bazel index 9ea8467d01dfa..8054765dedceb 100644 --- a/store/copr/BUILD.bazel +++ b/store/copr/BUILD.bazel @@ -31,6 +31,7 @@ go_library( "//util/mathutil", "//util/memory", "//util/paging", + "//util/tiflashcompute", "//util/trxevents", "@com_github_dgraph_io_ristretto//:ristretto", "@com_github_gogo_protobuf//proto", @@ -42,6 +43,7 @@ go_library( "@com_github_pingcap_kvproto//pkg/mpp", "@com_github_pingcap_log//:log", "@com_github_pingcap_tipb//go-tipb", + "@com_github_stathat_consistent//:consistent", "@com_github_tikv_client_go_v2//config", "@com_github_tikv_client_go_v2//error", "@com_github_tikv_client_go_v2//metrics", @@ -75,6 +77,7 @@ go_test( "//kv", "//store/driver/backoff", "//testkit/testsetup", + "//util/logutil", "//util/paging", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_kvproto//pkg/coprocessor", @@ -86,5 +89,6 @@ go_test( "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_client_go_v2//tikvrpc", "@org_uber_go_goleak//:goleak", + "@org_uber_go_zap//:zap", ], ) diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 43ccb290601f7..bf3e16387f33a 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -45,6 +45,8 @@ import ( "golang.org/x/exp/slices" ) +const fetchTopoMaxBackoff = 20000 + // batchCopTask comprises of multiple copTask that will send to same store. 
type batchCopTask struct { storeAddr string @@ -481,7 +483,9 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks [] return ret } -func buildBatchCopTasksForNonPartitionedTable(bo *backoff.Backoffer, +func buildBatchCopTasksForNonPartitionedTable( + ctx context.Context, + bo *backoff.Backoffer, store *kvStore, ranges *KeyRanges, storeType kv.StoreType, @@ -490,7 +494,7 @@ func buildBatchCopTasksForNonPartitionedTable(bo *backoff.Backoffer, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) { if config.GetGlobalConfig().DisaggregatedTiFlash { - return buildBatchCopTasksConsistentHash(bo, store, []*KeyRanges{ranges}, storeType, ttl) + return buildBatchCopTasksConsistentHash(ctx, bo, store, []*KeyRanges{ranges}, storeType, ttl) } return buildBatchCopTasksCore(bo, store, []*KeyRanges{ranges}, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount) } @@ -505,7 +509,7 @@ func buildBatchCopTasksForPartitionedTable(bo *backoff.Backoffer, balanceContinuousRegionCount int64, partitionIDs []int64) (batchTasks []*batchCopTask, err error) { if config.GetGlobalConfig().DisaggregatedTiFlash { - batchTasks, err = buildBatchCopTasksConsistentHash(bo, store, rangesForEachPhysicalTable, storeType, ttl) + batchTasks, err = buildBatchCopTasksConsistentHash(ctx, bo, store, rangesForEachPhysicalTable, storeType, ttl) } else { batchTasks, err = buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount) } @@ -594,92 +598,94 @@ func getTiFlashComputeRPCContextByConsistentHash(ids []tikv.RegionVerID, storesS // 1. Split range by region location to build copTasks. // 2. For each copTask build its rpcCtx , the target tiflash_compute node will be chosen using consistent hash. // 3. All copTasks that will be sent to one tiflash_compute node are put in one batchCopTask. 
-func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer, +func buildBatchCopTasksConsistentHash( + ctx context.Context, + bo *backoff.Backoffer, kvStore *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, ttl time.Duration) (res []*batchCopTask, err error) { const cmdType = tikvrpc.CmdBatchCop - var retryNum int cache := kvStore.GetRegionCache() + fetchTopoBo := backoff.NewBackofferWithVars(ctx, fetchTopoMaxBackoff, nil) - for { - retryNum++ - var rangesLen int - tasks := make([]*copTask, 0) - regionIDs := make([]tikv.RegionVerID, 0) + var retryNum int + var rangesLen int + var storesStr []string - for i, ranges := range rangesForEachPhysicalTable { - rangesLen += ranges.Len() - locations, err := cache.SplitKeyRangesByLocations(bo, ranges) - if err != nil { - return nil, errors.Trace(err) - } - for _, lo := range locations { - tasks = append(tasks, &copTask{ - region: lo.Location.Region, - ranges: lo.Ranges, - cmdType: cmdType, - storeType: storeType, - partitionIndex: int64(i), - }) - regionIDs = append(regionIDs, lo.Location.Region) - } - } + tasks := make([]*copTask, 0) + regionIDs := make([]tikv.RegionVerID, 0) - storesStr, err := tiflashcompute.GetGlobalTopoFetcher().FetchAndGetTopo() + for i, ranges := range rangesForEachPhysicalTable { + rangesLen += ranges.Len() + locations, err := cache.SplitKeyRangesByLocations(bo, ranges) if err != nil { - return nil, err + return nil, errors.Trace(err) } - stores := filterAliveStoresStr(bo.GetCtx(), storesStr, ttl, kvStore) - if len(stores) == 0 { - return nil, errors.New("tiflash_compute node is unavailable") + for _, lo := range locations { + tasks = append(tasks, &copTask{ + region: lo.Location.Region, + ranges: lo.Ranges, + cmdType: cmdType, + storeType: storeType, + partitionIndex: int64(i), + }) + regionIDs = append(regionIDs, lo.Location.Region) } + } - // todo: put to here; construct rpcCtx by self. - rpcCtxs, err := getTiFlashComputeRPCContextByConsistentHash(regionIDs, storesStr) + for { + retryNum++ + // todo: use AssureAndGetTopo() after SNS is done. + storesStr, err = tiflashcompute.GetGlobalTopoFetcher().FetchAndGetTopo() if err != nil { return nil, err } - // todo: no need - if rpcCtxs == nil { - logutil.BgLogger().Info("buildBatchCopTasksConsistentHash retry because rcpCtx is nil", zap.Int("retryNum", retryNum)) - err := bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer")) + if len(storesStr) == 0 { + retErr := errors.New("Cannot find proper topo from AutoScaler") + logutil.BgLogger().Info("buildBatchCopTasksConsistentHash retry because FetchAndGetTopo return empty topo", zap.Int("retryNum", retryNum)) + err := fetchTopoBo.Backoff(tikv.BoTiFlashRPC(), retErr) if err != nil { - return nil, errors.Trace(err) + return nil, retErr } continue } - if len(rpcCtxs) != len(tasks) { - return nil, errors.Errorf("length should be equal, len(rpcCtxs): %d, len(tasks): %d", len(rpcCtxs), len(tasks)) + break + } + + // todo: put to here; construct rpcCtx by self. + rpcCtxs, err := getTiFlashComputeRPCContextByConsistentHash(regionIDs, storesStr) + if err != nil { + return nil, err + } + if len(rpcCtxs) != len(tasks) { + return nil, errors.Errorf("length should be equal, len(rpcCtxs): %d, len(tasks): %d", len(rpcCtxs), len(tasks)) + } + taskMap := make(map[string]*batchCopTask) + for i, rpcCtx := range rpcCtxs { + regionInfo := RegionInfo{ + // tasks and rpcCtxs are correspond to each other. 
+ Region: tasks[i].region, + Ranges: tasks[i].ranges, + PartitionIndex: tasks[i].partitionIndex, + // No need to setup regionMeta and Store info. + // Meta: rpcCtx.Meta, + // AllStores: []uint64{rpcCtx.Store.StoreID()}, } - taskMap := make(map[string]*batchCopTask) - for i, rpcCtx := range rpcCtxs { - regionInfo := RegionInfo{ - // tasks and rpcCtxs are correspond to each other. - Region: tasks[i].region, - // todo: no need - // Meta: rpcCtx.Meta, - Ranges: tasks[i].ranges, - // AllStores: []uint64{rpcCtx.Store.StoreID()}, - PartitionIndex: tasks[i].partitionIndex, - } - if batchTask, ok := taskMap[rpcCtx.Addr]; ok { - batchTask.regionInfos = append(batchTask.regionInfos, regionInfo) - } else { - batchTask := &batchCopTask{ - storeAddr: rpcCtx.Addr, - cmdType: cmdType, - ctx: rpcCtx, - regionInfos: []RegionInfo{regionInfo}, - } - taskMap[rpcCtx.Addr] = batchTask - res = append(res, batchTask) + if batchTask, ok := taskMap[rpcCtx.Addr]; ok { + batchTask.regionInfos = append(batchTask.regionInfos, regionInfo) + } else { + batchTask := &batchCopTask{ + storeAddr: rpcCtx.Addr, + cmdType: cmdType, + ctx: rpcCtx, + regionInfos: []RegionInfo{regionInfo}, } + taskMap[rpcCtx.Addr] = batchTask + res = append(res, batchTask) } - logutil.BgLogger().Info("buildBatchCopTasksConsistentHash done", zap.Any("len(tasks)", len(taskMap)), zap.Any("len(tiflash_compute)", len(stores))) - break } + logutil.BgLogger().Info("buildBatchCopTasksConsistentHash done", zap.Any("len(tasks)", len(taskMap)), zap.Any("len(tiflash_compute)", len(storesStr))) failpointCheckForConsistentHash(res) return res, nil @@ -860,7 +866,7 @@ func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *tikv.V keyRanges = append(keyRanges, NewKeyRanges(pi.KeyRanges)) partitionIDs = append(partitionIDs, pi.ID) } - tasks, err = buildBatchCopTasksForPartitionedTable(bo, c.store.kvStore, keyRanges, req.StoreType, false, 0, false, 0, partitionIDs) + tasks, err = buildBatchCopTasksForPartitionedTable(ctx, bo, c.store.kvStore, keyRanges, req.StoreType, false, 0, false, 0, partitionIDs) } else { // TODO: merge the if branch. 
ranges := NewKeyRanges(req.KeyRanges.FirstPartitionRange()) @@ -1030,7 +1036,7 @@ func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *backoff.Ba } keyRanges = append(keyRanges, NewKeyRanges(ranges)) } - ret, err := buildBatchCopTasksForPartitionedTable(bo, b.store, keyRanges, b.req.StoreType, false, 0, false, 0, pid) + ret, err := buildBatchCopTasksForPartitionedTable(ctx, bo, b.store, keyRanges, b.req.StoreType, false, 0, false, 0, pid) return ret, err } diff --git a/store/copr/batch_coprocessor_test.go b/store/copr/batch_coprocessor_test.go index 3e10ce627b1f6..b6d8bd5aeb84a 100644 --- a/store/copr/batch_coprocessor_test.go +++ b/store/copr/batch_coprocessor_test.go @@ -15,16 +15,21 @@ package copr import ( + "context" "math/rand" "sort" "strconv" "testing" "time" + "github.com/pingcap/errors" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/store/driver/backoff" + "github.com/pingcap/tidb/util/logutil" "github.com/stathat/consistent" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/tikv" + "go.uber.org/zap" ) // StoreID: [1, storeCount] @@ -205,3 +210,20 @@ func TestConsistentHash(t *testing.T) { } } } + +func TestTopoFetcherBackoff(t *testing.T) { + fetchTopoBo := backoff.NewBackofferWithVars(context.Background(), fetchTopoMaxBackoff, nil) + expectErr := errors.New("Cannot find proper topo from AutoScaler") + var retryNum int + start := time.Now() + for { + retryNum++ + if err := fetchTopoBo.Backoff(tikv.BoTiFlashRPC(), expectErr); err != nil { + break + } + logutil.BgLogger().Info("TestTopoFetcherBackoff", zap.Any("retryNum", retryNum)) + } + dura := time.Since(start) + // fetchTopoMaxBackoff is milliseconds. + require.GreaterOrEqual(t, dura, time.Duration(fetchTopoMaxBackoff*1000)) +} diff --git a/store/copr/batch_request_sender.go b/store/copr/batch_request_sender.go index b976d26a59ab3..b999d43df6f45 100644 --- a/store/copr/batch_request_sender.go +++ b/store/copr/batch_request_sender.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/coprocessor" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/tidb/config" tikverr "github.com/tikv/client-go/v2/error" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" diff --git a/store/copr/mpp.go b/store/copr/mpp.go index 3e6e7a9f23fab..9af0bb919593c 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -74,13 +74,13 @@ func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasks rangesForEachPartition[i] = NewKeyRanges(p.KeyRanges) partitionIDs[i] = p.ID } - tasks, err = buildBatchCopTasksForPartitionedTable(bo, c.store, rangesForEachPartition, kv.TiFlash, true, ttl, true, 20, partitionIDs) + tasks, err = buildBatchCopTasksForPartitionedTable(ctx, bo, c.store, rangesForEachPartition, kv.TiFlash, true, ttl, true, 20, partitionIDs) } else { if req.KeyRanges == nil { return c.selectAllTiFlashStore(), nil } ranges := NewKeyRanges(req.KeyRanges) - tasks, err = buildBatchCopTasksForNonPartitionedTable(bo, c.store, ranges, kv.TiFlash, true, ttl, true, 20) + tasks, err = buildBatchCopTasksForNonPartitionedTable(ctx, bo, c.store, ranges, kv.TiFlash, true, ttl, true, 20) } if err != nil { @@ -239,7 +239,6 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req } } - disaggregatedTiFlash := config.GetGlobalConfig().DisaggregatedTiFlash wrappedReq := tikvrpc.NewRequest(tikvrpc.CmdMPPTask, mppReq, kvrpcpb.Context{}) wrappedReq.StoreTp = getEndPointType(kv.TiFlash) @@ -259,10 +258,6 @@ func 
(m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req // That's a hard job but we can try it in the future. if sender.GetRPCError() != nil { logutil.BgLogger().Warn("mpp dispatch meet io error", zap.String("error", sender.GetRPCError().Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId)) - if disaggregatedTiFlash { - // todo: fix when use topoFetcher - m.store.GetRegionCache().InvalidateTiFlashComputeStores() - } // if needTriggerFallback is true, we return timeout to trigger tikv's fallback if m.needTriggerFallback { err = derr.ErrTiFlashServerTimeout @@ -275,9 +270,6 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req if errors.Cause(err) == context.Canceled || status.Code(errors.Cause(err)) == codes.Canceled { retry = false } else if err != nil { - if disaggregatedTiFlash { - m.store.GetRegionCache().InvalidateTiFlashComputeStores() - } if bo.Backoff(tikv.BoTiFlashRPC(), err) == nil { retry = true } @@ -340,7 +332,6 @@ func (m *mppIterator) cancelMppTasks() { Meta: &mpp.TaskMeta{StartTs: m.startTs, QueryTs: m.mppQueryID.QueryTs, LocalQueryId: m.mppQueryID.LocalQueryID, ServerId: m.mppQueryID.ServerID}, } - disaggregatedTiFlash := config.GetGlobalConfig().DisaggregatedTiFlash wrappedReq := tikvrpc.NewRequest(tikvrpc.CmdMPPCancel, killReq, kvrpcpb.Context{}) wrappedReq.StoreTp = getEndPointType(kv.TiFlash) @@ -356,7 +347,6 @@ func (m *mppIterator) cancelMppTasks() { } // send cancel cmd to all stores where tasks run - gotErr := atomic.Bool{} wg := util.WaitGroupWrapper{} for addr := range usedStoreAddrs { storeAddr := addr @@ -365,14 +355,10 @@ func (m *mppIterator) cancelMppTasks() { logutil.BgLogger().Debug("cancel task", zap.Uint64("query id ", m.startTs), zap.String("on addr", storeAddr)) if err != nil { logutil.BgLogger().Error("cancel task error", zap.Error(err), zap.Uint64("query id", m.startTs), zap.String("on addr", storeAddr)) - gotErr.CompareAndSwap(false, true) } }) } wg.Wait() - if gotErr.Load() && disaggregatedTiFlash { - m.store.GetRegionCache().InvalidateTiFlashComputeStores() - } } func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchRequest, taskMeta *mpp.TaskMeta) { @@ -388,7 +374,6 @@ func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchReques } var err error - disaggregatedTiFlash := config.GetGlobalConfig().DisaggregatedTiFlash wrappedReq := tikvrpc.NewRequest(tikvrpc.CmdMPPConn, connReq, kvrpcpb.Context{}) wrappedReq.StoreTp = getEndPointType(kv.TiFlash) @@ -399,9 +384,6 @@ func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchReques if err != nil { logutil.BgLogger().Warn("establish mpp connection meet error and cannot retry", zap.String("error", err.Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId)) - if disaggregatedTiFlash { - m.store.GetRegionCache().InvalidateTiFlashComputeStores() - } // if needTriggerFallback is true, we return timeout to trigger tikv's fallback if m.needTriggerFallback { m.sendError(derr.ErrTiFlashServerTimeout) diff --git a/tidb-server/BUILD.bazel b/tidb-server/BUILD.bazel index b918b7e1ca5f8..b0be440b7f0d8 100644 --- a/tidb-server/BUILD.bazel +++ b/tidb-server/BUILD.bazel @@ -49,6 +49,7 @@ go_library( "//util/sys/linux", "//util/sys/storage", "//util/systimemon", + "//util/tiflashcompute", "//util/topsql", "//util/versioninfo", "@com_github_coreos_go_semver//semver", diff --git a/tidb-server/main.go b/tidb-server/main.go index 
17650b141e79e..0f08285aaa06b 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -20,6 +20,7 @@ import ( "fmt" "io/fs" "os" + "reflect" "runtime" "strconv" "strings" @@ -205,17 +206,19 @@ func main() { terror.MustNil(err) if config.GetGlobalConfig().DisaggregatedTiFlash { + clusterID, err := config.GetClusterName() + terror.MustNil(err) + err = tiflashcompute.InitGlobalTopoFetcher( config.GetGlobalConfig().TiFlashComputeAutoScalerType, config.GetGlobalConfig().TiFlashComputeAutoScalerAddr, - // todo: fix - "mock_cluste_id", + clusterID, config.GetGlobalConfig().IsTiFlashComputeFixedPool) - terror.MustNil(err) + terror.MustNil(err) } - // Enable failpoints in tikv/client-go if the test API is enabled. - // It appears in the main function to be set before any use of client-go to prevent data race. + // Enable failpoints in tikv/client-go if the test API is enabled. + // It appears in the main function to be set before any use of client-go to prevent data race. if _, err := failpoint.Status("github.com/pingcap/tidb/server/enableTestAPI"); err == nil { warnMsg := "tikv/client-go failpoint is enabled, this should NOT happen in the production environment" logutil.BgLogger().Warn(warnMsg) diff --git a/util/tiflashcompute/BUILD.bazel b/util/tiflashcompute/BUILD.bazel new file mode 100644 index 0000000000000..2cc2ef56c5278 --- /dev/null +++ b/util/tiflashcompute/BUILD.bazel @@ -0,0 +1,13 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "tiflashcompute", + srcs = ["topo_fetcher.go"], + importpath = "github.com/pingcap/tidb/util/tiflashcompute", + visibility = ["//visibility:public"], + deps = [ + "//util/logutil", + "@com_github_pingcap_errors//:errors", + "@org_uber_go_zap//:zap", + ], +) diff --git a/util/tiflashcompute/topo_fetcher.go b/util/tiflashcompute/topo_fetcher.go index d0f1073476046..0d5c822b4c780 100644 --- a/util/tiflashcompute/topo_fetcher.go +++ b/util/tiflashcompute/topo_fetcher.go @@ -1,4 +1,4 @@ -// Copyright 2015 PingCAP, Inc. +// Copyright 2023 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -31,14 +31,17 @@ import ( var globalTopoFetcher TopoFetcher var _ TopoFetcher = &MockTopoFetcher{} var _ TopoFetcher = &AWSTopoFetcher{} +var _ TopoFetcher = &TestTopoFetcher{} const ( - // MockASStr is String value for mock AutoScaler. + // MockASStr is string value for mock AutoScaler. MockASStr = "mock" - // AWSASStr is String value for mock AutoScaler. + // AWSASStr is string value for mock AutoScaler. AWSASStr = "aws" - // GCPASStr is String value for mock AutoScaler. + // GCPASStr is string value for mock AutoScaler. GCPASStr = "gcp" + // TestASStr is string value for test AutoScaler. + TestASStr = "test" ) const ( @@ -48,6 +51,8 @@ const ( AWSASType // GCPASType is int value for mock AutoScaler. GCPASType + // TestASType is for local tidb test AutoScaler. + TestASType // InvalidASType is int value for invalid check. InvalidASType ) @@ -67,6 +72,7 @@ const ( // 1. MockAutoScaler: Normally for test, can run in local environment. // 2. AWSAutoScaler: AutoScaler runs on AWS. // 3. GCPAutoScaler: AutoScaler runs on GCP. +// 4. TestAutoScaler: AutoScaler just for unit test. type TopoFetcher interface { // Return tiflash compute topo cache, if topo is empty, will fetch topo from AutoScaler. // If topo is empty after fetch, will return error. 
@@ -86,29 +92,33 @@ func GetAutoScalerType(typ string) int { return AWSASType case GCPASStr: return GCPASType + case TestASStr: + return TestASType default: return InvalidASType } } // InitGlobalTopoFetcher init globalTopoFetcher if is in disaggregated-tiflash mode. It's not thread-safe. -func InitGlobalTopoFetcher(typ string, addr string, clusterID string, isFixedPool bool) error { - logutil.BgLogger().Info("globalTopoFetcher inited", zap.Any("type", typ), zap.Any("addr", addr), +func InitGlobalTopoFetcher(typ string, addr string, clusterID string, isFixedPool bool) (err error) { + logutil.BgLogger().Info("init globalTopoFetcher", zap.Any("type", typ), zap.Any("addr", addr), zap.Any("clusterID", clusterID), zap.Any("isFixedPool", isFixedPool)) ft := GetAutoScalerType(typ) switch ft { case MockASType: globalTopoFetcher = NewMockAutoScalerFetcher(addr) - return nil case AWSASType: globalTopoFetcher = NewAWSAutoScalerFetcher(addr, clusterID, isFixedPool) - return nil case GCPASType: - return errors.Errorf("topo fetch not implemented yet(%s)", typ) + err = errors.Errorf("topo fetch not implemented yet(%s)", typ) + case TestASType: + globalTopoFetcher = NewTestAutoScalerFetcher() + default: + err = errors.Errorf("unexpected topo fetch type. expect: %s or %s or %s, got %s", + MockASStr, AWSASStr, GCPASStr, typ) } - return errors.Errorf("unexpected topo fetch type. expect: %s or %s or %s, got %s", - MockASStr, AWSASStr, GCPASStr, typ) + return err } // GetGlobalTopoFetcher return global topo fetcher, not thread safe. @@ -414,3 +424,21 @@ func (f *AWSTopoFetcher) getTopo() ([]string, int64) { defer f.mu.RUnlock() return f.mu.topo, f.mu.topoTS } + +// TestTopoFetcher will return empty topo list, just for unit test. +type TestTopoFetcher struct{} + +// NewTestAutoScalerFetcher returns TestTopoFetcher. +func NewTestAutoScalerFetcher() *TestTopoFetcher { + return &TestTopoFetcher{} +} + +// AssureAndGetTopo implements TopoFetcher interface. +func (f *TestTopoFetcher) AssureAndGetTopo() ([]string, error) { + return []string{}, nil +} + +// FetchAndGetTopo implements TopoFetcher interface. 
+func (f *TestTopoFetcher) FetchAndGetTopo() ([]string, error) { + return []string{}, nil +} From 51916c99469486ea887b0ae63c21499bf6ffaebd Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Thu, 19 Jan 2023 18:03:32 +0800 Subject: [PATCH 07/18] fix review Signed-off-by: guo-shaoge --- config/config.go | 10 +++++----- config/config.toml.example | 3 --- store/copr/batch_coprocessor.go | 9 +++++---- store/copr/batch_request_sender.go | 16 +++++++++------- tidb-server/main.go | 1 - 5 files changed, 19 insertions(+), 20 deletions(-) diff --git a/config/config.go b/config/config.go index c8a1dbd2f1c68..fb46782a4d690 100644 --- a/config/config.go +++ b/config/config.go @@ -15,7 +15,6 @@ package config import ( - "reflect" "bytes" "encoding/base64" "encoding/json" @@ -24,6 +23,7 @@ import ( "os" "os/user" "path/filepath" + "reflect" "sort" "strings" "sync" @@ -297,7 +297,7 @@ type Config struct { TiFlashComputeAutoScalerType string `toml:"autoscaler-type" json:"autoscaler-type"` TiFlashComputeAutoScalerAddr string `toml:"autoscaler-addr" json:"autoscaler-addr"` IsTiFlashComputeFixedPool bool `toml:"is-tiflashcompute-fixed-pool" json:"is-tiflashcompute-fixed-pool"` - ClusterName string `toml:"cluster-name" json:"cluster-name"` + ClusterName string `toml:"cluster-name" json:"cluster-name"` // TiDBMaxReuseChunk indicates max cached chunk num TiDBMaxReuseChunk uint32 `toml:"tidb-max-reuse-chunk" json:"tidb-max-reuse-chunk"` @@ -1012,7 +1012,7 @@ var defaultConf = Config{ TiFlashComputeAutoScalerType: tiflashcompute.DefASStr, TiFlashComputeAutoScalerAddr: tiflashcompute.DefAWSAutoScalerAddr, IsTiFlashComputeFixedPool: false, - ClusterName: "", + ClusterName: "", TiDBMaxReuseChunk: 64, TiDBMaxReuseColumn: 256, } @@ -1043,7 +1043,7 @@ func StoreGlobalConfig(config *Config) { tikvcfg.StoreGlobalConfig(&cfg) } -// GetClusterName() returns clusterName, which is KeyspaceName or ClusterName. +// GetClusterName returns clusterName, which is KeyspaceName or ClusterName. func GetClusterName() (string, error) { c := GetGlobalConfig() var keyspaceName string @@ -1355,7 +1355,7 @@ func (c *Config) Valid() error { return fmt.Errorf("stats-load-queue-size should be [%d, %d]", DefStatsLoadQueueSizeLimit, DefMaxOfStatsLoadQueueSizeLimit) } - // check tiflash_compute topo fetch is valid. + // Check tiflash_compute topo fetch is valid. if c.DisaggregatedTiFlash { if tiflashcompute.GetAutoScalerType(c.TiFlashComputeAutoScalerType) == tiflashcompute.InvalidASType { return fmt.Errorf("invalid AutoScaler type, expect %s, %s or %s, got %s", diff --git a/config/config.toml.example b/config/config.toml.example index d3642b6351bb1..9373e14c68575 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -130,9 +130,6 @@ cluster-name = "" # is-tiflashcompute-fixed-pool controls whether autoscaler use fixed shared pool(free user) or not. is-tiflashcompute-fixed-pool = false -# skip-gc-drop-table is used to control whether to skip keyspace gc drop table range in gc worker. -skip-gc-drop-table = true - [log] # Log level: debug, info, warn, error, fatal. 
level = "info" diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index bf3e16387f33a..15acb5521e95a 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -499,7 +499,9 @@ func buildBatchCopTasksForNonPartitionedTable( return buildBatchCopTasksCore(bo, store, []*KeyRanges{ranges}, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount) } -func buildBatchCopTasksForPartitionedTable(bo *backoff.Backoffer, +func buildBatchCopTasksForPartitionedTable( + ctx context.Context, + bo *backoff.Backoffer, store *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, @@ -653,7 +655,6 @@ func buildBatchCopTasksConsistentHash( break } - // todo: put to here; construct rpcCtx by self. rpcCtxs, err := getTiFlashComputeRPCContextByConsistentHash(regionIDs, storesStr) if err != nil { return nil, err @@ -870,7 +871,7 @@ func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *tikv.V } else { // TODO: merge the if branch. ranges := NewKeyRanges(req.KeyRanges.FirstPartitionRange()) - tasks, err = buildBatchCopTasksForNonPartitionedTable(bo, c.store.kvStore, ranges, req.StoreType, false, 0, false, 0) + tasks, err = buildBatchCopTasksForNonPartitionedTable(ctx, bo, c.store.kvStore, ranges, req.StoreType, false, 0, false, 0) } if err != nil { @@ -1017,7 +1018,7 @@ func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *backoff.Ba ranges = append(ranges, *ran) }) } - ret, err := buildBatchCopTasksForNonPartitionedTable(bo, b.store, NewKeyRanges(ranges), b.req.StoreType, false, 0, false, 0) + ret, err := buildBatchCopTasksForNonPartitionedTable(ctx, bo, b.store, NewKeyRanges(ranges), b.req.StoreType, false, 0, false, 0) return ret, err } // Retry Partition Table Scan diff --git a/store/copr/batch_request_sender.go b/store/copr/batch_request_sender.go index b999d43df6f45..2e2df9bd10076 100644 --- a/store/copr/batch_request_sender.go +++ b/store/copr/batch_request_sender.go @@ -99,13 +99,15 @@ func (ss *RegionBatchRequestSender) onSendFailForBatchRegions(bo *Backoffer, ctx return tikverr.ErrTiDBShuttingDown } - // The reload region param is always true. Because that every time we try, we must - // re-build the range then re-create the batch sender. As a result, the len of "failStores" - // will change. If tiflash's replica is more than two, the "reload region" will always be false. - // Now that the batch cop and mpp has a relative low qps, it's reasonable to reload every time - // when meeting io error. - rc := RegionCache{ss.GetRegionCache()} - rc.OnSendFailForBatchRegions(bo, ctx.Store, regionInfos, true, err) + if !config.GetGlobalConfig().DisaggregatedTiFlash { + // The reload region param is always true. Because that every time we try, we must + // re-build the range then re-create the batch sender. As a result, the len of "failStores" + // will change. If tiflash's replica is more than two, the "reload region" will always be false. + // Now that the batch cop and mpp has a relative low qps, it's reasonable to reload every time + // when meeting io error. + rc := RegionCache{ss.GetRegionCache()} + rc.OnSendFailForBatchRegions(bo, ctx.Store, regionInfos, true, err) + } // Retry on send request failure when it's not canceled. // When a store is not available, the leader of related region should be elected quickly. 
diff --git a/tidb-server/main.go b/tidb-server/main.go index 0f08285aaa06b..55951c2745b2b 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -20,7 +20,6 @@ import ( "fmt" "io/fs" "os" - "reflect" "runtime" "strconv" "strings" From dac3d5406a55c4eaa2f85d903f051ef885988a5f Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Thu, 19 Jan 2023 18:06:00 +0800 Subject: [PATCH 08/18] fix typo Signed-off-by: guo-shaoge --- config/config.toml.example | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/config.toml.example b/config/config.toml.example index 9373e14c68575..e86c38a2b1337 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -124,7 +124,7 @@ autoscaler-type = "aws" # autoscaler-addr is the host of AutoScaler, Only meaningful when disaggregated-tiflash is true. autoscaler-addr = "tiflash-autoscale-lb.tiflash-autoscale.svc.cluster.local:8081" -# cluster-name is the name of cluster. Only meaningfule for Decicated Tier. +# cluster-name is the name of cluster. DON NOT set this if config.KeyspaceName is already set. cluster-name = "" # is-tiflashcompute-fixed-pool controls whether autoscaler use fixed shared pool(free user) or not. From 114af33e720431264c83fce6594a7113ce1ecf30 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Thu, 19 Jan 2023 18:11:57 +0800 Subject: [PATCH 09/18] fix bazel_lint Signed-off-by: guo-shaoge --- store/copr/batch_coprocessor.go | 2 +- util/tiflashcompute/topo_fetcher.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 15acb5521e95a..131588ba9519e 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -566,7 +566,7 @@ func filterAliveStoresHelper(ctx context.Context, stores []string, ttl time.Dura mu.Lock() defer mu.Unlock() - aliveIdx = append(aliveIdx, i) + aliveIdx = append(aliveIdx, idx) }(i) } wg.Wait() diff --git a/util/tiflashcompute/topo_fetcher.go b/util/tiflashcompute/topo_fetcher.go index 0d5c822b4c780..54e483fb5aa5a 100644 --- a/util/tiflashcompute/topo_fetcher.go +++ b/util/tiflashcompute/topo_fetcher.go @@ -289,7 +289,7 @@ func NewAWSAutoScalerFetcher(addr string, clusterID string, isFixed bool) *AWSTo } // AssureAndGetTopo implements TopoFetcher interface. -func (f *AWSTopoFetcher) AssureAndGetTopo() ([]string, error) { +func (*AWSTopoFetcher) AssureAndGetTopo() ([]string, error) { return nil, errors.New("AWSTopoFetcher AssureAndGetTopo not implemented") } @@ -434,11 +434,11 @@ func NewTestAutoScalerFetcher() *TestTopoFetcher { } // AssureAndGetTopo implements TopoFetcher interface. -func (f *TestTopoFetcher) AssureAndGetTopo() ([]string, error) { +func (*TestTopoFetcher) AssureAndGetTopo() ([]string, error) { return []string{}, nil } // FetchAndGetTopo implements TopoFetcher interface. 
-func (f *TestTopoFetcher) FetchAndGetTopo() ([]string, error) { +func (*TestTopoFetcher) FetchAndGetTopo() ([]string, error) { return []string{}, nil } From d280e1754e77c4fa01597fae84ba4e5a254bd746 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Fri, 20 Jan 2023 13:35:32 +0800 Subject: [PATCH 10/18] refine topo fetcher err msg(cse:70feb371993e38ed92556ce602793b5a53e32cc3) Signed-off-by: guo-shaoge --- executor/tiflashtest/tiflash_test.go | 11 +++-- util/tiflashcompute/BUILD.bazel | 2 + util/tiflashcompute/topo_fetcher.go | 65 +++++++++++++++++----------- 3 files changed, 48 insertions(+), 30 deletions(-) diff --git a/executor/tiflashtest/tiflash_test.go b/executor/tiflashtest/tiflash_test.go index 641532509e322..a05ba7efa967c 100644 --- a/executor/tiflashtest/tiflash_test.go +++ b/executor/tiflashtest/tiflash_test.go @@ -1259,6 +1259,9 @@ func TestDisaggregatedTiFlash(t *testing.T) { config.UpdateGlobal(func(conf *config.Config) { conf.DisaggregatedTiFlash = true }) + defer config.UpdateGlobal(func(conf *config.Config) { + conf.DisaggregatedTiFlash = false + }) err := tiflashcompute.InitGlobalTopoFetcher(tiflashcompute.TestASStr, "", "", false) require.NoError(t, err) @@ -1276,10 +1279,10 @@ func TestDisaggregatedTiFlash(t *testing.T) { err = tk.ExecToErr("select * from t;") require.Contains(t, err.Error(), "Cannot find proper topo from AutoScaler") - config.UpdateGlobal(func(conf *config.Config) { - conf.DisaggregatedTiFlash = false - }) - tk.MustQuery("select * from t;").Check(testkit.Rows()) + err = tiflashcompute.InitGlobalTopoFetcher(tiflashcompute.AWSASStr, "", "", false) + require.NoError(t, err) + err = tk.ExecToErr("select * from t;") + require.Contains(t, err.Error(), "[util:1815]Internal : get tiflash_compute topology failed") } func TestDisaggregatedTiFlashQuery(t *testing.T) { diff --git a/util/tiflashcompute/BUILD.bazel b/util/tiflashcompute/BUILD.bazel index 2cc2ef56c5278..1d897962e04af 100644 --- a/util/tiflashcompute/BUILD.bazel +++ b/util/tiflashcompute/BUILD.bazel @@ -6,6 +6,8 @@ go_library( importpath = "github.com/pingcap/tidb/util/tiflashcompute", visibility = ["//visibility:public"], deps = [ + "//errno", + "//util/dbterror", "//util/logutil", "@com_github_pingcap_errors//:errors", "@org_uber_go_zap//:zap", diff --git a/util/tiflashcompute/topo_fetcher.go b/util/tiflashcompute/topo_fetcher.go index 54e483fb5aa5a..bc8bee616b512 100644 --- a/util/tiflashcompute/topo_fetcher.go +++ b/util/tiflashcompute/topo_fetcher.go @@ -23,8 +23,10 @@ import ( "strings" "sync" + "github.com/pingcap/tidb/errno" "github.com/pingcap/errors" "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/dbterror" "go.uber.org/zap" ) @@ -36,9 +38,9 @@ var _ TopoFetcher = &TestTopoFetcher{} const ( // MockASStr is string value for mock AutoScaler. MockASStr = "mock" - // AWSASStr is string value for mock AutoScaler. + // AWSASStr is string value for aws AutoScaler. AWSASStr = "aws" - // GCPASStr is string value for mock AutoScaler. + // GCPASStr is string value for gcp AutoScaler. GCPASStr = "gcp" // TestASStr is string value for test AutoScaler. TestASStr = "test" @@ -47,9 +49,9 @@ const ( const ( // MockASType is int value for mock AutoScaler. MockASType int = iota - // AWSASType is int value for mock AutoScaler. + // AWSASType is int value for aws AutoScaler. AWSASType - // GCPASType is int value for mock AutoScaler. + // GCPASType is int value for gcp AutoScaler. GCPASType // TestASType is for local tidb test AutoScaler. 
TestASType @@ -67,6 +69,12 @@ const ( awsFetchHTTPPath = "resume-and-get-topology" ) +const ( + httpGetFailedErrMsg = "get tiflash_compute topology failed" + parseTopoTSFailedErrMsg = "parse timestamp of tiflash_compute topology failed" +) +var errTopoFetcher = dbterror.ClassUtil.NewStd(errno.ErrInternal) + // TopoFetcher is interface for fetching topo from AutoScaler. // There are three kinds of AutoScaler for now: // 1. MockAutoScaler: Normally for test, can run in local environment. @@ -197,7 +205,7 @@ func (f *MockTopoFetcher) assureTopo(nodeNum int) error { url := u.String() logutil.BgLogger().Info("assureTopo", zap.Any("url", url)) - newTopo, err := httpGetAndParseResp(url) + newTopo, err := mockHTTPGetAndParseResp(url) if err != nil { return err } @@ -218,7 +226,7 @@ func (f *MockTopoFetcher) fetchTopo() error { url := u.String() logutil.BgLogger().Info("fetchTopo", zap.Any("url", url)) - newTopo, err := httpGetAndParseResp(url) + newTopo, err := mockHTTPGetAndParseResp(url) if err != nil { return err } @@ -229,24 +237,38 @@ func (f *MockTopoFetcher) fetchTopo() error { return nil } -// httpGetAndParseResp send http get request and parse topo to []string. -func httpGetAndParseResp(url string) ([]string, error) { +func httpGetAndParseResp(url string) ([]byte, error) { resp, err := http.Get(url) if err != nil { - return nil, errors.Trace(err) + logutil.BgLogger().Error(err.Error()) + return nil, errTopoFetcher.GenWithStackByArgs(httpGetFailedErrMsg) } defer resp.Body.Close() b, err := ioutil.ReadAll(resp.Body) if err != nil { - return nil, errors.Trace(err) + logutil.BgLogger().Error(err.Error()) + return nil, errTopoFetcher.GenWithStackByArgs(httpGetFailedErrMsg) } bStr := string(b) if resp.StatusCode != http.StatusOK { - return nil, errors.Errorf("http get mock AutoScaler failed. url: %s, status code: %s, http resp body: %s", url, http.StatusText(resp.StatusCode), bStr) + logutil.BgLogger().Error("http get mock AutoScaler failed", zap.Any("url", url), + zap.Any("status code", http.StatusText(resp.StatusCode)), + zap.Any("http body", bStr)) + return nil, errTopoFetcher.GenWithStackByArgs(httpGetFailedErrMsg) + } + return b, nil +} + +// httpGetAndParseResp send http get request and parse topo to []string. +func mockHTTPGetAndParseResp(url string) ([]string, error) { + b, err := httpGetAndParseResp(url) + if err != nil { + return nil, err } // For MockAutoScaler, topo is like: ip:port;ip:port. + bStr := string(b) newTopo := strings.Split(bStr, ";") if len(bStr) == 0 || len(newTopo) == 0 { return nil, errors.New("topo list is empty") @@ -322,27 +344,18 @@ func (f *AWSTopoFetcher) FetchAndGetTopo() (curTopo []string, err error) { } func awsHTTPGetAndParseResp(url string) (*resumeAndGetTopologyResult, error) { - resp, err := http.Get(url) - if err != nil { - return nil, errors.Trace(err) - } - defer resp.Body.Close() - - b, err := ioutil.ReadAll(resp.Body) + b, err := httpGetAndParseResp(url) if err != nil { - return nil, errors.Trace(err) - } - bStr := string(b) - if resp.StatusCode != http.StatusOK { - return nil, errors.Errorf("http get mock AutoScaler failed. 
url: %s, status code: %s, http resp body: %s", url, http.StatusText(resp.StatusCode), bStr) + return nil, err } res := &resumeAndGetTopologyResult{} if err = json.Unmarshal(b, &res); err != nil { - return nil, errors.Trace(err) + logutil.BgLogger().Error(err.Error()) + return nil, errTopoFetcher.GenWithStackByArgs(httpGetFailedErrMsg) } - logutil.BgLogger().Debug("awsHTTPGetAndParseResp succeed", zap.Any("resp", res)) + logutil.BgLogger().Info("awsHTTPGetAndParseResp succeed", zap.Any("resp", res)) return res, nil } @@ -355,7 +368,7 @@ func (f *AWSTopoFetcher) tryUpdateTopo(newTopo *resumeAndGetTopologyResult) (upd zap.Any("fetch TS", newTopo.Timestamp), zap.Any("converted TS", newTS), zap.Any("fetch topo", newTopo.Topology)) }() if err != nil { - return updated, errors.Errorf("parse newTopo.timestamp failed when update topo: %s", err.Error()) + return updated, errTopoFetcher.GenWithStackByArgs(parseTopoTSFailedErrMsg) } if cachedTS >= newTS { From 7c132a32c879bb735c19c7114a0fc20e5143a4b8 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Fri, 20 Jan 2023 13:37:14 +0800 Subject: [PATCH 11/18] fix fmt Signed-off-by: guo-shaoge --- util/tiflashcompute/topo_fetcher.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/util/tiflashcompute/topo_fetcher.go b/util/tiflashcompute/topo_fetcher.go index bc8bee616b512..d98e5d9cc08bc 100644 --- a/util/tiflashcompute/topo_fetcher.go +++ b/util/tiflashcompute/topo_fetcher.go @@ -23,10 +23,10 @@ import ( "strings" "sync" - "github.com/pingcap/tidb/errno" "github.com/pingcap/errors" - "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/util/dbterror" + "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" ) @@ -70,9 +70,10 @@ const ( ) const ( - httpGetFailedErrMsg = "get tiflash_compute topology failed" + httpGetFailedErrMsg = "get tiflash_compute topology failed" parseTopoTSFailedErrMsg = "parse timestamp of tiflash_compute topology failed" ) + var errTopoFetcher = dbterror.ClassUtil.NewStd(errno.ErrInternal) // TopoFetcher is interface for fetching topo from AutoScaler. From 61700ded8ec33db45b184f73c2ffd17eb940c754 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Sat, 28 Jan 2023 14:32:46 +0800 Subject: [PATCH 12/18] remove using reflect Signed-off-by: guo-shaoge --- config/config.go | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/config/config.go b/config/config.go index fb46782a4d690..d1942988438f0 100644 --- a/config/config.go +++ b/config/config.go @@ -23,7 +23,6 @@ import ( "os" "os/user" "path/filepath" - "reflect" "sort" "strings" "sync" @@ -95,8 +94,6 @@ const ( DefAuthTokenRefreshInterval = time.Hour // EnvVarKeyspaceName is the system env name for keyspace name. EnvVarKeyspaceName = "KEYSPACE_NAME" - // ConfigKeyspaceFieldName is the struct field name Config.KeyspaceName. - ConfigKeyspaceFieldName = "KeyspaceName" ) // Valid config maps @@ -1046,17 +1043,9 @@ func StoreGlobalConfig(config *Config) { // GetClusterName returns clusterName, which is KeyspaceName or ClusterName. func GetClusterName() (string, error) { c := GetGlobalConfig() - var keyspaceName string + keyspaceName := c.KeyspaceName clusterName := c.ClusterName - // TODO: Delete using reflect when keyspace code is merged. 
- v := reflect.ValueOf(c).FieldByName(ConfigKeyspaceFieldName) - if v.IsValid() { - if v.Kind() != reflect.String { - terror.MustNil(errors.New("config.KeyspaceName should be String type")) - } - keyspaceName = v.String() - } if keyspaceName != "" && clusterName != "" { return "", errors.Errorf("config.KeyspaceName(%s) and config.ClusterName(%s) are not empty both", keyspaceName, clusterName) } From 1734140f70ba9c92f4f1fe6f9cf099a21c84de11 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Sat, 28 Jan 2023 14:42:39 +0800 Subject: [PATCH 13/18] update TestASType related code Signed-off-by: guo-shaoge --- config/config.go | 2 +- util/tiflashcompute/topo_fetcher.go | 12 +++++++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/config/config.go b/config/config.go index d1942988438f0..61023327edfe7 100644 --- a/config/config.go +++ b/config/config.go @@ -1346,7 +1346,7 @@ func (c *Config) Valid() error { // Check tiflash_compute topo fetch is valid. if c.DisaggregatedTiFlash { - if tiflashcompute.GetAutoScalerType(c.TiFlashComputeAutoScalerType) == tiflashcompute.InvalidASType { + if !tiflashcompute.IsValidAutoScalerConfig(c.TiFlashComputeAutoScalerType) { return fmt.Errorf("invalid AutoScaler type, expect %s, %s or %s, got %s", tiflashcompute.MockASStr, tiflashcompute.AWSASStr, tiflashcompute.GCPASStr, c.TiFlashComputeAutoScalerType) } diff --git a/util/tiflashcompute/topo_fetcher.go b/util/tiflashcompute/topo_fetcher.go index d98e5d9cc08bc..15bc535eb00f1 100644 --- a/util/tiflashcompute/topo_fetcher.go +++ b/util/tiflashcompute/topo_fetcher.go @@ -92,8 +92,14 @@ type TopoFetcher interface { FetchAndGetTopo() ([]string, error) } -// GetAutoScalerType return topo fetcher type. -func GetAutoScalerType(typ string) int { +// IsValidAutoScalerConfig return true if user config of autoscaler type is valid. +func IsValidAutoScalerConfig(typ string) bool { + t := getAutoScalerType(typ) + return t == MockASType || t == AWSASType || t == GCPASType +} + +// getAutoScalerType return topo fetcher type. +func getAutoScalerType(typ string) int { switch typ { case MockASStr: return MockASType @@ -113,7 +119,7 @@ func InitGlobalTopoFetcher(typ string, addr string, clusterID string, isFixedPoo logutil.BgLogger().Info("init globalTopoFetcher", zap.Any("type", typ), zap.Any("addr", addr), zap.Any("clusterID", clusterID), zap.Any("isFixedPool", isFixedPool)) - ft := GetAutoScalerType(typ) + ft := getAutoScalerType(typ) switch ft { case MockASType: globalTopoFetcher = NewMockAutoScalerFetcher(addr) From dbcad3a0ca2c8941b621ec3ebb6a9664e2dd0699 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Sat, 28 Jan 2023 14:59:19 +0800 Subject: [PATCH 14/18] update batch of trivial fix Signed-off-by: guo-shaoge --- config/config.go | 4 ++-- store/copr/batch_coprocessor_test.go | 2 ++ util/tiflashcompute/topo_fetcher.go | 7 ++----- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/config/config.go b/config/config.go index 61023327edfe7..be8ee797b0025 100644 --- a/config/config.go +++ b/config/config.go @@ -289,7 +289,7 @@ type Config struct { MaxServerConnections uint32 `toml:"max-server-connections" json:"max-server-connections"` RunDDL bool `toml:"run-ddl" json:"run-ddl"` - // These config is related to disaggregated-tiflash mode. + // These configs are related to disaggregated-tiflash mode. 
DisaggregatedTiFlash bool `toml:"disaggregated-tiflash" json:"disaggregated-tiflash"` TiFlashComputeAutoScalerType string `toml:"autoscaler-type" json:"autoscaler-type"` TiFlashComputeAutoScalerAddr string `toml:"autoscaler-addr" json:"autoscaler-addr"` @@ -1040,7 +1040,7 @@ func StoreGlobalConfig(config *Config) { tikvcfg.StoreGlobalConfig(&cfg) } -// GetClusterName returns clusterName, which is KeyspaceName or ClusterName. +// GetClusterName returns KeyspaceName or ClusterName. func GetClusterName() (string, error) { c := GetGlobalConfig() keyspaceName := c.KeyspaceName diff --git a/store/copr/batch_coprocessor_test.go b/store/copr/batch_coprocessor_test.go index b6d8bd5aeb84a..23f3b0cad8002 100644 --- a/store/copr/batch_coprocessor_test.go +++ b/store/copr/batch_coprocessor_test.go @@ -226,4 +226,6 @@ func TestTopoFetcherBackoff(t *testing.T) { dura := time.Since(start) // fetchTopoMaxBackoff is milliseconds. require.GreaterOrEqual(t, dura, time.Duration(fetchTopoMaxBackoff*1000)) + require.GreaterOrEqual(t, dura, 30*time.Second) + require.LessOrEqual(t, dura, 50*time.Second) } diff --git a/util/tiflashcompute/topo_fetcher.go b/util/tiflashcompute/topo_fetcher.go index 15bc535eb00f1..cfed30a67e125 100644 --- a/util/tiflashcompute/topo_fetcher.go +++ b/util/tiflashcompute/topo_fetcher.go @@ -65,11 +65,8 @@ const ( // DefASStr default AutoScaler. DefASStr = AWSASStr - awsFixedPoolHTTPPath = "sharedfixedpool" - awsFetchHTTPPath = "resume-and-get-topology" -) - -const ( + awsFixedPoolHTTPPath = "sharedfixedpool" + awsFetchHTTPPath = "resume-and-get-topology" httpGetFailedErrMsg = "get tiflash_compute topology failed" parseTopoTSFailedErrMsg = "parse timestamp of tiflash_compute topology failed" ) From 338229bb08895c83cd1442eeafdb1d5b1aeb5d68 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Tue, 31 Jan 2023 16:05:25 +0800 Subject: [PATCH 15/18] change config name: cluster-name -> autoscaler-cluster-id Signed-off-by: guo-shaoge --- config/config.go | 20 ++++++++++---------- config/config.toml.example | 4 ++-- tidb-server/main.go | 2 +- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/config/config.go b/config/config.go index dd846c76a3dc3..b0a7b4498a05b 100644 --- a/config/config.go +++ b/config/config.go @@ -294,7 +294,7 @@ type Config struct { TiFlashComputeAutoScalerType string `toml:"autoscaler-type" json:"autoscaler-type"` TiFlashComputeAutoScalerAddr string `toml:"autoscaler-addr" json:"autoscaler-addr"` IsTiFlashComputeFixedPool bool `toml:"is-tiflashcompute-fixed-pool" json:"is-tiflashcompute-fixed-pool"` - ClusterName string `toml:"cluster-name" json:"cluster-name"` + AutoScalerClusterID string `toml:"autoscaler-cluster-id" json:"autoscaler-cluster-id"` // TiDBMaxReuseChunk indicates max cached chunk num TiDBMaxReuseChunk uint32 `toml:"tidb-max-reuse-chunk" json:"tidb-max-reuse-chunk"` @@ -1011,7 +1011,7 @@ var defaultConf = Config{ TiFlashComputeAutoScalerType: tiflashcompute.DefASStr, TiFlashComputeAutoScalerAddr: tiflashcompute.DefAWSAutoScalerAddr, IsTiFlashComputeFixedPool: false, - ClusterName: "", + AutoScalerClusterID: "", TiDBMaxReuseChunk: 64, TiDBMaxReuseColumn: 256, TiDBEnableExitCheck: false, @@ -1043,22 +1043,22 @@ func StoreGlobalConfig(config *Config) { tikvcfg.StoreGlobalConfig(&cfg) } -// GetClusterName returns KeyspaceName or ClusterName. -func GetClusterName() (string, error) { +// GetAutoScalerClusterID returns KeyspaceName or AutoScalerClusterID. 
+func GetAutoScalerClusterID() (string, error) { c := GetGlobalConfig() keyspaceName := c.KeyspaceName - clusterName := c.ClusterName + clusterID := c.AutoScalerClusterID - if keyspaceName != "" && clusterName != "" { - return "", errors.Errorf("config.KeyspaceName(%s) and config.ClusterName(%s) are not empty both", keyspaceName, clusterName) + if keyspaceName != "" && clusterID != "" { + return "", errors.Errorf("config.KeyspaceName(%s) and config.AutoScalerClusterID(%s) are not empty both", keyspaceName, clusterID) } - if keyspaceName == "" && clusterName == "" { - return "", errors.Errorf("config.KeyspaceName and config.ClusterName are both empty") + if keyspaceName == "" && clusterID == "" { + return "", errors.Errorf("config.KeyspaceName and config.AutoScalerClusterID are both empty") } res := keyspaceName if res == "" { - res = clusterName + res = clusterID } return res, nil } diff --git a/config/config.toml.example b/config/config.toml.example index e86c38a2b1337..76d6c62430319 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -124,8 +124,8 @@ autoscaler-type = "aws" # autoscaler-addr is the host of AutoScaler, Only meaningful when disaggregated-tiflash is true. autoscaler-addr = "tiflash-autoscale-lb.tiflash-autoscale.svc.cluster.local:8081" -# cluster-name is the name of cluster. DON NOT set this if config.KeyspaceName is already set. -cluster-name = "" +# autoscaler-cluster-id is the unique id for each TiDB cluster, which will used by AutoScaler. +autoscaler-cluster-id = "" # is-tiflashcompute-fixed-pool controls whether autoscaler use fixed shared pool(free user) or not. is-tiflashcompute-fixed-pool = false diff --git a/tidb-server/main.go b/tidb-server/main.go index b9b415a77fe7a..f3ca17cf7c4c6 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -205,7 +205,7 @@ func main() { terror.MustNil(err) if config.GetGlobalConfig().DisaggregatedTiFlash { - clusterID, err := config.GetClusterName() + clusterID, err := config.GetAutoScalerClusterID() terror.MustNil(err) err = tiflashcompute.InitGlobalTopoFetcher( From dfaf69c2ac3b710169297864a60ddfe891a2de62 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Tue, 31 Jan 2023 16:19:30 +0800 Subject: [PATCH 16/18] update config.toml.example Signed-off-by: guo-shaoge --- config/config.toml.example | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/config/config.toml.example b/config/config.toml.example index 76d6c62430319..d153f56fe7690 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -118,18 +118,17 @@ enable-global-kill = true disaggregated-tiflash = false # autoscaler-type indicates which type of AutoScaler will be used. Possible values are: mock, aws, gcp. -# Only meaningful when disaggregated-tiflash is true +# Only meaningful when disaggregated-tiflash is true. autoscaler-type = "aws" # autoscaler-addr is the host of AutoScaler, Only meaningful when disaggregated-tiflash is true. +# Only meaningful when disaggregated-tiflash is true. autoscaler-addr = "tiflash-autoscale-lb.tiflash-autoscale.svc.cluster.local:8081" # autoscaler-cluster-id is the unique id for each TiDB cluster, which will used by AutoScaler. +# Only meaningful when disaggregated-tiflash is true. autoscaler-cluster-id = "" -# is-tiflashcompute-fixed-pool controls whether autoscaler use fixed shared pool(free user) or not. -is-tiflashcompute-fixed-pool = false - [log] # Log level: debug, info, warn, error, fatal. 
level = "info" From ed0cf4c0c1bcbb9a3809f67dda9d924317367964 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Wed, 1 Feb 2023 14:42:03 +0800 Subject: [PATCH 17/18] fix unit-test Signed-off-by: guo-shaoge --- executor/tiflashtest/tiflash_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/executor/tiflashtest/tiflash_test.go b/executor/tiflashtest/tiflash_test.go index 701de4e5c4a89..531ca5fe41329 100644 --- a/executor/tiflashtest/tiflash_test.go +++ b/executor/tiflashtest/tiflash_test.go @@ -1277,11 +1277,13 @@ func TestDisaggregatedTiFlash(t *testing.T) { tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"") err = tk.ExecToErr("select * from t;") - require.Contains(t, err.Error(), "tiflash_compute node is unavailable") + // Expect error, because TestAutoScaler return empty topo. + require.Contains(t, err.Error(), "Cannot find proper topo from AutoScaler") err = tiflashcompute.InitGlobalTopoFetcher(tiflashcompute.AWSASStr, "", "", false) require.NoError(t, err) err = tk.ExecToErr("select * from t;") + // Expect error, because AWSAutoScaler is not setup, so http request will fail. require.Contains(t, err.Error(), "[util:1815]Internal : get tiflash_compute topology failed") } From 582ba44c42e6c4a4e057fad49a5975be7be46f25 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Wed, 1 Feb 2023 17:06:24 +0800 Subject: [PATCH 18/18] fix comment Signed-off-by: guo-shaoge --- store/copr/batch_coprocessor.go | 8 -------- util/tiflashcompute/topo_fetcher.go | 4 ++-- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 93507bc1ca012..568b4dc940c3c 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -523,14 +523,6 @@ func buildBatchCopTasksForPartitionedTable( return batchTasks, nil } -func filterAliveStoresStr(ctx context.Context, storesStr []string, ttl time.Duration, kvStore *kvStore) (aliveStores []string) { - aliveIdx := filterAliveStoresHelper(ctx, storesStr, ttl, kvStore) - for _, idx := range aliveIdx { - aliveStores = append(aliveStores, storesStr[idx]) - } - return aliveStores -} - func filterAliveStores(ctx context.Context, stores []*tikv.Store, ttl time.Duration, kvStore *kvStore) (aliveStores []*tikv.Store) { storesStr := make([]string, 0, len(stores)) for _, s := range stores { diff --git a/util/tiflashcompute/topo_fetcher.go b/util/tiflashcompute/topo_fetcher.go index cfed30a67e125..f8d7e7b123e63 100644 --- a/util/tiflashcompute/topo_fetcher.go +++ b/util/tiflashcompute/topo_fetcher.go @@ -74,7 +74,7 @@ const ( var errTopoFetcher = dbterror.ClassUtil.NewStd(errno.ErrInternal) // TopoFetcher is interface for fetching topo from AutoScaler. -// There are three kinds of AutoScaler for now: +// We support the following kinds of AutoScaler for now: // 1. MockAutoScaler: Normally for test, can run in local environment. // 2. AWSAutoScaler: AutoScaler runs on AWS. // 3. GCPAutoScaler: AutoScaler runs on GCP. @@ -264,7 +264,7 @@ func httpGetAndParseResp(url string) ([]byte, error) { return b, nil } -// httpGetAndParseResp send http get request and parse topo to []string. +// mockHTTPGetAndParseResp send http get request and parse topo to []string. func mockHTTPGetAndParseResp(url string) ([]string, error) { b, err := httpGetAndParseResp(url) if err != nil {