From f0cbe82aac01e9c85c3d515c3cac69a51b6aa99e Mon Sep 17 00:00:00 2001 From: YangKeao Date: Wed, 18 Jan 2023 17:16:32 +0800 Subject: [PATCH] send ttl task notification through etcd Signed-off-by: YangKeao --- ttl/client/BUILD.bazel | 6 +- ttl/client/command.go | 34 +++++--- ttl/client/command_test.go | 2 +- ttl/client/notification.go | 79 +++++++++++++++++++ ttl/ttlworker/config.go | 1 + ttl/ttlworker/job_manager.go | 42 ++++++++-- ttl/ttlworker/job_manager_integration_test.go | 38 +++++++++ 7 files changed, 183 insertions(+), 19 deletions(-) create mode 100644 ttl/client/notification.go diff --git a/ttl/client/BUILD.bazel b/ttl/client/BUILD.bazel index 6f2c7acaae481..e842ad03a887b 100644 --- a/ttl/client/BUILD.bazel +++ b/ttl/client/BUILD.bazel @@ -2,10 +2,14 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "client", - srcs = ["command.go"], + srcs = [ + "command.go", + "notification.go", + ], importpath = "github.com/pingcap/tidb/ttl/client", visibility = ["//visibility:public"], deps = [ + "//ddl/util", "//util/logutil", "@com_github_google_uuid//:uuid", "@com_github_pingcap_errors//:errors", diff --git a/ttl/client/command.go b/ttl/client/command.go index bad2d756353cd..a285d9b186e3c 100644 --- a/ttl/client/command.go +++ b/ttl/client/command.go @@ -112,12 +112,13 @@ func TriggerNewTTLJob(ctx context.Context, cli CommandClient, dbName, tableName return &resp, nil } +// etcdClient is the client of etcd which implements the commandCli and notificationCli interface type etcdClient struct { etcdCli *clientv3.Client } -// NewEtcdCommandClient creates a client with etcd -func NewEtcdCommandClient(etcdCli *clientv3.Client) CommandClient { +// NewCommandClient creates a command client with etcd +func NewCommandClient(etcdCli *clientv3.Client) CommandClient { return &etcdClient{ etcdCli: etcdCli, } @@ -196,6 +197,7 @@ loop: return json.Unmarshal(cmdResp.Data, obj) } +// Command implements the CommandClient func (c *etcdClient) Command(ctx context.Context, cmdType string, request interface{}, response interface{}) (string, error) { requestID, err := c.sendCmd(ctx, cmdType, request) if err != nil { @@ -204,6 +206,7 @@ func (c *etcdClient) Command(ctx context.Context, cmdType string, request interf return requestID, c.waitCmdResponse(ctx, requestID, &response) } +// TakeCommand implements the CommandClient func (c *etcdClient) TakeCommand(ctx context.Context, reqID string) (bool, error) { resp, err := c.etcdCli.Delete(ctx, ttlCmdKeyRequestPrefix+reqID) if err != nil { @@ -212,6 +215,7 @@ func (c *etcdClient) TakeCommand(ctx context.Context, reqID string) (bool, error return resp.Deleted > 0, nil } +// ResponseCommand implements the CommandClient func (c *etcdClient) ResponseCommand(ctx context.Context, reqID string, obj interface{}) error { resp := &cmdResponse{ RequestID: reqID, @@ -241,6 +245,7 @@ func (c *etcdClient) ResponseCommand(ctx context.Context, reqID string, obj inte return err } +// WatchCommand implements the CommandClient func (c *etcdClient) WatchCommand(ctx context.Context) <-chan *CmdRequest { ch := make(chan *CmdRequest) go func() { @@ -279,20 +284,24 @@ func (c *etcdClient) WatchCommand(ctx context.Context) <-chan *CmdRequest { return ch } +// mockClient is a mock implementation for CommandCli and NotificationCli type mockClient struct { sync.Mutex - store map[string]interface{} - watchers []chan *CmdRequest + store map[string]interface{} + commandWatchers []chan *CmdRequest + notificationWatchers map[string][]chan clientv3.WatchResponse } -// NewMockCommandClient creates a mock client +// NewMockCommandClient creates a mock command client func NewMockCommandClient() CommandClient { return &mockClient{ - store: make(map[string]interface{}), - watchers: make([]chan *CmdRequest, 0, 1), + store: make(map[string]interface{}), + commandWatchers: make([]chan *CmdRequest, 0, 1), + notificationWatchers: make(map[string][]chan clientv3.WatchResponse), } } +// Command implements the CommandClient func (c *mockClient) Command(ctx context.Context, cmdType string, request interface{}, response interface{}) (string, error) { ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(ttlCmdKeyLeaseSeconds)) defer cancel() @@ -346,7 +355,7 @@ func (c *mockClient) sendCmd(ctx context.Context, cmdType string, request interf defer c.Unlock() key := ttlCmdKeyRequestPrefix + reqID c.store[key] = req - for _, ch := range c.watchers { + for _, ch := range c.commandWatchers { select { case <-ctx.Done(): return reqID, ctx.Err() @@ -358,6 +367,7 @@ func (c *mockClient) sendCmd(ctx context.Context, cmdType string, request interf return reqID, nil } +// TakeCommand implements the CommandClient func (c *mockClient) TakeCommand(_ context.Context, reqID string) (bool, error) { c.Lock() defer c.Unlock() @@ -369,6 +379,7 @@ func (c *mockClient) TakeCommand(_ context.Context, reqID string) (bool, error) return false, nil } +// ResponseCommand implements the CommandClient func (c *mockClient) ResponseCommand(_ context.Context, reqID string, obj interface{}) error { c.Lock() defer c.Unlock() @@ -391,11 +402,12 @@ func (c *mockClient) ResponseCommand(_ context.Context, reqID string, obj interf return nil } +// WatchCommand implements the CommandClient func (c *mockClient) WatchCommand(ctx context.Context) <-chan *CmdRequest { c.Lock() defer c.Unlock() ch := make(chan *CmdRequest, 16+len(c.store)) - c.watchers = append(c.watchers, ch) + c.commandWatchers = append(c.commandWatchers, ch) for key, val := range c.store { if strings.HasPrefix(key, ttlCmdKeyRequestPrefix) { if req, ok := val.(*CmdRequest); ok { @@ -407,9 +419,9 @@ func (c *mockClient) WatchCommand(ctx context.Context) <-chan *CmdRequest { <-ctx.Done() c.Lock() defer c.Unlock() - for i, chItem := range c.watchers { + for i, chItem := range c.commandWatchers { if chItem == ch { - c.watchers = append(c.watchers[:i], c.watchers[i+1:]...) + c.commandWatchers = append(c.commandWatchers[:i], c.commandWatchers[i+1:]...) break } } diff --git a/ttl/client/command_test.go b/ttl/client/command_test.go index 830137f32904e..69cde75309ad6 100644 --- a/ttl/client/command_test.go +++ b/ttl/client/command_test.go @@ -42,7 +42,7 @@ func TestCommandClient(t *testing.T) { defer cluster.Terminate(t) etcd := cluster.RandClient() - etcdCli := NewEtcdCommandClient(etcd) + etcdCli := NewCommandClient(etcd) mockCli := NewMockCommandClient() ctx, cancel := context.WithTimeout(context.TODO(), time.Minute) diff --git a/ttl/client/notification.go b/ttl/client/notification.go new file mode 100644 index 0000000000000..6c44cd0dd7aa9 --- /dev/null +++ b/ttl/client/notification.go @@ -0,0 +1,79 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "context" + + "github.com/pingcap/tidb/ddl/util" + clientv3 "go.etcd.io/etcd/client/v3" +) + +const ttlNotificationPrefix string = "/tidb/ttl/notification/" + +// NotificationClient is a client to notify other TTL workers +type NotificationClient interface { + // Notify sends a notification + Notify(ctx context.Context, typ string, data string) error + // WatchNotification opens a channel, in which we could receive all notifications + WatchNotification(ctx context.Context, typ string) clientv3.WatchChan +} + +// NewNotificationClient creates a notification client with etcd +func NewNotificationClient(etcdCli *clientv3.Client) NotificationClient { + return &etcdClient{ + etcdCli: etcdCli, + } +} + +// Notify stores the corresponding K-V in the etcd +func (c *etcdClient) Notify(ctx context.Context, typ string, data string) error { + return util.PutKVToEtcd(ctx, c.etcdCli, 1, ttlNotificationPrefix+typ, data) +} + +// WatchNotification returns a go channel to get notification +func (c *etcdClient) WatchNotification(ctx context.Context, typ string) clientv3.WatchChan { + return c.etcdCli.Watch(ctx, ttlNotificationPrefix+typ) +} + +// NewMockNotificationClient creates a mock notification client +func NewMockNotificationClient() NotificationClient { + return &mockClient{ + store: make(map[string]interface{}), + commandWatchers: make([]chan *CmdRequest, 0, 1), + notificationWatchers: make(map[string][]chan clientv3.WatchResponse), + } +} + +// Notify implements the NotificationClient +func (c *mockClient) Notify(_ context.Context, typ string, data string) error { + c.Lock() + defer c.Unlock() + + for _, ch := range c.notificationWatchers[typ] { + ch <- clientv3.WatchResponse{} + } + return nil +} + +// WatchNotification implements the NotificationClient +func (c *mockClient) WatchNotification(_ context.Context, typ string) clientv3.WatchChan { + c.Lock() + defer c.Unlock() + + ch := make(chan clientv3.WatchResponse, 1) + c.notificationWatchers[typ] = append(c.notificationWatchers[typ], ch) + return ch +} diff --git a/ttl/ttlworker/config.go b/ttl/ttlworker/config.go index c1774bc667348..468150c3949a7 100644 --- a/ttl/ttlworker/config.go +++ b/ttl/ttlworker/config.go @@ -32,6 +32,7 @@ const ttlJobTimeout = 6 * time.Hour const taskManagerLoopTickerInterval = time.Minute const ttlTaskHeartBeatTickerInterval = time.Minute +const ttlTaskGCInterval = time.Hour func getUpdateInfoSchemaCacheInterval() time.Duration { failpoint.Inject("update-info-schema-cache-interval", func(val failpoint.Value) time.Duration { diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go index 223128b52f26c..77a66d7e3f761 100644 --- a/ttl/ttlworker/job_manager.go +++ b/ttl/ttlworker/job_manager.go @@ -38,6 +38,8 @@ import ( "go.uber.org/zap" ) +const scanTaskNotificationType string = "scan" + const insertNewTableIntoStatusTemplate = "INSERT INTO mysql.tidb_ttl_table_status (table_id,parent_table_id) VALUES (%?, %?)" const setTableStatusOwnerTemplate = `UPDATE mysql.tidb_ttl_table_status SET current_job_id = %?, @@ -82,8 +84,9 @@ type JobManager struct { // id is the ddl id of this instance id string - store kv.Storage - cmdCli client.CommandClient + store kv.Storage + cmdCli client.CommandClient + notificationCli client.NotificationClient // infoSchemaCache and tableStatusCache are a cache stores the information from info schema and the tidb_ttl_table_status // table. They don't need to be protected by mutex, because they are only used in job loop goroutine. @@ -113,9 +116,11 @@ func NewJobManager(id string, sessPool sessionPool, store kv.Storage, etcdCli *c manager.tableStatusCache = cache.NewTableStatusCache(getUpdateTTLTableStatusCacheInterval()) if etcdCli != nil { - manager.cmdCli = client.NewEtcdCommandClient(etcdCli) + manager.cmdCli = client.NewCommandClient(etcdCli) + manager.notificationCli = client.NewNotificationClient(etcdCli) } else { manager.cmdCli = client.NewMockCommandClient() + manager.notificationCli = client.NewMockNotificationClient() } manager.taskManager = newTaskManager(manager.ctx, sessPool, manager.infoSchemaCache, id) @@ -150,6 +155,7 @@ func (m *JobManager) jobLoop() error { checkScanTaskFinishedTicker := time.Tick(getTaskManagerLoopTickerInterval()) cmdWatcher := m.cmdCli.WatchCommand(m.ctx) + scanTaskNotificationWatcher := m.notificationCli.WatchNotification(m.ctx, scanTaskNotificationType) m.taskManager.resizeWorkersWithSysVar() for { m.reportMetrics() @@ -208,6 +214,17 @@ func (m *JobManager) jobLoop() error { // Task Manager Loop case <-scheduleTaskTicker: m.taskManager.rescheduleTasks(se, now) + case _, ok := <-scanTaskNotificationWatcher: + if !ok { + if m.ctx.Err() != nil { + return nil + } + + logutil.BgLogger().Warn("The TTL scan task notification watcher is closed unexpectedly, re-watch it again") + scanTaskNotificationWatcher = m.notificationCli.WatchNotification(m.ctx, scanTaskNotificationType) + continue + } + m.taskManager.rescheduleTasks(se, now) case <-taskCheckTicker: m.taskManager.checkInvalidTask(se) m.taskManager.checkFinishedTask(se, now) @@ -611,10 +628,18 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table * if err != nil { return nil, err } - return m.createNewJob(now, table) + + job := m.createNewJob(now, table) + + // job is created, notify every scan managers to fetch new tasks + err = m.notificationCli.Notify(m.ctx, scanTaskNotificationType, job.id) + if err != nil { + logutil.Logger(m.ctx).Warn("fail to trigger scan tasks", zap.Error(err)) + } + return job, nil } -func (m *JobManager) createNewJob(now time.Time, table *cache.PhysicalTable) (*ttlJob, error) { +func (m *JobManager) createNewJob(now time.Time, table *cache.PhysicalTable) *ttlJob { id := m.tableStatusCache.Tables[table.ID].CurrentJobID return &ttlJob{ @@ -627,7 +652,7 @@ func (m *JobManager) createNewJob(now time.Time, table *cache.PhysicalTable) (*t tbl: table, status: cache.JobStatusWaiting, - }, nil + } } // updateHeartBeat updates the heartbeat for all task with current instance as owner @@ -687,6 +712,11 @@ func (m *JobManager) GetCommandCli() client.CommandClient { return m.cmdCli } +// GetNotificationCli returns the notification client +func (m *JobManager) GetNotificationCli() client.NotificationClient { + return m.notificationCli +} + type ttlSummary struct { TotalRows uint64 `json:"total_rows"` SuccessRows uint64 `json:"success_rows"` diff --git a/ttl/ttlworker/job_manager_integration_test.go b/ttl/ttlworker/job_manager_integration_test.go index c763e1363aecd..313b9257249a6 100644 --- a/ttl/ttlworker/job_manager_integration_test.go +++ b/ttl/ttlworker/job_manager_integration_test.go @@ -407,6 +407,44 @@ func TestJobTimeout(t *testing.T) { tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0")) } +func TestTriggerScanTask(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + sessionFactory := sessionFactory(t, store) + + now := time.Now() + + se := sessionFactory() + m := dom.TTLJobManager() + m.TaskManager().ResizeWorkersWithSysVar() + nCli := m.GetNotificationCli() + + tk.MustExec("create table test.t (id int, created_at datetime) ttl = `created_at` + interval 1 minute ttl_job_interval = '1m'") + require.NoError(t, m.InfoSchemaCache().Update(se)) + + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + <-nCli.WatchNotification(context.Background(), "scan") + wg.Done() + }() + m.RescheduleJobs(se, now) + + // notification is sent + wg.Wait() + + for time.Now().Before(now.Add(time.Second * 5)) { + time.Sleep(time.Second) + rows := tk.MustQuery("SELECT status FROM mysql.tidb_ttl_task").Rows() + if len(rows) == 0 { + break + } + if rows[0][0] == cache.TaskStatusFinished { + break + } + } +} + func waitAndStopTTLManager(t *testing.T, dom *domain.Domain) { maxWaitTime := 30 for {