Skip to content

Commit

Permalink
send ttl task notification through etcd
Browse files Browse the repository at this point in the history
Signed-off-by: YangKeao <yangkeao@chunibyo.icu>
  • Loading branch information
YangKeao committed Jan 28, 2023
1 parent 465ab74 commit f0cbe82
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 19 deletions.
6 changes: 5 additions & 1 deletion ttl/client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "client",
srcs = ["command.go"],
srcs = [
"command.go",
"notification.go",
],
importpath = "github.com/pingcap/tidb/ttl/client",
visibility = ["//visibility:public"],
deps = [
"//ddl/util",
"//util/logutil",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
Expand Down
34 changes: 23 additions & 11 deletions ttl/client/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,13 @@ func TriggerNewTTLJob(ctx context.Context, cli CommandClient, dbName, tableName
return &resp, nil
}

// etcdClient is the etcd-backed client. It implements both the CommandClient
// and the NotificationClient interfaces on top of a single etcd client.
type etcdClient struct {
// etcdCli is the underlying etcd v3 client used for every request.
etcdCli *clientv3.Client
}

// NewEtcdCommandClient creates a client with etcd
func NewEtcdCommandClient(etcdCli *clientv3.Client) CommandClient {
// NewCommandClient creates a command client with etcd
func NewCommandClient(etcdCli *clientv3.Client) CommandClient {
return &etcdClient{
etcdCli: etcdCli,
}
Expand Down Expand Up @@ -196,6 +197,7 @@ loop:
return json.Unmarshal(cmdResp.Data, obj)
}

// Command implements the CommandClient
func (c *etcdClient) Command(ctx context.Context, cmdType string, request interface{}, response interface{}) (string, error) {
requestID, err := c.sendCmd(ctx, cmdType, request)
if err != nil {
Expand All @@ -204,6 +206,7 @@ func (c *etcdClient) Command(ctx context.Context, cmdType string, request interf
return requestID, c.waitCmdResponse(ctx, requestID, &response)
}

// TakeCommand implements the CommandClient
func (c *etcdClient) TakeCommand(ctx context.Context, reqID string) (bool, error) {
resp, err := c.etcdCli.Delete(ctx, ttlCmdKeyRequestPrefix+reqID)
if err != nil {
Expand All @@ -212,6 +215,7 @@ func (c *etcdClient) TakeCommand(ctx context.Context, reqID string) (bool, error
return resp.Deleted > 0, nil
}

// ResponseCommand implements the CommandClient
func (c *etcdClient) ResponseCommand(ctx context.Context, reqID string, obj interface{}) error {
resp := &cmdResponse{
RequestID: reqID,
Expand Down Expand Up @@ -241,6 +245,7 @@ func (c *etcdClient) ResponseCommand(ctx context.Context, reqID string, obj inte
return err
}

// WatchCommand implements the CommandClient
func (c *etcdClient) WatchCommand(ctx context.Context) <-chan *CmdRequest {
ch := make(chan *CmdRequest)
go func() {
Expand Down Expand Up @@ -279,20 +284,24 @@ func (c *etcdClient) WatchCommand(ctx context.Context) <-chan *CmdRequest {
return ch
}

// mockClient is an in-memory mock implementation of CommandClient and NotificationClient
type mockClient struct {
sync.Mutex
store map[string]interface{}
watchers []chan *CmdRequest
store map[string]interface{}
commandWatchers []chan *CmdRequest
notificationWatchers map[string][]chan clientv3.WatchResponse
}

// NewMockCommandClient creates a mock client
// NewMockCommandClient creates a mock command client
func NewMockCommandClient() CommandClient {
return &mockClient{
store: make(map[string]interface{}),
watchers: make([]chan *CmdRequest, 0, 1),
store: make(map[string]interface{}),
commandWatchers: make([]chan *CmdRequest, 0, 1),
notificationWatchers: make(map[string][]chan clientv3.WatchResponse),
}
}

// Command implements the CommandClient
func (c *mockClient) Command(ctx context.Context, cmdType string, request interface{}, response interface{}) (string, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(ttlCmdKeyLeaseSeconds))
defer cancel()
Expand Down Expand Up @@ -346,7 +355,7 @@ func (c *mockClient) sendCmd(ctx context.Context, cmdType string, request interf
defer c.Unlock()
key := ttlCmdKeyRequestPrefix + reqID
c.store[key] = req
for _, ch := range c.watchers {
for _, ch := range c.commandWatchers {
select {
case <-ctx.Done():
return reqID, ctx.Err()
Expand All @@ -358,6 +367,7 @@ func (c *mockClient) sendCmd(ctx context.Context, cmdType string, request interf
return reqID, nil
}

// TakeCommand implements the CommandClient
func (c *mockClient) TakeCommand(_ context.Context, reqID string) (bool, error) {
c.Lock()
defer c.Unlock()
Expand All @@ -369,6 +379,7 @@ func (c *mockClient) TakeCommand(_ context.Context, reqID string) (bool, error)
return false, nil
}

// ResponseCommand implements the CommandClient
func (c *mockClient) ResponseCommand(_ context.Context, reqID string, obj interface{}) error {
c.Lock()
defer c.Unlock()
Expand All @@ -391,11 +402,12 @@ func (c *mockClient) ResponseCommand(_ context.Context, reqID string, obj interf
return nil
}

// WatchCommand implements the CommandClient
func (c *mockClient) WatchCommand(ctx context.Context) <-chan *CmdRequest {
c.Lock()
defer c.Unlock()
ch := make(chan *CmdRequest, 16+len(c.store))
c.watchers = append(c.watchers, ch)
c.commandWatchers = append(c.commandWatchers, ch)
for key, val := range c.store {
if strings.HasPrefix(key, ttlCmdKeyRequestPrefix) {
if req, ok := val.(*CmdRequest); ok {
Expand All @@ -407,9 +419,9 @@ func (c *mockClient) WatchCommand(ctx context.Context) <-chan *CmdRequest {
<-ctx.Done()
c.Lock()
defer c.Unlock()
for i, chItem := range c.watchers {
for i, chItem := range c.commandWatchers {
if chItem == ch {
c.watchers = append(c.watchers[:i], c.watchers[i+1:]...)
c.commandWatchers = append(c.commandWatchers[:i], c.commandWatchers[i+1:]...)
break
}
}
Expand Down
2 changes: 1 addition & 1 deletion ttl/client/command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestCommandClient(t *testing.T) {
defer cluster.Terminate(t)
etcd := cluster.RandClient()

etcdCli := NewEtcdCommandClient(etcd)
etcdCli := NewCommandClient(etcd)
mockCli := NewMockCommandClient()

ctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
Expand Down
79 changes: 79 additions & 0 deletions ttl/client/notification.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"context"

"github.com/pingcap/tidb/ddl/util"
clientv3 "go.etcd.io/etcd/client/v3"
)

// ttlNotificationPrefix is the etcd key prefix of TTL notifications. The
// notification type is appended to it to form the key that is written and watched.
const ttlNotificationPrefix string = "/tidb/ttl/notification/"

// NotificationClient is a client to notify other TTL workers.
// Notifications are partitioned by a type string: a watcher subscribed with a
// given typ only receives the notifications sent with that same typ.
type NotificationClient interface {
// Notify sends a notification of type typ. data is the payload attached to it;
// note that the in-memory mock implementation in this package discards data.
Notify(ctx context.Context, typ string, data string) error
// WatchNotification opens a channel on which all notifications of type typ can be received.
WatchNotification(ctx context.Context, typ string) clientv3.WatchChan
}

// NewNotificationClient wraps the given etcd client into a NotificationClient
// whose notifications are exchanged through etcd.
func NewNotificationClient(etcdCli *clientv3.Client) NotificationClient {
	cli := &etcdClient{etcdCli: etcdCli}
	return cli
}

// Notify publishes a notification of type typ by writing data as the value of
// the etcd key built from ttlNotificationPrefix and typ.
func (c *etcdClient) Notify(ctx context.Context, typ string, data string) error {
	key := ttlNotificationPrefix + typ
	// NOTE(review): the literal 1 is presumably the retry count taken by
	// util.PutKVToEtcd — confirm against its signature.
	return util.PutKVToEtcd(ctx, c.etcdCli, 1, key, data)
}

// WatchNotification starts an etcd watch on the key built from
// ttlNotificationPrefix and typ, and returns the resulting watch channel.
func (c *etcdClient) WatchNotification(ctx context.Context, typ string) clientv3.WatchChan {
	key := ttlNotificationPrefix + typ
	watchCh := c.etcdCli.Watch(ctx, key)
	return watchCh
}

// NewMockNotificationClient builds an in-memory NotificationClient. All the
// internal containers of the mock are initialized so it is ready to use.
func NewMockNotificationClient() NotificationClient {
	mock := &mockClient{}
	mock.store = make(map[string]interface{})
	mock.commandWatchers = make([]chan *CmdRequest, 0, 1)
	mock.notificationWatchers = make(map[string][]chan clientv3.WatchResponse)
	return mock
}

// Notify implements the NotificationClient.
//
// It wakes up every watcher currently registered for typ by pushing an empty
// WatchResponse into its channel. The payload data is not stored by the mock.
//
// The send is non-blocking: a watcher channel that is already full has a
// wake-up pending, so an additional notification is coalesced into it. This
// prevents a slow or abandoned watcher from blocking the notifier forever
// while the client mutex is held, which would stall every other method of
// the mock.
//
// It always returns nil.
func (c *mockClient) Notify(_ context.Context, typ string, data string) error {
	c.Lock()
	defer c.Unlock()

	for _, ch := range c.notificationWatchers[typ] {
		select {
		case ch <- clientv3.WatchResponse{}:
		default:
			// A notification is already pending for this watcher; nothing is lost
			// because the mock notifications carry no payload.
		}
	}
	return nil
}

// WatchNotification implements the NotificationClient.
//
// It registers a new watcher channel for typ and returns it. The channel has a
// buffer of one so that a single wake-up can be queued while the receiver is busy.
//
// When ctx is cancelled the watcher is unregistered and the channel is closed,
// the same way the mock WatchCommand drops its watchers once their context is
// done. Without this, watchers would accumulate forever. If ctx can never be
// cancelled (its Done channel is nil, e.g. context.Background()), no cleanup
// goroutine is started and the watcher lives as long as the client.
func (c *mockClient) WatchNotification(ctx context.Context, typ string) clientv3.WatchChan {
	c.Lock()
	defer c.Unlock()

	ch := make(chan clientv3.WatchResponse, 1)
	c.notificationWatchers[typ] = append(c.notificationWatchers[typ], ch)

	done := ctx.Done()
	if done == nil {
		// Nothing will ever signal the end of this watch; do not leak a goroutine.
		return ch
	}

	go func() {
		<-done
		c.Lock()
		defer c.Unlock()
		watchers := c.notificationWatchers[typ]
		for i, w := range watchers {
			if w == ch {
				c.notificationWatchers[typ] = append(watchers[:i], watchers[i+1:]...)
				break
			}
		}
		// Closing under the lock, after the watcher is unregistered, guarantees
		// that Notify can never send on the closed channel.
		close(ch)
	}()
	return ch
}
1 change: 1 addition & 0 deletions ttl/ttlworker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const ttlJobTimeout = 6 * time.Hour

const taskManagerLoopTickerInterval = time.Minute
const ttlTaskHeartBeatTickerInterval = time.Minute
const ttlTaskGCInterval = time.Hour

func getUpdateInfoSchemaCacheInterval() time.Duration {
failpoint.Inject("update-info-schema-cache-interval", func(val failpoint.Value) time.Duration {
Expand Down
42 changes: 36 additions & 6 deletions ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
"go.uber.org/zap"
)

// scanTaskNotificationType is the notification type sent after a new TTL job is
// created, and watched by the job loop to reschedule scan tasks.
const scanTaskNotificationType string = "scan"

const insertNewTableIntoStatusTemplate = "INSERT INTO mysql.tidb_ttl_table_status (table_id,parent_table_id) VALUES (%?, %?)"
const setTableStatusOwnerTemplate = `UPDATE mysql.tidb_ttl_table_status
SET current_job_id = %?,
Expand Down Expand Up @@ -82,8 +84,9 @@ type JobManager struct {
// id is the ddl id of this instance
id string

store kv.Storage
cmdCli client.CommandClient
store kv.Storage
cmdCli client.CommandClient
notificationCli client.NotificationClient

// infoSchemaCache and tableStatusCache are caches that store the information from the info schema and the tidb_ttl_table_status
// table. They don't need to be protected by mutex, because they are only used in job loop goroutine.
Expand Down Expand Up @@ -113,9 +116,11 @@ func NewJobManager(id string, sessPool sessionPool, store kv.Storage, etcdCli *c
manager.tableStatusCache = cache.NewTableStatusCache(getUpdateTTLTableStatusCacheInterval())

if etcdCli != nil {
manager.cmdCli = client.NewEtcdCommandClient(etcdCli)
manager.cmdCli = client.NewCommandClient(etcdCli)
manager.notificationCli = client.NewNotificationClient(etcdCli)
} else {
manager.cmdCli = client.NewMockCommandClient()
manager.notificationCli = client.NewMockNotificationClient()
}

manager.taskManager = newTaskManager(manager.ctx, sessPool, manager.infoSchemaCache, id)
Expand Down Expand Up @@ -150,6 +155,7 @@ func (m *JobManager) jobLoop() error {
checkScanTaskFinishedTicker := time.Tick(getTaskManagerLoopTickerInterval())

cmdWatcher := m.cmdCli.WatchCommand(m.ctx)
scanTaskNotificationWatcher := m.notificationCli.WatchNotification(m.ctx, scanTaskNotificationType)
m.taskManager.resizeWorkersWithSysVar()
for {
m.reportMetrics()
Expand Down Expand Up @@ -208,6 +214,17 @@ func (m *JobManager) jobLoop() error {
// Task Manager Loop
case <-scheduleTaskTicker:
m.taskManager.rescheduleTasks(se, now)
case _, ok := <-scanTaskNotificationWatcher:
if !ok {
if m.ctx.Err() != nil {
return nil
}

logutil.BgLogger().Warn("The TTL scan task notification watcher is closed unexpectedly, re-watch it again")
scanTaskNotificationWatcher = m.notificationCli.WatchNotification(m.ctx, scanTaskNotificationType)
continue
}
m.taskManager.rescheduleTasks(se, now)
case <-taskCheckTicker:
m.taskManager.checkInvalidTask(se)
m.taskManager.checkFinishedTask(se, now)
Expand Down Expand Up @@ -611,10 +628,18 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
if err != nil {
return nil, err
}
return m.createNewJob(now, table)

job := m.createNewJob(now, table)

// the job is created, notify every scan manager to fetch new tasks
err = m.notificationCli.Notify(m.ctx, scanTaskNotificationType, job.id)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to trigger scan tasks", zap.Error(err))
}
return job, nil
}

func (m *JobManager) createNewJob(now time.Time, table *cache.PhysicalTable) (*ttlJob, error) {
func (m *JobManager) createNewJob(now time.Time, table *cache.PhysicalTable) *ttlJob {
id := m.tableStatusCache.Tables[table.ID].CurrentJobID

return &ttlJob{
Expand All @@ -627,7 +652,7 @@ func (m *JobManager) createNewJob(now time.Time, table *cache.PhysicalTable) (*t
tbl: table,

status: cache.JobStatusWaiting,
}, nil
}
}

// updateHeartBeat updates the heartbeat for all task with current instance as owner
Expand Down Expand Up @@ -687,6 +712,11 @@ func (m *JobManager) GetCommandCli() client.CommandClient {
return m.cmdCli
}

// GetNotificationCli returns the notification client held by this JobManager,
// i.e. the client it uses to send and watch TTL notifications.
func (m *JobManager) GetNotificationCli() client.NotificationClient {
return m.notificationCli
}

type ttlSummary struct {
TotalRows uint64 `json:"total_rows"`
SuccessRows uint64 `json:"success_rows"`
Expand Down
38 changes: 38 additions & 0 deletions ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,44 @@ func TestJobTimeout(t *testing.T) {
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
}

// TestTriggerScanTask verifies that scheduling a new TTL job sends a "scan"
// notification through the notification client of the job manager.
func TestTriggerScanTask(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)
	sessionFactory := sessionFactory(t, store)

	now := time.Now()

	se := sessionFactory()
	m := dom.TTLJobManager()
	m.TaskManager().ResizeWorkersWithSysVar()
	nCli := m.GetNotificationCli()

	tk.MustExec("create table test.t (id int, created_at datetime) ttl = `created_at` + interval 1 minute ttl_job_interval = '1m'")
	require.NoError(t, m.InfoSchemaCache().Update(se))

	// Subscribe BEFORE triggering the job. If the watcher were registered from
	// inside the goroutine, RescheduleJobs could send the notification first,
	// the notification would be lost and wg.Wait() would block forever.
	watchCtx, cancel := context.WithCancel(context.Background())
	defer cancel()
	notifyCh := nCli.WatchNotification(watchCtx, "scan")

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		<-notifyCh
	}()
	m.RescheduleJobs(se, now)

	// notification is sent
	wg.Wait()

	// Best-effort wait (at most ~5 seconds) for the task to disappear or finish.
	// This loop only polls; it does not assert on the final status.
	for time.Now().Before(now.Add(time.Second * 5)) {
		time.Sleep(time.Second)
		rows := tk.MustQuery("SELECT status FROM mysql.tidb_ttl_task").Rows()
		if len(rows) == 0 {
			break
		}
		if rows[0][0] == cache.TaskStatusFinished {
			break
		}
	}
}

func waitAndStopTTLManager(t *testing.T, dom *domain.Domain) {
maxWaitTime := 30
for {
Expand Down

0 comments on commit f0cbe82

Please sign in to comment.