From 601b3b08b1e30466a30a5d5f90135183e7df23da Mon Sep 17 00:00:00 2001 From: YangKeao Date: Sun, 5 Feb 2023 19:04:23 +0800 Subject: [PATCH 1/2] fix ttl error log and pass jobID directly Signed-off-by: YangKeao --- ttl/ttlworker/job_manager.go | 9 ++++----- ttl/ttlworker/scan.go | 2 +- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go index 5f8b7bd038fc4..82ff83e5528b7 100644 --- a/ttl/ttlworker/job_manager.go +++ b/ttl/ttlworker/job_manager.go @@ -535,6 +535,7 @@ func (m *JobManager) couldTrySchedule(tableStatus *cache.TableStatus, table *cac // It could be nil, nil, if the table query doesn't return error but the job has been locked by other instances. func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time, ignoreScheduleInterval bool) (*ttlJob, error) { var expireTime time.Time + var jobID string err := se.RunInTxn(ctx, func() error { sql, args := cache.SelectFromTTLTableStatusWithID(table.ID) @@ -574,7 +575,7 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table * return err } - jobID := uuid.New().String() + jobID = uuid.New().String() jobExist := false if len(tableStatus.CurrentJobID) > 0 { // don't create new job if there is already one running @@ -629,7 +630,7 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table * return nil, err } - job := m.createNewJob(expireTime, now, table) + job := m.createNewJob(jobID, expireTime, now, table) // job is created, notify every scan managers to fetch new tasks err = m.notificationCli.Notify(m.ctx, scanTaskNotificationType, job.id) @@ -639,9 +640,7 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table * return job, nil } -func (m *JobManager) createNewJob(expireTime time.Time, now time.Time, table *cache.PhysicalTable) *ttlJob { - id := m.tableStatusCache.Tables[table.ID].CurrentJobID - +func (m *JobManager) createNewJob(id string, expireTime time.Time, now time.Time, table *cache.PhysicalTable) *ttlJob { return &ttlJob{ id: id, ownerID: m.id, diff --git a/ttl/ttlworker/scan.go b/ttl/ttlworker/scan.go index ac1c1cd85ab50..4cf3d919d9545 100644 --- a/ttl/ttlworker/scan.go +++ b/ttl/ttlworker/scan.go @@ -166,7 +166,7 @@ func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, s zap.String("SQL", sql), zap.Int("retryTimes", retryTimes), zap.Bool("needRetry", needRetry), - zap.Error(err), + zap.Error(sqlErr), ) if !needRetry { From 85518756b6cc95d9ddd1278a91e6cc959ff5e087 Mon Sep 17 00:00:00 2001 From: YangKeao Date: Mon, 6 Feb 2023 14:30:52 +0800 Subject: [PATCH 2/2] decrease the ticker interval of task check Signed-off-by: YangKeao --- ttl/ttlworker/job_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go index 82ff83e5528b7..919cc56e7c8da 100644 --- a/ttl/ttlworker/job_manager.go +++ b/ttl/ttlworker/job_manager.go @@ -153,7 +153,7 @@ func (m *JobManager) jobLoop() error { scheduleTaskTicker := time.Tick(getTaskManagerLoopTickerInterval()) updateTaskHeartBeatTicker := time.Tick(ttlTaskHeartBeatTickerInterval) - taskCheckTicker := time.Tick(getTaskManagerLoopTickerInterval()) + taskCheckTicker := time.Tick(time.Second * 5) checkScanTaskFinishedTicker := time.Tick(getTaskManagerLoopTickerInterval()) cmdWatcher := m.cmdCli.WatchCommand(m.ctx)