Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add admin cleanup table lock syntax support #10423

Merged
merged 12 commits into from
Jul 10, 2019
16 changes: 16 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3172,6 +3172,22 @@ func (s *testDBSuite2) TestLockTables(c *C) {
_, err = tk2.Exec("alter database test charset='utf8mb4'")
c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue)

// Test for admin cleanup table locks.
tk.MustExec("unlock tables")
tk.MustExec("lock table t1 write, t2 write")
_, err = tk2.Exec("lock tables t1 write, t2 read")
c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue)
tk2.MustExec("admin cleanup table lock t1,t2")
checkTableLock(c, tk.Se, "test", "t1", model.TableLockNone)
checkTableLock(c, tk.Se, "test", "t2", model.TableLockNone)
// cleanup unlocked table.
tk2.MustExec("admin cleanup table lock t1,t2")
checkTableLock(c, tk.Se, "test", "t1", model.TableLockNone)
checkTableLock(c, tk.Se, "test", "t2", model.TableLockNone)
tk2.MustExec("lock tables t1 write, t2 read")
checkTableLock(c, tk2.Se, "test", "t1", model.TableLockWrite)
checkTableLock(c, tk2.Se, "test", "t2", model.TableLockRead)

tk.MustExec("unlock tables")
tk2.MustExec("unlock tables")
}
Expand Down
1 change: 1 addition & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ type DDL interface {
RenameTable(ctx sessionctx.Context, oldTableIdent, newTableIdent ast.Ident, isAlterTable bool) error
LockTables(ctx sessionctx.Context, stmt *ast.LockTablesStmt) error
UnlockTables(ctx sessionctx.Context, lockedTables []model.TableLockTpInfo) error
CleanupTableLock(ctx sessionctx.Context, tables []*ast.TableName) error

// GetLease returns current schema lease time.
GetLease() time.Duration
Expand Down
78 changes: 70 additions & 8 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/types"
driver "github.com/pingcap/tidb/types/parser_driver"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mock"
Expand Down Expand Up @@ -3386,16 +3387,12 @@ func (d *ddl) LockTables(ctx sessionctx.Context, stmt *ast.LockTablesStmt) error
SessionID: ctx.GetSessionVars().ConnectionID,
}
uniqueTableID := make(map[int64]struct{})
// Check whether the table was already locked by other.
// Check whether the table was already locked by another.
for _, tl := range stmt.TableLocks {
tb := tl.Table
// TODO: replace const string "performance_schema" with xxx.LowerName.
// Currently use perfschema.LowerName will have import cycle problem.
if tb.Schema.L == infoschema.LowerName || tb.Schema.L == "performance_schema" || tb.Schema.L == mysql.SystemDB {
if ctx.GetSessionVars().User != nil {
return infoschema.ErrAccessDenied.GenWithStackByArgs(ctx.GetSessionVars().User.Username, ctx.GetSessionVars().User.Hostname)
}
return infoschema.ErrAccessDenied
err := throwErrIfInMemOrSysDB(ctx, tb.Schema.L)
if err != nil {
return err
}
schema, t, err := d.getSchemaAndTableByIdent(ctx, ast.Ident{Schema: tb.Schema, Name: tb.Name})
if err != nil {
Expand Down Expand Up @@ -3467,10 +3464,75 @@ func (d *ddl) UnlockTables(ctx sessionctx.Context, unlockTables []model.TableLoc
return errors.Trace(err)
}

func throwErrIfInMemOrSysDB(ctx sessionctx.Context, dbLowerName string) error {
if util.IsMemOrSysDB(dbLowerName) {
if ctx.GetSessionVars().User != nil {
return infoschema.ErrAccessDenied.GenWithStackByArgs(ctx.GetSessionVars().User.Username, ctx.GetSessionVars().User.Hostname)
}
return infoschema.ErrAccessDenied.GenWithStackByArgs("", "")
}
return nil
}

func (d *ddl) CleanupTableLock(ctx sessionctx.Context, tables []*ast.TableName) error {
uniqueTableID := make(map[int64]struct{})
cleanupTables := make([]model.TableLockTpInfo, 0, len(tables))
unlockedTablesNum := 0
// Check whether the table was already locked by another.
for _, tb := range tables {
err := throwErrIfInMemOrSysDB(ctx, tb.Schema.L)
if err != nil {
return err
}
schema, t, err := d.getSchemaAndTableByIdent(ctx, ast.Ident{Schema: tb.Schema, Name: tb.Name})
if err != nil {
return errors.Trace(err)
}
if t.Meta().IsView() {
return table.ErrUnsupportedOp
}
// Maybe the table t was not locked, but still try to unlock this table.
// If we skip unlock the table here, the job maybe not consistent with the job.Query.
// eg: unlock tables t1,t2; If t2 is not locked and skip here, then the job will only unlock table t1,
// and this behaviour is not consistent with the sql query.
if !t.Meta().IsLocked() {
unlockedTablesNum++
}
if _, ok := uniqueTableID[t.Meta().ID]; ok {
return infoschema.ErrNonuniqTable.GenWithStackByArgs(t.Meta().Name)
}
uniqueTableID[t.Meta().ID] = struct{}{}
cleanupTables = append(cleanupTables, model.TableLockTpInfo{SchemaID: schema.ID, TableID: t.Meta().ID})
}
// If the num of cleanupTables is 0, or all cleanupTables is unlocked, just return here.
if len(cleanupTables) == 0 || len(cleanupTables) == unlockedTablesNum {
return nil
}

arg := &lockTablesArg{
UnlockTables: cleanupTables,
IsCleanup: true,
}
job := &model.Job{
SchemaID: cleanupTables[0].SchemaID,
TableID: cleanupTables[0].TableID,
winkyao marked this conversation as resolved.
Show resolved Hide resolved
Type: model.ActionUnlockTable,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{arg},
}
err := d.doDDLJob(ctx, job)
if err == nil {
ctx.ReleaseTableLocks(cleanupTables)
}
err = d.callHookOnChanged(err)
return errors.Trace(err)
}

type lockTablesArg struct {
LockTables []model.TableLockTpInfo
IndexOfLock int
UnlockTables []model.TableLockTpInfo
IndexOfUnlock int
SessionInfo model.SessionInfo
IsCleanup bool
}
5 changes: 5 additions & 0 deletions ddl/table_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,11 @@ func unlockTable(tbInfo *model.TableInfo, arg *lockTablesArg) (needUpdateTableIn
if !tbInfo.IsLocked() {
return false
}
if arg.IsCleanup {
tbInfo.Lock = nil
return true
}

sessionIndex := findSessionInfoIndex(tbInfo.Lock.Sessions, arg.SessionInfo)
if sessionIndex < 0 {
// When session clean table lock, session maybe send unlock table even the table lock maybe not hold by the session.
Expand Down
8 changes: 8 additions & 0 deletions executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ func (e *DDLExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
err = e.executeLockTables(x)
case *ast.UnlockTablesStmt:
err = e.executeUnlockTables(x)
case *ast.CleanupTableLockStmt:
err = e.executeCleanupTableLock(x)

}
if err != nil {
return e.toErr(err)
Expand Down Expand Up @@ -453,3 +456,8 @@ func (e *DDLExec) executeUnlockTables(s *ast.UnlockTablesStmt) error {
lockedTables := e.ctx.GetAllTableLocks()
return domain.GetDomain(e.ctx).DDL().UnlockTables(e.ctx, lockedTables)
}

func (e *DDLExec) executeCleanupTableLock(s *ast.CleanupTableLockStmt) error {
err := domain.GetDomain(e.ctx).DDL().CleanupTableLock(e.ctx, s.Tables)
return err
}
5 changes: 3 additions & 2 deletions infoschema/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util"
)

var (
Expand Down Expand Up @@ -96,8 +97,8 @@ type InfoSchema interface {

// Information Schema Name.
const (
Name = "INFORMATION_SCHEMA"
LowerName = "information_schema"
Name = util.InformationSchemaName
LowerName = util.InformationSchemaLowerName
)

type sortedTables []table.Table
Expand Down
6 changes: 4 additions & 2 deletions infoschema/perfschema/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@

package perfschema

import "github.com/pingcap/tidb/util"

// Performance Schema Name.
const (
Name = "PERFORMANCE_SCHEMA"
LowerName = "performance_schema"
Name = util.PerformanceSchemaName
LowerName = util.PerformanceSchemaLowerName
)

// perfSchemaTables is a shortcut to involve all table names.
Expand Down
6 changes: 3 additions & 3 deletions lock/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/infoschema/perfschema"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util"
)

// Checker uses to check tables lock.
Expand All @@ -38,8 +38,8 @@ func (c *Checker) CheckTableLock(db, table string, privilege mysql.PrivilegeType
if db == "" && table == "" {
return nil
}
// Below database are not support table lock.
if db == infoschema.LowerName || db == perfschema.LowerName || db == mysql.SystemDB {
// System DB and memory DB are not support table lock.
if util.IsMemOrSysDB(db) {
return nil
}
// check operation on database.
Expand Down
3 changes: 3 additions & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1984,6 +1984,9 @@ func (b *PlanBuilder) buildDDL(node ast.DDLNode) (Plan, error) {
b.visitInfo = appendVisitInfo(b.visitInfo, mysql.SuperPriv, "", "", "", nil)
case *ast.LockTablesStmt, *ast.UnlockTablesStmt:
// TODO: add Lock Table privilege check.
case *ast.CleanupTableLockStmt:
// This command can only be executed by administrator.
b.visitInfo = appendVisitInfo(b.visitInfo, mysql.SuperPriv, "", "", "", nil)
}
p := &DDL{Statement: node}
return p, nil
Expand Down
21 changes: 21 additions & 0 deletions util/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/parser"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -130,3 +131,23 @@ func SyntaxWarn(err error) error {
}
return parser.ErrParse.GenWithStackByArgs(syntaxErrorPrefix, err.Error())
}

const (
// InformationSchemaName is the `INFORMATION_SCHEMA` database name.
InformationSchemaName = "INFORMATION_SCHEMA"
// InformationSchemaLowerName is the `INFORMATION_SCHEMA` database lower name.
InformationSchemaLowerName = "information_schema"
// PerformanceSchemaName is the `PERFORMANCE_SCHEMA` database name.
PerformanceSchemaName = "PERFORMANCE_SCHEMA"
// PerformanceSchemaLowerName is the `PERFORMANCE_SCHEMA` database lower name.
PerformanceSchemaLowerName = "performance_schema"
)

// IsMemOrSysDB uses to check whether dbLowerName is memory database or system database.
func IsMemOrSysDB(dbLowerName string) bool {
switch dbLowerName {
case InformationSchemaLowerName, PerformanceSchemaLowerName, mysql.SystemDB:
return true
}
return false
}