ttl: notify tidb nodes through etcd notification #40705

Merged 3 commits on Jan 28, 2023
6 changes: 5 additions & 1 deletion ttl/client/BUILD.bazel
@@ -2,10 +2,14 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "client",
srcs = ["command.go"],
srcs = [
"command.go",
"notification.go",
],
importpath = "github.com/pingcap/tidb/ttl/client",
visibility = ["//visibility:public"],
deps = [
"//ddl/util",
"//util/logutil",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
34 changes: 23 additions & 11 deletions ttl/client/command.go
@@ -112,12 +112,13 @@ func TriggerNewTTLJob(ctx context.Context, cli CommandClient, dbName, tableName
return &resp, nil
}

// etcdClient is the etcd-backed client that implements both the CommandClient and NotificationClient interfaces
type etcdClient struct {
etcdCli *clientv3.Client
}

// NewEtcdCommandClient creates a client with etcd
func NewEtcdCommandClient(etcdCli *clientv3.Client) CommandClient {
// NewCommandClient creates a command client with etcd
func NewCommandClient(etcdCli *clientv3.Client) CommandClient {
return &etcdClient{
etcdCli: etcdCli,
}
@@ -196,6 +197,7 @@ loop:
return json.Unmarshal(cmdResp.Data, obj)
}

// Command implements the CommandClient
func (c *etcdClient) Command(ctx context.Context, cmdType string, request interface{}, response interface{}) (string, error) {
requestID, err := c.sendCmd(ctx, cmdType, request)
if err != nil {
@@ -204,6 +206,7 @@ func (c *etcdClient) Command(ctx context.Context, cmdType string, request interf
return requestID, c.waitCmdResponse(ctx, requestID, &response)
}

// TakeCommand implements the CommandClient
func (c *etcdClient) TakeCommand(ctx context.Context, reqID string) (bool, error) {
resp, err := c.etcdCli.Delete(ctx, ttlCmdKeyRequestPrefix+reqID)
if err != nil {
@@ -212,6 +215,7 @@ func (c *etcdClient) TakeCommand(ctx context.Context, reqID string) (bool, error
return resp.Deleted > 0, nil
}

// ResponseCommand implements the CommandClient
func (c *etcdClient) ResponseCommand(ctx context.Context, reqID string, obj interface{}) error {
resp := &cmdResponse{
RequestID: reqID,
Expand Down Expand Up @@ -241,6 +245,7 @@ func (c *etcdClient) ResponseCommand(ctx context.Context, reqID string, obj inte
return err
}

// WatchCommand implements the CommandClient
func (c *etcdClient) WatchCommand(ctx context.Context) <-chan *CmdRequest {
ch := make(chan *CmdRequest)
go func() {
@@ -279,20 +284,24 @@ func (c *etcdClient) WatchCommand(ctx context.Context) <-chan *CmdRequest {
return ch
}

// mockClient is a mock implementation of the CommandClient and NotificationClient interfaces
type mockClient struct {
sync.Mutex
store map[string]interface{}
watchers []chan *CmdRequest
store map[string]interface{}
commandWatchers []chan *CmdRequest
notificationWatchers map[string][]chan clientv3.WatchResponse
}

// NewMockCommandClient creates a mock client
// NewMockCommandClient creates a mock command client
func NewMockCommandClient() CommandClient {
return &mockClient{
store: make(map[string]interface{}),
watchers: make([]chan *CmdRequest, 0, 1),
store: make(map[string]interface{}),
commandWatchers: make([]chan *CmdRequest, 0, 1),
notificationWatchers: make(map[string][]chan clientv3.WatchResponse),
}
}

// Command implements the CommandClient
func (c *mockClient) Command(ctx context.Context, cmdType string, request interface{}, response interface{}) (string, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(ttlCmdKeyLeaseSeconds))
defer cancel()
@@ -346,7 +355,7 @@ func (c *mockClient) sendCmd(ctx context.Context, cmdType string, request interf
defer c.Unlock()
key := ttlCmdKeyRequestPrefix + reqID
c.store[key] = req
for _, ch := range c.watchers {
for _, ch := range c.commandWatchers {
select {
case <-ctx.Done():
return reqID, ctx.Err()
@@ -358,6 +367,7 @@ func (c *mockClient) sendCmd(ctx context.Context, cmdType string, request interf
return reqID, nil
}

// TakeCommand implements the CommandClient
func (c *mockClient) TakeCommand(_ context.Context, reqID string) (bool, error) {
c.Lock()
defer c.Unlock()
@@ -369,6 +379,7 @@ func (c *mockClient) TakeCommand(_ context.Context, reqID string) (bool, error)
return false, nil
}

// ResponseCommand implements the CommandClient
func (c *mockClient) ResponseCommand(_ context.Context, reqID string, obj interface{}) error {
c.Lock()
defer c.Unlock()
@@ -391,11 +402,12 @@ func (c *mockClient) ResponseCommand(_ context.Context, reqID string, obj interf
return nil
}

// WatchCommand implements the CommandClient
func (c *mockClient) WatchCommand(ctx context.Context) <-chan *CmdRequest {
c.Lock()
defer c.Unlock()
ch := make(chan *CmdRequest, 16+len(c.store))
c.watchers = append(c.watchers, ch)
c.commandWatchers = append(c.commandWatchers, ch)
for key, val := range c.store {
if strings.HasPrefix(key, ttlCmdKeyRequestPrefix) {
if req, ok := val.(*CmdRequest); ok {
@@ -407,9 +419,9 @@ func (c *mockClient) WatchCommand(ctx context.Context) <-chan *CmdRequest {
<-ctx.Done()
c.Lock()
defer c.Unlock()
for i, chItem := range c.watchers {
for i, chItem := range c.commandWatchers {
if chItem == ch {
c.watchers = append(c.watchers[:i], c.watchers[i+1:]...)
c.commandWatchers = append(c.commandWatchers[:i], c.commandWatchers[i+1:]...)
break
}
}
2 changes: 1 addition & 1 deletion ttl/client/command_test.go
@@ -42,7 +42,7 @@ func TestCommandClient(t *testing.T) {
defer cluster.Terminate(t)
etcd := cluster.RandClient()

etcdCli := NewEtcdCommandClient(etcd)
etcdCli := NewCommandClient(etcd)
mockCli := NewMockCommandClient()

ctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
79 changes: 79 additions & 0 deletions ttl/client/notification.go
@@ -0,0 +1,79 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"context"

"github.com/pingcap/tidb/ddl/util"
clientv3 "go.etcd.io/etcd/client/v3"
)

const ttlNotificationPrefix string = "/tidb/ttl/notification/"

// NotificationClient is a client to notify other TTL workers
type NotificationClient interface {
[Review comment — Collaborator]
Is it better to merge the NotificationClient and CommandClient interfaces?

[Author reply — Member]
I don't think it's good to merge these two interfaces. They actually do different things, and the reliability requirement for the CommandClient is much higher than for the NotificationClient (if a command somehow disappeared, the sender would block). Also, we only use the CommandClient in tests, while the NotificationClient is used in production code: we don't expect 100% of notifications to be delivered, and a lost message is not critical.

// Notify sends a notification
Notify(ctx context.Context, typ string, data string) error
// WatchNotification opens a channel from which all notifications of the given type can be received
WatchNotification(ctx context.Context, typ string) clientv3.WatchChan
}
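
To make the contrast drawn in the review discussion above concrete, here is a minimal usage sketch of the notification side, using the NotificationClient interface as defined in this file. The fetchNewTasks helper, the package name, and the one-minute fallback interval are hypothetical; handling of a closed watch channel is omitted here (the jobLoop change further below shows it).

package ttlexample

import (
	"context"
	"time"

	"github.com/pingcap/tidb/ttl/client"
)

// fetchNewTasks is a hypothetical placeholder for rescheduling scan tasks.
func fetchNewTasks() {}

func consumeScanNotifications(ctx context.Context, cli client.NotificationClient) {
	// Notifications are fire-and-forget, so pair the watch channel with a
	// periodic ticker: a dropped notification only delays work until the
	// next tick rather than losing it.
	watcher := cli.WatchNotification(ctx, "scan")
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-watcher:
			fetchNewTasks() // a job was created somewhere in the cluster
		case <-ticker.C:
			fetchNewTasks() // periodic safety net
		}
	}
}

The producer side is a single best-effort call, cli.Notify(ctx, "scan", jobID), whose error is only logged. The command path, by contrast, blocks in Command until a response arrives or the context expires, which is the stricter guarantee the author refers to.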

// NewNotificationClient creates a notification client with etcd
func NewNotificationClient(etcdCli *clientv3.Client) NotificationClient {
return &etcdClient{
etcdCli: etcdCli,
}
}

// Notify stores the corresponding key-value pair in etcd
func (c *etcdClient) Notify(ctx context.Context, typ string, data string) error {
return util.PutKVToEtcd(ctx, c.etcdCli, 1, ttlNotificationPrefix+typ, data)
[Review comment — Member]
It is strange that a common function is placed in ddl/util. I think it should be in util.

[Author reply — Member]
I think it's because this function is only used by DDL (to notify after inserting DDL jobs). We could consider extracting it to a more proper place, especially after implementing notification for the unified job/worker framework. See #37119 (comment).

[Review comment — Member]
I couldn't agree more.

}
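
For readers unfamiliar with the helper under discussion: a rough, hypothetical sketch of what a PutKVToEtcd-style function plausibly does, assuming the literal 1 at the call site above is a retry count and that each attempt gets a short timeout — both are assumptions about ddl/util, not its actual code. The sketch assumes this file's imports plus "time".

// putKVToEtcd is a hypothetical stand-in for util.PutKVToEtcd: retry a
// bounded etcd Put up to retryCnt times.
func putKVToEtcd(ctx context.Context, cli *clientv3.Client, retryCnt int, key, val string) error {
	var err error
	for i := 0; i < retryCnt; i++ {
		if err = ctx.Err(); err != nil {
			return err // the caller gave up; stop retrying
		}
		childCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
		_, err = cli.Put(childCtx, key, val)
		cancel()
		if err == nil {
			return nil
		}
	}
	return err
}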

// WatchNotification returns a Go channel for receiving notifications
func (c *etcdClient) WatchNotification(ctx context.Context, typ string) clientv3.WatchChan {
return c.etcdCli.Watch(ctx, ttlNotificationPrefix+typ)
}

// NewMockNotificationClient creates a mock notification client
func NewMockNotificationClient() NotificationClient {
return &mockClient{
store: make(map[string]interface{}),
commandWatchers: make([]chan *CmdRequest, 0, 1),
notificationWatchers: make(map[string][]chan clientv3.WatchResponse),
}
}

// Notify implements the NotificationClient
func (c *mockClient) Notify(_ context.Context, typ string, data string) error {
c.Lock()
defer c.Unlock()

for _, ch := range c.notificationWatchers[typ] {
ch <- clientv3.WatchResponse{}
}
return nil
}

// WatchNotification implements the NotificationClient
func (c *mockClient) WatchNotification(_ context.Context, typ string) clientv3.WatchChan {
c.Lock()
defer c.Unlock()

ch := make(chan clientv3.WatchResponse, 1)
c.notificationWatchers[typ] = append(c.notificationWatchers[typ], ch)
return ch
}
1 change: 1 addition & 0 deletions ttl/ttlworker/config.go
@@ -32,6 +32,7 @@ const ttlJobTimeout = 6 * time.Hour

const taskManagerLoopTickerInterval = time.Minute
const ttlTaskHeartBeatTickerInterval = time.Minute
const ttlTaskGCInterval = time.Hour

func getUpdateInfoSchemaCacheInterval() time.Duration {
failpoint.Inject("update-info-schema-cache-interval", func(val failpoint.Value) time.Duration {
42 changes: 36 additions & 6 deletions ttl/ttlworker/job_manager.go
@@ -38,6 +38,8 @@ import (
"go.uber.org/zap"
)

const scanTaskNotificationType string = "scan"

const insertNewTableIntoStatusTemplate = "INSERT INTO mysql.tidb_ttl_table_status (table_id,parent_table_id) VALUES (%?, %?)"
const setTableStatusOwnerTemplate = `UPDATE mysql.tidb_ttl_table_status
SET current_job_id = %?,
@@ -82,8 +84,9 @@ type JobManager struct {
// id is the ddl id of this instance
id string

store kv.Storage
cmdCli client.CommandClient
store kv.Storage
cmdCli client.CommandClient
notificationCli client.NotificationClient

// infoSchemaCache and tableStatusCache are caches that store information from the info schema and the tidb_ttl_table_status
// table. They don't need to be protected by a mutex, because they are only used in the job loop goroutine.
@@ -113,9 +116,11 @@ func NewJobManager(id string, sessPool sessionPool, store kv.Storage, etcdCli *c
manager.tableStatusCache = cache.NewTableStatusCache(getUpdateTTLTableStatusCacheInterval())

if etcdCli != nil {
manager.cmdCli = client.NewEtcdCommandClient(etcdCli)
manager.cmdCli = client.NewCommandClient(etcdCli)
manager.notificationCli = client.NewNotificationClient(etcdCli)
} else {
manager.cmdCli = client.NewMockCommandClient()
manager.notificationCli = client.NewMockNotificationClient()
}

manager.taskManager = newTaskManager(manager.ctx, sessPool, manager.infoSchemaCache, id)
@@ -150,6 +155,7 @@ func (m *JobManager) jobLoop() error {
checkScanTaskFinishedTicker := time.Tick(getTaskManagerLoopTickerInterval())

cmdWatcher := m.cmdCli.WatchCommand(m.ctx)
scanTaskNotificationWatcher := m.notificationCli.WatchNotification(m.ctx, scanTaskNotificationType)
m.taskManager.resizeWorkersWithSysVar()
for {
m.reportMetrics()
@@ -208,6 +214,17 @@ func (m *JobManager) jobLoop() error {
// Task Manager Loop
case <-scheduleTaskTicker:
m.taskManager.rescheduleTasks(se, now)
case _, ok := <-scanTaskNotificationWatcher:
if !ok {
if m.ctx.Err() != nil {
return nil
}

logutil.BgLogger().Warn("The TTL scan task notification watcher is closed unexpectedly, re-watch it again")
scanTaskNotificationWatcher = m.notificationCli.WatchNotification(m.ctx, scanTaskNotificationType)
continue
}
m.taskManager.rescheduleTasks(se, now)
case <-taskCheckTicker:
m.taskManager.checkInvalidTask(se)
m.taskManager.checkFinishedTask(se, now)
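
The scanTaskNotificationWatcher branch above must cope with etcd closing the watch channel (compaction, connection loss, shutdown): if the channel closes while the manager is still running, it re-opens the watch rather than keep selecting on a closed channel, which would spin. A standalone sketch of the same pattern, assuming this package's imports, with the manager state reduced to parameters; reschedule is a hypothetical callback standing in for taskManager.rescheduleTasks.

// watchLoop distills the re-watch pattern used in jobLoop.
func watchLoop(ctx context.Context, cli client.NotificationClient, reschedule func()) {
	watcher := cli.WatchNotification(ctx, scanTaskNotificationType)
	for {
		select {
		case <-ctx.Done():
			return
		case _, ok := <-watcher:
			if !ok {
				if ctx.Err() != nil {
					return // closed because we are shutting down
				}
				// Closed unexpectedly: re-establish the watch so the
				// select does not busy-loop on a closed channel.
				watcher = cli.WatchNotification(ctx, scanTaskNotificationType)
				continue
			}
			reschedule()
		}
	}
}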
@@ -611,10 +628,18 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
if err != nil {
return nil, err
}
return m.createNewJob(now, table)

job := m.createNewJob(now, table)

// the job is created; notify all scan managers to fetch new tasks
err = m.notificationCli.Notify(m.ctx, scanTaskNotificationType, job.id)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to trigger scan tasks", zap.Error(err))
}
return job, nil
}

func (m *JobManager) createNewJob(now time.Time, table *cache.PhysicalTable) (*ttlJob, error) {
func (m *JobManager) createNewJob(now time.Time, table *cache.PhysicalTable) *ttlJob {
id := m.tableStatusCache.Tables[table.ID].CurrentJobID

return &ttlJob{
@@ -627,7 +652,7 @@ func (m *JobManager) createNewJob(now time.Time, table *cache.PhysicalTable) (*t
tbl: table,

status: cache.JobStatusWaiting,
}, nil
}
}

// updateHeartBeat updates the heartbeat for all task with current instance as owner
@@ -687,6 +712,11 @@ func (m *JobManager) GetCommandCli() client.CommandClient {
return m.cmdCli
}

// GetNotificationCli returns the notification client
func (m *JobManager) GetNotificationCli() client.NotificationClient {
return m.notificationCli
}

type ttlSummary struct {
TotalRows uint64 `json:"total_rows"`
SuccessRows uint64 `json:"success_rows"`
38 changes: 38 additions & 0 deletions ttl/ttlworker/job_manager_integration_test.go
@@ -407,6 +407,44 @@ func TestJobTimeout(t *testing.T) {
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
}

func TestTriggerScanTask(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
sessionFactory := sessionFactory(t, store)

now := time.Now()

se := sessionFactory()
m := dom.TTLJobManager()
m.TaskManager().ResizeWorkersWithSysVar()
nCli := m.GetNotificationCli()

tk.MustExec("create table test.t (id int, created_at datetime) ttl = `created_at` + interval 1 minute ttl_job_interval = '1m'")
require.NoError(t, m.InfoSchemaCache().Update(se))

wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
<-nCli.WatchNotification(context.Background(), "scan")
wg.Done()
}()
m.RescheduleJobs(se, now)

// notification is sent
wg.Wait()

for time.Now().Before(now.Add(time.Second * 5)) {
time.Sleep(time.Second)
rows := tk.MustQuery("SELECT status FROM mysql.tidb_ttl_task").Rows()
if len(rows) == 0 {
break
}
if rows[0][0] == cache.TaskStatusFinished {
break
}
}
}

func waitAndStopTTLManager(t *testing.T, dom *domain.Domain) {
maxWaitTime := 30
for {