Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ttl: execute scan tasks #40564

Merged
merged 3 commits into from
Jan 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions ttl/cache/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,21 @@ const insertIntoTTLTask = `INSERT LOW_PRIORITY INTO mysql.tidb_ttl_task SET
expire_time = %?,
created_time = %?`

// SelectFromTTLTaskWithID returns an SQL statement to get all tasks of the specified job in mysql.tidb_ttl_task
func SelectFromTTLTaskWithID(jobID string) (string, []interface{}) {
// SelectFromTTLTaskWithJobID builds the query that fetches every task row in
// mysql.tidb_ttl_task belonging to the given job. It returns the SQL text and
// the arguments for its `%?` placeholders.
func SelectFromTTLTaskWithJobID(jobID string) (string, []interface{}) {
	const filter = " WHERE job_id = %?"
	args := []interface{}{jobID}
	return selectFromTTLTask + filter, args
}

// SelectFromTTLTaskWithID builds the query that fetches the task rows in
// mysql.tidb_ttl_task matching both the given job ID and scan ID. It returns
// the SQL text and the arguments for its `%?` placeholders, in placeholder order.
func SelectFromTTLTaskWithID(jobID string, scanID int64) (string, []interface{}) {
	const filter = " WHERE job_id = %? AND scan_id = %?"
	args := []interface{}{jobID, scanID}
	return selectFromTTLTask + filter, args
}

// PeekWaitingTTLTask builds the query that picks at most `limit` task rows,
// ordered by ascending created_time. A row qualifies when its status is
// 'waiting' or when its owner_hb_time is earlier than hbExpire, which is
// rendered as "YYYY-MM-DD hh:mm:ss" before being bound to the statement.
//
// NOTE(review): the heartbeat arm of the OR is not restricted by status, so any
// row with a stale owner_hb_time matches regardless of its state — confirm that
// this is intended.
func PeekWaitingTTLTask(limit int, hbExpire time.Time) (string, []interface{}) {
	const (
		condition  = " WHERE status = 'waiting' OR owner_hb_time < %? ORDER BY created_time ASC LIMIT %?"
		timeLayout = "2006-01-02 15:04:05"
	)
	args := []interface{}{hbExpire.Format(timeLayout), limit}
	return selectFromTTLTask + condition, args
}

// InsertIntoTTLTask returns an SQL statement to insert a ttl task into mysql.tidb_ttl_task
func InsertIntoTTLTask(sctx sessionctx.Context, jobID string, tableID int64, scanID int, scanRangeStart []types.Datum, scanRangeEnd []types.Datum, expireTime time.Time, createdTime time.Time) (string, []interface{}, error) {
rangeStart, err := codec.EncodeKey(sctx.GetSessionVars().StmtCtx, []byte{}, scanRangeStart...)
Expand Down Expand Up @@ -115,8 +125,8 @@ func RowToTTLTask(sctx sessionctx.Context, row chunk.Row) (*TTLTask, error) {
}
if !row.IsNull(3) {
scanRangeStartBuf := row.GetBytes(3)
// it's still posibble to be nil even this column is not NULL
if scanRangeStartBuf != nil {
// it's still possible to be empty even if this column is not NULL
if len(scanRangeStartBuf) > 0 {
task.ScanRangeStart, err = codec.Decode(scanRangeStartBuf, len(scanRangeStartBuf))
if err != nil {
return nil, err
Expand All @@ -125,8 +135,8 @@ func RowToTTLTask(sctx sessionctx.Context, row chunk.Row) (*TTLTask, error) {
}
if !row.IsNull(4) {
scanRangeEndBuf := row.GetBytes(4)
// it's still posibble to be nil even this column is not NULL
if scanRangeEndBuf != nil {
// it's still possible to be empty even if this column is not NULL
if len(scanRangeEndBuf) > 0 {
task.ScanRangeEnd, err = codec.Decode(scanRangeEndBuf, len(scanRangeEndBuf))
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion ttl/cache/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func newTaskGetter(ctx context.Context, t *testing.T, tk *testkit.TestKit) *task
}

func (tg *taskGetter) mustGetTestTask() *cache.TTLTask {
sql, args := cache.SelectFromTTLTaskWithID("test-job")
sql, args := cache.SelectFromTTLTaskWithJobID("test-job")
rs, err := tg.tk.Session().ExecuteInternal(tg.ctx, sql, args...)
require.NoError(tg.t, err)
rows, err := session.GetRows4Test(context.Background(), tg.tk.Session(), rs)
Expand Down
25 changes: 22 additions & 3 deletions ttl/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,16 @@ import (
"github.com/pingcap/tidb/util/sqlexec"
)

// TxnMode selects whether a transaction is started in optimistic or
// pessimistic mode. It is the mode argument accepted by Session.RunInTxn.
type TxnMode int

const (
// TxnModeOptimistic starts the transaction with "BEGIN OPTIMISTIC".
// It is the zero value of TxnMode.
TxnModeOptimistic TxnMode = iota
// TxnModePessimistic starts the transaction with "BEGIN PESSIMISTIC".
TxnModePessimistic
)

// Session is used to execute queries for TTL case
type Session interface {
sessionctx.Context
Expand All @@ -38,7 +48,7 @@ type Session interface {
// ExecuteSQL executes the sql
ExecuteSQL(ctx context.Context, sql string, args ...interface{}) ([]chunk.Row, error)
// RunInTxn executes the specified function in a txn
RunInTxn(ctx context.Context, fn func() error) (err error)
RunInTxn(ctx context.Context, fn func() error, mode TxnMode) (err error)
// ResetWithGlobalTimeZone resets the session time zone to global time zone
ResetWithGlobalTimeZone(ctx context.Context) error
// Close closes the session
Expand Down Expand Up @@ -94,12 +104,21 @@ func (s *session) ExecuteSQL(ctx context.Context, sql string, args ...interface{
}

// RunInTxn executes the specified function in a txn
func (s *session) RunInTxn(ctx context.Context, fn func() error) (err error) {
func (s *session) RunInTxn(ctx context.Context, fn func() error, txnMode TxnMode) (err error) {
tracer := metrics.PhaseTracerFromCtx(ctx)
defer tracer.EnterPhase(tracer.Phase())

tracer.EnterPhase(metrics.PhaseBeginTxn)
if _, err = s.ExecuteSQL(ctx, "BEGIN OPTIMISTIC"); err != nil {
sql := "BEGIN "
switch txnMode {
case TxnModeOptimistic:
sql += "OPTIMISTIC"
case TxnModePessimistic:
sql += "PESSIMISTIC"
default:
return errors.New("unknown transaction mode")
}
if _, err = s.ExecuteSQL(ctx, sql); err != nil {
return err
}
tracer.EnterPhase(metrics.PhaseOther)
Expand Down
6 changes: 3 additions & 3 deletions ttl/session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,20 @@ func TestSessionRunInTxn(t *testing.T) {
require.NoError(t, se.RunInTxn(context.TODO(), func() error {
tk.MustExec("insert into t values (1, 10)")
return nil
}))
}, session.TxnModeOptimistic))
tk2.MustQuery("select * from t order by id asc").Check(testkit.Rows("1 10"))

err := se.RunInTxn(context.TODO(), func() error {
tk.MustExec("insert into t values (2, 20)")
return errors.New("mockErr")
})
}, session.TxnModeOptimistic)
require.EqualError(t, err, "mockErr")
tk2.MustQuery("select * from t order by id asc").Check(testkit.Rows("1 10"))

require.NoError(t, se.RunInTxn(context.TODO(), func() error {
tk.MustExec("insert into t values (3, 30)")
return nil
}))
}, session.TxnModeOptimistic))
tk2.MustQuery("select * from t order by id asc").Check(testkit.Rows("1 10", "3 30"))
}

Expand Down
7 changes: 4 additions & 3 deletions ttl/ttlworker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"job_manager.go",
"scan.go",
"session.go",
"task_manager.go",
"worker.go",
],
importpath = "github.com/pingcap/tidb/ttl/ttlworker",
Expand All @@ -27,7 +28,6 @@ go_library(
"//types",
"//util",
"//util/chunk",
"//util/hack",
"//util/logutil",
"//util/sqlexec",
"//util/timeutil",
Expand All @@ -46,16 +46,17 @@ go_test(
name = "ttlworker_test",
srcs = [
"del_test.go",
"job_integration_test.go",
"job_manager_integration_test.go",
"job_manager_test.go",
"job_test.go",
"scan_test.go",
"session_test.go",
"task_manager_integration_test.go",
"task_manager_test.go",
],
embed = [":ttlworker"],
flaky = True,
deps = [
"//domain",
"//infoschema",
"//kv",
"//parser/ast",
Expand Down
10 changes: 10 additions & 0 deletions ttl/ttlworker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ const resizeWorkersInterval = 30 * time.Second
const splitScanCount = 64
const ttlJobTimeout = 6 * time.Hour

// taskManagerLoopTickerInterval is the default value returned by
// getTaskManagerLoopTickerInterval when its failpoint is not active.
const taskManagerLoopTickerInterval = time.Minute
// ttlTaskHeartBeatTickerInterval is presumably the period between heartbeat
// updates for TTL tasks — TODO confirm against its users, which are not in this file.
const ttlTaskHeartBeatTickerInterval = time.Minute

func getUpdateInfoSchemaCacheInterval() time.Duration {
failpoint.Inject("update-info-schema-cache-interval", func(val failpoint.Value) time.Duration {
return time.Duration(val.(int))
Expand All @@ -50,3 +53,10 @@ func getResizeWorkersInterval() time.Duration {
})
return resizeWorkersInterval
}

// getTaskManagerLoopTickerInterval returns the tick interval for the task
// manager loop. It normally returns taskManagerLoopTickerInterval; the
// "task-manager-loop-interval" failpoint can supply a different value
// (presumably so tests can shorten the interval — confirm against the tests).
func getTaskManagerLoopTickerInterval() time.Duration {
// The failpoint value is asserted to be an int (a non-int value panics) and
// converted directly to time.Duration, i.e. it is interpreted as nanoseconds.
failpoint.Inject("task-manager-loop-interval", func(val failpoint.Value) time.Duration {
return time.Duration(val.(int))
})
return taskManagerLoopTickerInterval
}
157 changes: 5 additions & 152 deletions ttl/ttlworker/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@ package ttlworker

import (
"context"
"encoding/json"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/ttl/cache"
"github.com/pingcap/tidb/ttl/session"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)
Expand All @@ -44,7 +42,6 @@ const finishJobTemplate = `UPDATE mysql.tidb_ttl_table_status
current_job_status = NULL,
current_job_status_update_time = NULL
WHERE table_id = %? AND current_job_id = %?`
const updateJobStateTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_state = %? WHERE table_id = %? AND current_job_id = %? AND current_job_owner_id = %?"
const removeTaskForJobTemplate = "DELETE FROM mysql.tidb_ttl_task WHERE job_id = %?"

func updateJobCurrentStatusSQL(tableID int64, oldStatus cache.JobStatus, newStatus cache.JobStatus, jobID string) (string, []interface{}) {
Expand All @@ -55,10 +52,6 @@ func finishJobSQL(tableID int64, finishTime time.Time, summary string, jobID str
return finishJobTemplate, []interface{}{finishTime.Format(timeFormat), summary, tableID, jobID}
}

func updateJobState(tableID int64, currentJobID string, currentJobState string, currentJobOwnerID string) (string, []interface{}) {
return updateJobStateTemplate, []interface{}{currentJobState, tableID, currentJobID, currentJobOwnerID}
}

// removeTaskForJob returns the DELETE statement template for the tasks of a
// job, together with the single argument (the job ID) bound to its `%?`
// placeholder.
func removeTaskForJob(jobID string) (string, []interface{}) {
	args := []interface{}{jobID}
	return removeTaskForJobTemplate, args
}
Expand All @@ -67,78 +60,23 @@ type ttlJob struct {
id string
ownerID string

ctx context.Context
cancel func()

createTime time.Time

tbl *cache.PhysicalTable

tasks []*ttlScanTask
taskIter int
finishedScanTaskCounter int
scanTaskErr error

// status is the only field which should be protected by a mutex, as `Cancel` may be called at any time, and will
// change the status
statusMutex sync.Mutex
status cache.JobStatus

statistics *ttlStatistics
}

// changeStatus updates the state of this job
func (job *ttlJob) changeStatus(ctx context.Context, se session.Session, status cache.JobStatus) error {
job.statusMutex.Lock()
oldStatus := job.status
job.status = status
job.statusMutex.Unlock()

sql, args := updateJobCurrentStatusSQL(job.tbl.ID, oldStatus, status, job.id)
_, err := se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
}

return nil
}

func (job *ttlJob) updateState(ctx context.Context, se session.Session) error {
summary, err := job.summary()
if err != nil {
logutil.Logger(job.ctx).Warn("fail to generate summary for ttl job", zap.Error(err))
}
sql, args := updateJobState(job.tbl.ID, job.id, summary, job.ownerID)
_, err = se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
}

return nil
}

// peekScanTask returns the next scan task, but doesn't promote the iterator
func (job *ttlJob) peekScanTask() *ttlScanTask {
return job.tasks[job.taskIter]
}

// nextScanTask promotes the iterator
func (job *ttlJob) nextScanTask() {
job.taskIter += 1
}

// finish turns current job into last job, and update the error message and statistics summary
func (job *ttlJob) finish(se session.Session, now time.Time) {
summary, err := job.summary()
if err != nil {
logutil.Logger(job.ctx).Warn("fail to generate summary for ttl job", zap.Error(err))
}

func (job *ttlJob) finish(se session.Session, now time.Time, summary string) {
// at this time, the job.ctx may have been canceled (to cancel this job)
// even when it's canceled, we'll need to update the states, so use another context
err = se.RunInTxn(context.TODO(), func() error {
err := se.RunInTxn(context.TODO(), func() error {
sql, args := finishJobSQL(job.tbl.ID, now, summary, job.id)
_, err = se.ExecuteSQL(context.TODO(), sql, args...)
_, err := se.ExecuteSQL(context.TODO(), sql, args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
}
Expand All @@ -150,94 +88,9 @@ func (job *ttlJob) finish(se session.Session, now time.Time) {
}

return nil
})
}, session.TxnModeOptimistic)

if err != nil {
logutil.Logger(job.ctx).Error("fail to finish a ttl job", zap.Error(err), zap.Int64("tableID", job.tbl.ID), zap.String("jobID", job.id))
logutil.BgLogger().Error("fail to finish a ttl job", zap.Error(err), zap.Int64("tableID", job.tbl.ID), zap.String("jobID", job.id))
}
}

// AllSpawned returns whether all scan tasks have been dumped out
// **This function will be called concurrently, in many workers' goroutine**
func (job *ttlJob) AllSpawned() bool {
return job.taskIter == len(job.tasks) && len(job.tasks) != 0
}

// Timeout will return whether the job has timeout, if it is, it will be killed
func (job *ttlJob) Timeout(ctx context.Context, se session.Session, now time.Time) bool {
if !job.createTime.Add(ttlJobTimeout).Before(now) {
return false
}

err := job.changeStatus(ctx, se, cache.JobStatusTimeout)
if err != nil {
logutil.BgLogger().Info("fail to update status of ttl job", zap.String("jobID", job.id), zap.Error(err))
}

return true
}

// Finished returns whether the job is finished
func (job *ttlJob) Finished() bool {
job.statusMutex.Lock()
defer job.statusMutex.Unlock()
// in three condition, a job is considered finished:
// 1. It's cancelled manually
// 2. All scan tasks have been finished, and all selected rows succeed or in error state
// 3. The job is created one hour ago. It's a timeout.
return job.status == cache.JobStatusCancelled || (job.AllSpawned() && job.finishedScanTaskCounter == len(job.tasks) && job.statistics.TotalRows.Load() == job.statistics.ErrorRows.Load()+job.statistics.SuccessRows.Load())
}

// Cancel cancels the job context
func (job *ttlJob) Cancel(ctx context.Context, se session.Session) error {
if job.cancel != nil {
job.cancel()
}
// TODO: wait until all tasks have been finished
return job.changeStatus(ctx, se, cache.JobStatusCancelled)
}

func findJobWithTableID(jobs []*ttlJob, id int64) *ttlJob {
for _, j := range jobs {
if j.tbl.ID == id {
return j
}
}

return nil
}

type ttlSummary struct {
TotalRows uint64 `json:"total_rows"`
SuccessRows uint64 `json:"success_rows"`
ErrorRows uint64 `json:"error_rows"`

TotalScanTask int `json:"total_scan_task"`
ScheduledScanTask int `json:"scheduled_scan_task"`
FinishedScanTask int `json:"finished_scan_task"`

ScanTaskErr string `json:"scan_task_err,omitempty"`
}

func (job *ttlJob) summary() (string, error) {
summary := &ttlSummary{
TotalRows: job.statistics.TotalRows.Load(),
SuccessRows: job.statistics.SuccessRows.Load(),
ErrorRows: job.statistics.ErrorRows.Load(),

TotalScanTask: len(job.tasks),
ScheduledScanTask: job.taskIter,
FinishedScanTask: job.finishedScanTaskCounter,
}

if job.scanTaskErr != nil {
summary.ScanTaskErr = job.scanTaskErr.Error()
}

summaryJSON, err := json.Marshal(summary)
if err != nil {
return "", err
}

return string(hack.String(summaryJSON)), nil
}
Loading