diff --git a/DEPS.bzl b/DEPS.bzl index c9d495cb6e17e..6d80e6322c01c 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -2915,8 +2915,8 @@ def go_deps(): name = "com_github_pingcap_kvproto", build_file_proto_mode = "disable_global", importpath = "github.com/pingcap/kvproto", - sum = "h1:HyWSOT/drBEtfXK2HLkWWR8dCO+rcf7OiRDRhBxAfU4=", - version = "v0.0.0-20221114102356-3debb6820e46", + sum = "h1:Ywk7n+4zm6W6T9XSyAwihBWdxXR2ALQzswQMEOglHkM=", + version = "v0.0.0-20221117075110-51120697d051", ) go_repository( name = "com_github_pingcap_log", @@ -3519,8 +3519,8 @@ def go_deps(): name = "com_github_tikv_client_go_v2", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/client-go/v2", - sum = "h1:nFVdyTXcQYZwQQCdSJcFI1vBFyzG1hVuZ39MAK6wqK4=", - version = "v2.0.3-0.20221108030801-9c0835c80eba", + sum = "h1:5df3qAcxvdGAffe0aBVFYhwQwAvl3VrF/xSX+J8ueyI=", + version = "v2.0.3-0.20221121025013-e9db9e6a8a94", ) go_repository( name = "com_github_tikv_pd_client", diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go index 9e438c32f0f1f..b41d5baf19528 100644 --- a/br/pkg/streamhelper/basic_lib_for_test.go +++ b/br/pkg/streamhelper/basic_lib_for_test.go @@ -163,6 +163,10 @@ func (f *fakeStore) GetLastFlushTSOfRegion(ctx context.Context, in *logbackup.Ge return resp, nil } +func (f *fakeStore) SubscribeFlushEvent(ctx context.Context, in *logbackup.SubscribeFlushEventRequest, opts ...grpc.CallOption) (logbackup.LogBackup_SubscribeFlushEventClient, error) { + return nil, nil +} + // RegionScan gets a list of regions, starts from the region that contains key. // Limit limits the maximum number of regions returned. func (f *fakeCluster) RegionScan(ctx context.Context, key []byte, endKey []byte, limit int) ([]streamhelper.RegionWithLeader, error) { diff --git a/ddl/ddl.go b/ddl/ddl.go index 52a5b0480c42a..af8a0ca67a8d5 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -979,7 +979,6 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error { // Instead, we merge all the jobs into one pending job. return appendToSubJobs(mci, job) } - // Get a global job ID and put the DDL job in the queue. setDDLJobQuery(ctx, job) task := &limitJobTask{job, make(chan error)} diff --git a/domain/globalconfigsync/globalconfig_test.go b/domain/globalconfigsync/globalconfig_test.go index c7dab0064dedb..d6013f2887693 100644 --- a/domain/globalconfigsync/globalconfig_test.go +++ b/domain/globalconfigsync/globalconfig_test.go @@ -88,19 +88,23 @@ func TestStoreGlobalConfig(t *testing.T) { _, err = se.Execute(context.Background(), "set @@global.tidb_enable_top_sql=1;") require.NoError(t, err) + _, err = se.Execute(context.Background(), "set @@global.tidb_source_id=2;") + require.NoError(t, err) for i := 0; i < 20; i++ { time.Sleep(100 * time.Millisecond) client := store.(kv.StorageWithPD).GetPDClient() // enable top sql will be translated to enable_resource_metering - items, err := client.LoadGlobalConfig(context.Background(), []string{"enable_resource_metering"}) + items, err := client.LoadGlobalConfig(context.Background(), []string{"enable_resource_metering", "source_id"}) require.NoError(t, err) - if len(items) == 1 && items[0].Value == "" { + if len(items) == 2 && items[0].Value == "" { continue } - require.Len(t, items, 1) + require.Len(t, items, 2) require.Equal(t, items[0].Name, "/global/config/enable_resource_metering") require.Equal(t, items[0].Value, "true") + require.Equal(t, items[1].Name, "/global/config/source_id") + require.Equal(t, items[1].Value, "2") return } require.Fail(t, "timeout for waiting global config synced") diff --git a/executor/set_test.go b/executor/set_test.go index a4a54a37a3595..734fdab8750fe 100644 --- a/executor/set_test.go +++ b/executor/set_test.go @@ -861,6 +861,15 @@ func TestSetVar(t *testing.T) { tk.MustQuery("SELECT @@GLOBAL.validate_password.length").Check(testkit.Rows("4")) tk.MustExec("SET GLOBAL validate_password.mixed_case_count = 2") tk.MustQuery("SELECT @@GLOBAL.validate_password.length").Check(testkit.Rows("6")) + + // test tidb_cdc_write_source + require.Equal(t, uint64(0), tk.Session().GetSessionVars().CDCWriteSource) + tk.MustQuery("select @@tidb_cdc_write_source").Check(testkit.Rows("0")) + tk.MustExec("set @@session.tidb_cdc_write_source = 2") + tk.MustQuery("select @@tidb_cdc_write_source").Check(testkit.Rows("2")) + require.Equal(t, uint64(2), tk.Session().GetSessionVars().CDCWriteSource) + tk.MustExec("set @@session.tidb_cdc_write_source = 0") + require.Equal(t, uint64(0), tk.Session().GetSessionVars().CDCWriteSource) } func TestGetSetNoopVars(t *testing.T) { diff --git a/go.mod b/go.mod index 2cd460e8279c3..56831b44d54ba 100644 --- a/go.mod +++ b/go.mod @@ -68,7 +68,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20220729040631-518f63d66278 github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3 github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 - github.com/pingcap/kvproto v0.0.0-20221114102356-3debb6820e46 + github.com/pingcap/kvproto v0.0.0-20221117075110-51120697d051 github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e @@ -86,7 +86,7 @@ require ( github.com/stretchr/testify v1.8.0 github.com/tdakkota/asciicheck v0.1.1 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 - github.com/tikv/client-go/v2 v2.0.3-0.20221108030801-9c0835c80eba + github.com/tikv/client-go/v2 v2.0.3-0.20221121025013-e9db9e6a8a94 github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 github.com/twmb/murmur3 v1.1.3 diff --git a/go.sum b/go.sum index bdc73a0d74ddc..c1da1efba939c 100644 --- a/go.sum +++ b/go.sum @@ -778,8 +778,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= -github.com/pingcap/kvproto v0.0.0-20221114102356-3debb6820e46 h1:HyWSOT/drBEtfXK2HLkWWR8dCO+rcf7OiRDRhBxAfU4= -github.com/pingcap/kvproto v0.0.0-20221114102356-3debb6820e46/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= +github.com/pingcap/kvproto v0.0.0-20221117075110-51120697d051 h1:Ywk7n+4zm6W6T9XSyAwihBWdxXR2ALQzswQMEOglHkM= +github.com/pingcap/kvproto v0.0.0-20221117075110-51120697d051/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= @@ -928,8 +928,8 @@ github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3 h1:f+jULpR github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3/go.mod h1:ON8b8w4BN/kE1EOhwT0o+d62W65a6aPw1nouo9LMgyY= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU= -github.com/tikv/client-go/v2 v2.0.3-0.20221108030801-9c0835c80eba h1:nFVdyTXcQYZwQQCdSJcFI1vBFyzG1hVuZ39MAK6wqK4= -github.com/tikv/client-go/v2 v2.0.3-0.20221108030801-9c0835c80eba/go.mod h1:X9s4ct/MLk1sFqe5mU79KClKegLFDTa/FCx3hzexGtk= +github.com/tikv/client-go/v2 v2.0.3-0.20221121025013-e9db9e6a8a94 h1:5df3qAcxvdGAffe0aBVFYhwQwAvl3VrF/xSX+J8ueyI= +github.com/tikv/client-go/v2 v2.0.3-0.20221121025013-e9db9e6a8a94/go.mod h1:mQQhAIZ2uJwWXOG2UEz9s9oLGRcNKGGGtDOk4b13Bos= github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 h1:ckPpxKcl75mO2N6a4cJXiZH43hvcHPpqc9dh1TmH1nc= github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07/go.mod h1:CipBxPfxPUME+BImx9MUYXCnAVLS3VJUr3mnSJwh40A= github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro= diff --git a/kv/option.go b/kv/option.go index 888a1e24f0fa0..ee5354141cd7b 100644 --- a/kv/option.go +++ b/kv/option.go @@ -93,6 +93,8 @@ const ( ReplicaReadAdjuster // ScanBatchSize set the iter scan batch size. ScanBatchSize + // TxnSource set the source of this transaction. + TxnSource ) // ReplicaReadType is the type of replica to read data from diff --git a/session/session.go b/session/session.go index 51a3e22d39aac..cd2e5eced09b4 100644 --- a/session/session.go +++ b/session/session.go @@ -702,6 +702,7 @@ func (s *session) doCommit(ctx context.Context) error { if tables := sessVars.TxnCtx.TemporaryTables; len(tables) > 0 { s.txn.SetOption(kv.KVFilter, temporaryTableKVFilter(tables)) } + s.txn.SetOption(kv.TxnSource, sessVars.CDCWriteSource) if tables := sessVars.TxnCtx.CachedTables; len(tables) > 0 { c := cachedTableRenewLease{tables: tables} now := time.Now() diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index b8fbcf54848e1..359baffdd9679 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1041,6 +1041,10 @@ type SessionVars struct { // MetricSchemaStep indicates the step when query metric schema. MetricSchemaStep int64 + + // CDCWriteSource indicates the following data is written by TiCDC if it is not 0. + CDCWriteSource uint64 + // MetricSchemaRangeDuration indicates the step when query metric schema. MetricSchemaRangeDuration int64 diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 060c542bddd77..0ba657405dd5e 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -300,6 +300,10 @@ var defaultSysVars = []*SysVar{ s.MetricSchemaStep = TidbOptInt64(val, DefTiDBMetricSchemaStep) return nil }}, + {Scope: ScopeSession, Name: TiDBCDCWriteSource, Value: "0", Type: TypeInt, MinValue: 0, MaxValue: 15, SetSession: func(s *SessionVars, val string) error { + s.CDCWriteSource = uint64(TidbOptInt(val, 0)) + return nil + }}, {Scope: ScopeSession, Name: TiDBMetricSchemaRangeDuration, Value: strconv.Itoa(DefTiDBMetricSchemaRangeDuration), skipInit: true, Type: TypeUnsigned, MinValue: 10, MaxValue: 60 * 60 * 60, SetSession: func(s *SessionVars, val string) error { s.MetricSchemaRangeDuration = TidbOptInt64(val, DefTiDBMetricSchemaRangeDuration) return nil @@ -776,6 +780,7 @@ var defaultSysVars = []*SysVar{ // TopSQL enable only be controlled by TopSQL pub/sub sinker. // This global variable only uses to update the global config which store in PD(ETCD). {Scope: ScopeGlobal, Name: TiDBEnableTopSQL, Value: BoolToOnOff(topsqlstate.DefTiDBTopSQLEnable), Type: TypeBool, AllowEmpty: true, GlobalConfigName: GlobalConfigEnableTopSQL}, + {Scope: ScopeGlobal, Name: TiDBSourceID, Value: "1", Type: TypeInt, MinValue: 1, MaxValue: 15, GlobalConfigName: GlobalConfigSourceID}, {Scope: ScopeGlobal, Name: TiDBTopSQLMaxTimeSeriesCount, Value: strconv.Itoa(topsqlstate.DefTiDBTopSQLMaxTimeSeriesCount), Type: TypeInt, MinValue: 1, MaxValue: 5000, GetGlobal: func(_ context.Context, s *SessionVars) (string, error) { return strconv.FormatInt(topsqlstate.GlobalState.MaxStatementCount.Load(), 10), nil }, SetGlobal: func(_ context.Context, vars *SessionVars, s string) error { diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index a9e278107d270..55ec073d85e51 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -555,6 +555,9 @@ const ( // TiDBMetricSchemaStep indicates the step when query metric schema. TiDBMetricSchemaStep = "tidb_metric_query_step" + // TiDBCDCWriteSource indicates the following data is written by TiCDC if it is not 0. + TiDBCDCWriteSource = "tidb_cdc_write_source" + // TiDBMetricSchemaRangeDuration indicates the range duration when query metric schema. TiDBMetricSchemaRangeDuration = "tidb_metric_query_range_duration" @@ -627,6 +630,9 @@ const ( // TiDBEnableTopSQL indicates whether the top SQL is enabled. TiDBEnableTopSQL = "tidb_enable_top_sql" + // TiDBSourceID indicates the source ID of the TiDB server. + TiDBSourceID = "tidb_source_id" + // TiDBTopSQLMaxTimeSeriesCount indicates the max number of statements been collected in each time series. TiDBTopSQLMaxTimeSeriesCount = "tidb_top_sql_max_time_series_count" diff --git a/sessionctx/variable/variable.go b/sessionctx/variable/variable.go index 4b7faa09481c8..2792e373cdda1 100644 --- a/sessionctx/variable/variable.go +++ b/sessionctx/variable/variable.go @@ -85,6 +85,7 @@ const ( // Global config name list. const ( GlobalConfigEnableTopSQL = "enable_resource_metering" + GlobalConfigSourceID = "source_id" ) func (s ScopeFlag) String() string { diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index 851e68eac89ef..b18b6d0db1f33 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -258,6 +258,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) { txn.KVTxn.SetRequestSourceType(val.(string)) case kv.ReplicaReadAdjuster: txn.KVTxn.GetSnapshot().SetReplicaReadAdjuster(val.(txnkv.ReplicaReadAdjuster)) + case kv.TxnSource: + txn.KVTxn.SetTxnSource(val.(uint64)) } }