tikv: make region request can send to flash store #11652

Merged · 18 commits · Sep 25, 2019

Changes from 2 commits
12 changes: 12 additions & 0 deletions config/config.go
@@ -84,6 +84,7 @@ type Config struct {
OpenTracing OpenTracing `toml:"opentracing" json:"opentracing"`
ProxyProtocol ProxyProtocol `toml:"proxy-protocol" json:"proxy-protocol"`
TiKVClient TiKVClient `toml:"tikv-client" json:"tikv-client"`
TiFlash TiFlash `toml:"tiflash" json:"tiflash"`
Binlog Binlog `toml:"binlog" json:"binlog"`
CompatibleKillQuery bool `toml:"compatible-kill-query" json:"compatible-kill-query"`
Plugin Plugin `toml:"plugin" json:"plugin"`
@@ -317,6 +318,13 @@ type PessimisticTxn struct {
TTL string `toml:"ttl" json:"ttl"`
}

// TiFlash is the config for TiFlash.
type TiFlash struct {
// LabelKey and LabelValue are used to check whether a store is TiFlash.
LabelKey string `toml:"label-key" json:"label-key"`
LabelValue string `toml:"label-value" json:"label-value"`
}

var defaultConf = Config{
Host: "0.0.0.0",
AdvertiseAddress: "",
@@ -402,6 +410,10 @@ var defaultConf = Config{
MaxBatchWaitTime: 0,
BatchWaitSize: 8,
},
TiFlash: TiFlash{
LabelKey: "zone",
LabelValue: "engine",
},
Binlog: Binlog{
WriteTimeout: "15s",
Strategy: "range",
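Taken together, these hunks add a [tiflash] config section whose label key/value pair identifies TiFlash stores, with defaults "zone"/"engine". A minimal sketch of reading the new settings through the existing global-config accessor (the same accessor the region_cache.go hunks below use):

    // Sketch: read the new TiFlash label settings. config.GetGlobalConfig()
    // already exists in this codebase; TiFlash is the struct added above.
    conf := config.GetGlobalConfig().TiFlash
    // With defaultConf above: conf.LabelKey == "zone", conf.LabelValue == "engine".
    _ = conf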
5 changes: 5 additions & 0 deletions config/config_test.go
@@ -71,6 +71,9 @@ txn-total-size-limit=2000
[tikv-client]
commit-timeout="41s"
max-batch-size=128
[tiflash]
label-key="zone"
label-value="engine"
`)

c.Assert(err, IsNil)
@@ -92,6 +95,8 @@ max-batch-size=128
c.Assert(conf.EnableTableLock, IsTrue)
c.Assert(conf.DelayCleanTableLock, Equals, uint64(5))
c.Assert(conf.SplitRegionMaxNum, Equals, uint64(10000))
c.Assert(conf.TiFlash.LabelKey, Equals, "zone")
c.Assert(conf.TiFlash.LabelValue, Equals, "engine")
c.Assert(f.Close(), IsNil)
c.Assert(os.Remove(configFile), IsNil)

9 changes: 5 additions & 4 deletions store/tikv/coprocessor.go
@@ -83,9 +83,10 @@ type copTask struct {
region RegionVerID
ranges *copRanges

respChan chan *copResponse
storeAddr string
cmdType tikvrpc.CmdType
respChan chan *copResponse
storeAddr string
cmdType tikvrpc.CmdType
isFlashTask bool
}

func (r *copTask) String() string {
@@ -624,7 +625,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
ScanDetail: true,
})
startTime := time.Now()
resp, rpcCtx, err := sender.SendReqCtx(bo, req, task.region, ReadTimeoutMedium)
resp, rpcCtx, err := sender.SendReqCtx(bo, req, task.region, ReadTimeoutMedium, task.isFlashTask)
if err != nil {
return nil, errors.Trace(err)
}
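The coprocessor change is mechanical: copTask gains an isFlashTask flag, and handleTaskOnce threads it into SendReqCtx. These two commits do not show where the flag is set; a hypothetical construction site could look like:

    // Hypothetical, not in this diff: building a TiFlash-bound coprocessor task.
    // loc is a *KeyLocation and ranges a *copRanges, as used elsewhere in this file.
    task := &copTask{
        region:      loc.Region,
        ranges:      ranges,
        respChan:    make(chan *copResponse, 2),
        cmdType:     tikvrpc.CmdCop,
        isFlashTask: true, // makes handleTaskOnce route the request to a flash store
    }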
67 changes: 66 additions & 1 deletion store/tikv/region_cache.go
@@ -27,6 +27,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/pd/client"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
@@ -300,6 +301,55 @@ func (c *RegionCache) GetRPCContext(bo *Backoffer, id RegionVerID) (*RPCContext,
}, nil
}

// GetFlashRPCContext returns the RPCContext for a region that must be accessed through a TiFlash
// store. It returns nil if the region is out of date and already dropped from the cache, or if no
// TiFlash store is found.
func (c *RegionCache) GetFlashRPCContext(bo *Backoffer, id RegionVerID) (*RPCContext, error) {
ts := time.Now().Unix()

cachedRegion := c.getCachedRegionWithRLock(id)
if cachedRegion == nil {
return nil, nil
}
if !cachedRegion.checkRegionCacheTTL(ts) {
return nil, nil
}

regionStore := cachedRegion.getStore()

for i, store := range regionStore.stores {
Review comment from @lysu (Contributor), Sep 24, 2019:

Do we have the situation where one region has multiple TiFlash replicas? If so, we should handle the failure of one TiFlash store and load-balance across the TiFlash nodes. For example, if region1 has three TiFlash nodes a, b, and c, we should give b and c a chance to serve requests, and send requests to b and c when a fails.

if !store.isFlash {
continue
}
peer, storeIdx := cachedRegion.meta.Peers[i], i
addr, err := c.getStoreAddr(bo, cachedRegion, store, storeIdx)
if err != nil {
return nil, err
}
if store == nil || len(addr) == 0 {
cachedRegion.invalidate()
return nil, nil
}
storeFailEpoch := atomic.LoadUint32(&store.fail)
if storeFailEpoch != regionStore.storeFails[regionStore.workStoreIdx] {
cachedRegion.invalidate()
logutil.BgLogger().Info("invalidate current region, because others failed on same store",
zap.Uint64("region", id.GetID()),
zap.String("store", store.addr))
return nil, nil
}
return &RPCContext{
Region: id,
Meta: cachedRegion.meta,
Peer: peer,
PeerIdx: storeIdx,
Store: store,
Addr: addr,
}, nil
}

return nil, nil
}
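As written, the loop above returns the first TiFlash peer it finds, which is exactly what lysu's review comment is probing: with several TiFlash replicas, one store takes all the traffic and there is no failover to the others. A sketch of one possible follow-up (hypothetical, not part of this PR) that rotates across the region's stores so flash requests spread over the replicas:

    // Hypothetical follow-up to lysu's comment: round-robin over TiFlash peers.
    // stores and isFlash are the same fields GetFlashRPCContext already reads;
    // next would be a new, atomically incremented per-region counter.
    func pickFlashStoreIdx(stores []*Store, next uint32) (int, bool) {
        n := len(stores)
        for i := 0; i < n; i++ {
            idx := (int(next) + i) % n
            if stores[idx].isFlash {
                return idx, true
            }
        }
        return 0, false
    }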

// KeyLocation is the region and range that a key is located.
type KeyLocation struct {
Region RegionVerID
@@ -1029,6 +1079,7 @@ type Store struct {
state uint64 // unsafe store storeState
resolveMutex sync.Mutex // protect pd from concurrent init requests
fail uint32 // store fail count, see RegionStore.storeFails
isFlash bool // whether the store is a TiFlash store
}

type resolveState uint64
@@ -1050,6 +1101,7 @@ func (s *Store) initResolve(bo *Backoffer, c *RegionCache) (addr string, err err
return
}
var store *metapb.Store
confTiFlash := config.GetGlobalConfig().TiFlash
for {
store, err = c.pdClient.GetStore(bo.ctx, s.storeID)
if err != nil {
@@ -1073,6 +1125,12 @@ func (s *Store) initResolve(bo *Backoffer, c *RegionCache) (addr string, err err
}
addr = store.GetAddress()
s.addr = addr
for _, label := range store.Labels {
if label.Key == confTiFlash.LabelKey && label.Value == confTiFlash.LabelValue {
s.isFlash = true
break
}
}
retry:
state = s.getResolveState()
if state != unresolved {
@@ -1109,10 +1167,17 @@ func (s *Store) reResolve(c *RegionCache) {
return
}

confTiFlash := config.GetGlobalConfig().TiFlash
isFlash := false
for _, label := range store.Labels {
if label.Key == confTiFlash.LabelKey && label.Value == confTiFlash.LabelValue {
isFlash = true
}
}
addr = store.GetAddress()
if s.addr != addr {
state := resolved
newStore := &Store{storeID: s.storeID, addr: addr}
newStore := &Store{storeID: s.storeID, addr: addr, isFlash: isFlash}
newStore.state = *(*uint64)(unsafe.Pointer(&state))
c.storeMu.Lock()
c.storeMu.stores[newStore.storeID] = newStore
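initResolve and reResolve now duplicate the same label scan, and the reResolve copy omits the early break. A shared predicate (a sketch; this PR keeps the loops inline) would keep the two paths consistent:

    // Sketch of a shared helper mirroring the two inline loops above.
    // isTiFlashStore is a hypothetical name; metapb.Store.GetLabels is the
    // standard protobuf getter for the Labels field used above.
    func isTiFlashStore(store *metapb.Store, conf config.TiFlash) bool {
        for _, label := range store.GetLabels() {
            if label.Key == conf.LabelKey && label.Value == conf.LabelValue {
                return true
            }
        }
        return false
    }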
23 changes: 18 additions & 5 deletions store/tikv/region_request.go
@@ -15,11 +15,12 @@ package tikv

import (
"context"
"sync/atomic"
"time"

"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
@@ -69,12 +70,18 @@ func NewRegionRequestSender(regionCache *RegionCache, client Client) *RegionRequ

// SendReq sends a request to the TiKV server.
func (s *RegionRequestSender) SendReq(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, timeout time.Duration) (*tikvrpc.Response, error) {
resp, _, err := s.SendReqCtx(bo, req, regionID, timeout)
resp, _, err := s.SendReqCtx(bo, req, regionID, timeout, false)
return resp, err
}

// SendReqToFlash sends a request to the TiFlash server.
func (s *RegionRequestSender) SendReqToFlash(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, timeout time.Duration) (*tikvrpc.Response, error) {
resp, _, err := s.SendReqCtx(bo, req, regionID, timeout, true)
return resp, err
}

// SendReqCtx sends a request to the TiKV server and returns the response and the RPCContext of this RPC.
func (s *RegionRequestSender) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, timeout time.Duration) (*tikvrpc.Response, *RPCContext, error) {
func (s *RegionRequestSender) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, timeout time.Duration, sendToFlash bool) (*tikvrpc.Response, *RPCContext, error) {
failpoint.Inject("tikvStoreSendReqResult", func(val failpoint.Value) {
switch val.(string) {
case "timeout":
Expand All @@ -95,7 +102,13 @@ func (s *RegionRequestSender) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, re
})

for {
ctx, err := s.regionCache.GetRPCContext(bo, regionID)
var ctx *RPCContext
var err error
if sendToFlash {
ctx, err = s.regionCache.GetFlashRPCContext(bo, regionID)
} else {
ctx, err = s.regionCache.GetRPCContext(bo, regionID)
}
if err != nil {
return nil, nil, errors.Trace(err)
}
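For callers that do not need the RPCContext, the new SendReqToFlash wrapper keeps call sites short. A minimal usage sketch, assuming a sender, backoffer, request, and region ID like those in the test below:

    // Sketch: force routing to a TiFlash store via the new wrapper.
    // sender, bo, req, and regionID are assumed to exist, as in TestSendReqCtx.
    resp, err := sender.SendReqToFlash(bo, req, regionID, ReadTimeoutMedium)
    if err != nil {
        // handle via errors.Trace(err), as elsewhere in this package
    }
    _ = resp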
2 changes: 1 addition & 1 deletion store/tikv/region_request_test.go
@@ -131,7 +131,7 @@ func (s *testRegionRequestSuite) TestSendReqCtx(c *C) {
region, err := s.cache.LocateRegionByID(s.bo, s.region)
c.Assert(err, IsNil)
c.Assert(region, NotNil)
resp, ctx, err := s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second)
resp, ctx, err := s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, false)
c.Assert(err, IsNil)
c.Assert(resp.Resp, NotNil)
c.Assert(ctx, NotNil)