Skip to content

Commit

Permalink
planner, session: add isolation read with engine type (#12997)
Browse files Browse the repository at this point in the history
  • Loading branch information
lzmhhh123 committed Nov 1, 2019
1 parent ff6a4f7 commit 000c1ba
Show file tree
Hide file tree
Showing 13 changed files with 123 additions and 4 deletions.
8 changes: 8 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,14 @@ const (
TiFlash
)

// Name returns the name of store type.
func (t StoreType) Name() string {
if t == TiFlash {
return "tiflash"
}
return "tikv"
}

// Request represents a kv request.
type Request struct {
// Tp is the request type.
Expand Down
4 changes: 2 additions & 2 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,9 +734,9 @@ func (e *Explain) explainPlanInRowFormat(p Plan, taskType, indent string, isLast
var storeType string
switch x.StoreType {
case kv.TiKV:
storeType = "tikv"
storeType = kv.TiKV.Name()
case kv.TiFlash:
storeType = "tiflash"
storeType = kv.TiFlash.Name()
default:
err = errors.Errorf("the store type %v is unknown", x.StoreType)
return
Expand Down
17 changes: 17 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,3 +238,20 @@ func (s *testIntegrationSuite) TestSimplifyOuterJoinWithCast(c *C) {
tk.MustQuery(tt).Check(testkit.Rows(output[i].Plan...))
}
}

func (s *testIntegrationSuite) TestNoneAccessPathsFoundByIsolationRead(c *C) {
tk := testkit.NewTestKit(c, s.store)

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int primary key)")

_, err := tk.Exec("select * from t")
c.Assert(err, IsNil)

tk.MustExec("set @@session.tidb_isolation_read_engines = 'tiflash'")

_, err = tk.Exec("select * from t")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[planner:1815]Internal : Can not find access path matching 'tidb_isolation_read_engines'(value: 'tiflash'). Available values are 'tikv'.")
}
4 changes: 4 additions & 0 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2438,6 +2438,10 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as
if err != nil {
return nil, err
}
possiblePaths, err = b.filterPathByIsolationRead(possiblePaths)
if err != nil {
return nil, err
}

var columns []*table.Column
if b.inUpdateStmt {
Expand Down
27 changes: 27 additions & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/planner/property"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/table"
Expand Down Expand Up @@ -662,6 +663,32 @@ func (b *PlanBuilder) getPossibleAccessPaths(indexHints []*ast.IndexHint, tblInf
return available, nil
}

func (b *PlanBuilder) filterPathByIsolationRead(paths []*accessPath) ([]*accessPath, error) {
// TODO: filter paths with isolation read locations.
isolationReadEngines := b.ctx.GetSessionVars().GetIsolationReadEngines()
availableEngine := map[kv.StoreType]struct{}{}
var availableEngineStr string
for i := len(paths) - 1; i >= 0; i-- {
if _, ok := availableEngine[paths[i].storeType]; !ok {
availableEngine[paths[i].storeType] = struct{}{}
if availableEngineStr != "" {
availableEngineStr += ", "
}
availableEngineStr += paths[i].storeType.Name()
}
if _, ok := isolationReadEngines[paths[i].storeType]; !ok {
paths = append(paths[:i], paths[i+1:]...)
}
}
var err error
if len(paths) == 0 {
engineVals, _ := b.ctx.GetSessionVars().GetSystemVar(variable.TiDBIsolationReadEngines)
err = ErrInternal.GenWithStackByArgs(fmt.Sprintf("Can not find access path matching '%v'(value: '%v'). Available values are '%v'.",
variable.TiDBIsolationReadEngines, engineVals, availableEngineStr))
}
return paths, err
}

func removeIgnoredPaths(paths, ignoredPaths []*accessPath, tblInfo *model.TableInfo) []*accessPath {
if len(ignoredPaths) == 0 {
return paths
Expand Down
1 change: 1 addition & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1838,6 +1838,7 @@ var builtinGlobalVariable = []string{
variable.TiDBMaxDeltaSchemaCount,
variable.TiDBCapturePlanBaseline,
variable.TiDBUsePlanBaselines,
variable.TiDBIsolationReadEngines,
}

var (
Expand Down
15 changes: 15 additions & 0 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2815,6 +2815,21 @@ func (s *testSessionSuite) TestReplicaRead(c *C) {
c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadLeader)
}

func (s *testSessionSuite) TestIsolationRead(c *C) {
var err error
tk := testkit.NewTestKit(c, s.store)
tk.Se, err = session.CreateSession4Test(s.store)
c.Assert(err, IsNil)
c.Assert(len(tk.Se.GetSessionVars().GetIsolationReadEngines()), Equals, 2)
tk.MustExec("set @@tidb_isolation_read_engines = 'tiflash';")
engines := tk.Se.GetSessionVars().GetIsolationReadEngines()
c.Assert(len(engines), Equals, 1)
_, hasTiFlash := engines[kv.TiFlash]
_, hasTiKV := engines[kv.TiKV]
c.Assert(hasTiFlash, Equals, true)
c.Assert(hasTiKV, Equals, false)
}

func (s *testSessionSuite) TestStmtHints(c *C) {
var err error
tk := testkit.NewTestKit(c, s.store)
Expand Down
19 changes: 19 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,9 @@ type SessionVars struct {
// replicaRead is used for reading data from replicas, only follower is supported at this time.
replicaRead kv.ReplicaReadType

// isolationReadEngines is used to isolation read, tidb only read from the stores whose engine type is in the engines.
isolationReadEngines map[kv.StoreType]struct{}

PlannerSelectBlockAsName []ast.HintTable
}

Expand Down Expand Up @@ -520,6 +523,7 @@ func NewSessionVars() *SessionVars {
replicaRead: kv.ReplicaReadLeader,
AllowRemoveAutoInc: DefTiDBAllowRemoveAutoInc,
UsePlanBaselines: DefTiDBUsePlanBaselines,
isolationReadEngines: map[kv.StoreType]struct{}{kv.TiKV: {}, kv.TiFlash: {}},
}
vars.Concurrency = Concurrency{
IndexLookupConcurrency: DefIndexLookupConcurrency,
Expand Down Expand Up @@ -616,6 +620,11 @@ func (s *SessionVars) GetSplitRegionTimeout() time.Duration {
return time.Duration(s.WaitSplitRegionTimeout) * time.Second
}

// GetIsolationReadEngines gets isolation read engines.
func (s *SessionVars) GetIsolationReadEngines() map[kv.StoreType]struct{} {
return s.isolationReadEngines
}

// CleanBuffers cleans the temporary bufs
func (s *SessionVars) CleanBuffers() {
s.GetWriteStmtBufs().clean()
Expand Down Expand Up @@ -956,6 +965,16 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
SetMaxDeltaSchemaCount(tidbOptInt64(val, DefTiDBMaxDeltaSchemaCount))
case TiDBUsePlanBaselines:
s.UsePlanBaselines = TiDBOptOn(val)
case TiDBIsolationReadEngines:
s.isolationReadEngines = make(map[kv.StoreType]struct{})
for _, engine := range strings.Split(val, ",") {
switch engine {
case kv.TiKV.Name():
s.isolationReadEngines[kv.TiKV] = struct{}{}
case kv.TiFlash.Name():
s.isolationReadEngines[kv.TiFlash] = struct{}{}
}
}
}
s.systems[name] = val
return nil
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,7 @@ var defaultSysVars = []*SysVar{
{ScopeGlobal | ScopeSession, TiDBEnableStmtSummary, "0"},
{ScopeGlobal | ScopeSession, TiDBCapturePlanBaseline, "0"},
{ScopeGlobal | ScopeSession, TiDBUsePlanBaselines, BoolToIntStr(DefTiDBUsePlanBaselines)},
{ScopeGlobal | ScopeSession, TiDBIsolationReadEngines, "tikv,tiflash"},
}

// SynonymsSysVariables is synonyms of system variables.
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,10 @@ const (

// TiDBUsePlanBaselines indicates whether the use of plan baselines is enabled.
TiDBUsePlanBaselines = "tidb_use_plan_baselines"

// TiDBIsolationReadEngines indicates the tidb only read from the stores whose engine type is involved in IsolationReadEngines.
// Now, only support TiKV and TiFlash.
TiDBIsolationReadEngines = "tidb_isolation_read_engines"
)

// Default TiDB system variable values.
Expand Down
19 changes: 19 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/timeutil"
)
Expand Down Expand Up @@ -625,6 +626,24 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
return "", nil
}
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
case TiDBIsolationReadEngines:
engines := strings.Split(value, ",")
var formatVal string
for i, engine := range engines {
engine = strings.TrimSpace(engine)
if i != 0 {
formatVal += ","
}
switch {
case strings.EqualFold(engine, kv.TiKV.Name()):
formatVal += kv.TiKV.Name()
case strings.EqualFold(engine, kv.TiFlash.Name()):
formatVal += kv.TiFlash.Name()
default:
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
}
}
return formatVal, nil
}
return value, nil
}
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/varsutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,10 @@ func (s *testVarsutilSuite) TestValidate(c *C) {
{TiDBTxnMode, "pessimistic", false},
{TiDBTxnMode, "optimistic", false},
{TiDBTxnMode, "", false},
{TiDBIsolationReadEngines, "", true},
{TiDBIsolationReadEngines, "tikv", false},
{TiDBIsolationReadEngines, "TiKV,tiflash", false},
{TiDBIsolationReadEngines, " tikv, tiflash ", false},
}

for _, t := range tests {
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -1220,7 +1220,7 @@ func (s *Store) initResolve(bo *Backoffer, c *RegionCache) (addr string, err err
s.storeType = kv.TiKV
for _, label := range store.Labels {
if label.Key == "engine" {
if label.Value == "tiflash" {
if label.Value == kv.TiFlash.Name() {
s.storeType = kv.TiFlash
}
break
Expand Down Expand Up @@ -1265,7 +1265,7 @@ func (s *Store) reResolve(c *RegionCache) {
storeType := kv.TiKV
for _, label := range store.Labels {
if label.Key == "engine" {
if label.Value == "tiflash" {
if label.Value == kv.TiFlash.Name() {
storeType = kv.TiFlash
}
break
Expand Down

0 comments on commit 000c1ba

Please sign in to comment.