Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

variable: add txn_source into kv.context #39159

Merged
merged 7 commits into from
Nov 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -2915,8 +2915,8 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sum = "h1:HyWSOT/drBEtfXK2HLkWWR8dCO+rcf7OiRDRhBxAfU4=",
version = "v0.0.0-20221114102356-3debb6820e46",
sum = "h1:Ywk7n+4zm6W6T9XSyAwihBWdxXR2ALQzswQMEOglHkM=",
version = "v0.0.0-20221117075110-51120697d051",
)
go_repository(
name = "com_github_pingcap_log",
Expand Down Expand Up @@ -3519,8 +3519,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:nFVdyTXcQYZwQQCdSJcFI1vBFyzG1hVuZ39MAK6wqK4=",
version = "v2.0.3-0.20221108030801-9c0835c80eba",
sum = "h1:5df3qAcxvdGAffe0aBVFYhwQwAvl3VrF/xSX+J8ueyI=",
version = "v2.0.3-0.20221121025013-e9db9e6a8a94",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/streamhelper/basic_lib_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ func (f *fakeStore) GetLastFlushTSOfRegion(ctx context.Context, in *logbackup.Ge
return resp, nil
}

func (f *fakeStore) SubscribeFlushEvent(ctx context.Context, in *logbackup.SubscribeFlushEventRequest, opts ...grpc.CallOption) (logbackup.LogBackup_SubscribeFlushEventClient, error) {
return nil, nil
}

// RegionScan gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned.
func (f *fakeCluster) RegionScan(ctx context.Context, key []byte, endKey []byte, limit int) ([]streamhelper.RegionWithLeader, error) {
Expand Down
1 change: 0 additions & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,6 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {
// Instead, we merge all the jobs into one pending job.
return appendToSubJobs(mci, job)
}

// Get a global job ID and put the DDL job in the queue.
setDDLJobQuery(ctx, job)
task := &limitJobTask{job, make(chan error)}
Expand Down
10 changes: 7 additions & 3 deletions domain/globalconfigsync/globalconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,23 @@ func TestStoreGlobalConfig(t *testing.T) {

_, err = se.Execute(context.Background(), "set @@global.tidb_enable_top_sql=1;")
require.NoError(t, err)
_, err = se.Execute(context.Background(), "set @@global.tidb_source_id=2;")
require.NoError(t, err)
for i := 0; i < 20; i++ {
time.Sleep(100 * time.Millisecond)
client :=
store.(kv.StorageWithPD).GetPDClient()
// enable top sql will be translated to enable_resource_metering
items, err := client.LoadGlobalConfig(context.Background(), []string{"enable_resource_metering"})
items, err := client.LoadGlobalConfig(context.Background(), []string{"enable_resource_metering", "source_id"})
require.NoError(t, err)
if len(items) == 1 && items[0].Value == "" {
if len(items) == 2 && items[0].Value == "" {
continue
}
require.Len(t, items, 1)
require.Len(t, items, 2)
require.Equal(t, items[0].Name, "/global/config/enable_resource_metering")
require.Equal(t, items[0].Value, "true")
require.Equal(t, items[1].Name, "/global/config/source_id")
require.Equal(t, items[1].Value, "2")
return
}
require.Fail(t, "timeout for waiting global config synced")
Expand Down
9 changes: 9 additions & 0 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,15 @@ func TestSetVar(t *testing.T) {
tk.MustQuery("SELECT @@GLOBAL.validate_password.length").Check(testkit.Rows("4"))
tk.MustExec("SET GLOBAL validate_password.mixed_case_count = 2")
tk.MustQuery("SELECT @@GLOBAL.validate_password.length").Check(testkit.Rows("6"))

// test tidb_cdc_write_source
require.Equal(t, uint64(0), tk.Session().GetSessionVars().CDCWriteSource)
tk.MustQuery("select @@tidb_cdc_write_source").Check(testkit.Rows("0"))
tk.MustExec("set @@session.tidb_cdc_write_source = 2")
tk.MustQuery("select @@tidb_cdc_write_source").Check(testkit.Rows("2"))
require.Equal(t, uint64(2), tk.Session().GetSessionVars().CDCWriteSource)
tk.MustExec("set @@session.tidb_cdc_write_source = 0")
require.Equal(t, uint64(0), tk.Session().GetSessionVars().CDCWriteSource)
}

func TestGetSetNoopVars(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20220729040631-518f63d66278
github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059
github.com/pingcap/kvproto v0.0.0-20221114102356-3debb6820e46
github.com/pingcap/kvproto v0.0.0-20221117075110-51120697d051
github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c
github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4
github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e
Expand All @@ -86,7 +86,7 @@ require (
github.com/stretchr/testify v1.8.0
github.com/tdakkota/asciicheck v0.1.1
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.3-0.20221108030801-9c0835c80eba
github.com/tikv/client-go/v2 v2.0.3-0.20221121025013-e9db9e6a8a94
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144
github.com/twmb/murmur3 v1.1.3
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -778,8 +778,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20221114102356-3debb6820e46 h1:HyWSOT/drBEtfXK2HLkWWR8dCO+rcf7OiRDRhBxAfU4=
github.com/pingcap/kvproto v0.0.0-20221114102356-3debb6820e46/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20221117075110-51120697d051 h1:Ywk7n+4zm6W6T9XSyAwihBWdxXR2ALQzswQMEOglHkM=
github.com/pingcap/kvproto v0.0.0-20221117075110-51120697d051/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
Expand Down Expand Up @@ -928,8 +928,8 @@ github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3 h1:f+jULpR
github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3/go.mod h1:ON8b8w4BN/kE1EOhwT0o+d62W65a6aPw1nouo9LMgyY=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tikv/client-go/v2 v2.0.3-0.20221108030801-9c0835c80eba h1:nFVdyTXcQYZwQQCdSJcFI1vBFyzG1hVuZ39MAK6wqK4=
github.com/tikv/client-go/v2 v2.0.3-0.20221108030801-9c0835c80eba/go.mod h1:X9s4ct/MLk1sFqe5mU79KClKegLFDTa/FCx3hzexGtk=
github.com/tikv/client-go/v2 v2.0.3-0.20221121025013-e9db9e6a8a94 h1:5df3qAcxvdGAffe0aBVFYhwQwAvl3VrF/xSX+J8ueyI=
github.com/tikv/client-go/v2 v2.0.3-0.20221121025013-e9db9e6a8a94/go.mod h1:mQQhAIZ2uJwWXOG2UEz9s9oLGRcNKGGGtDOk4b13Bos=
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 h1:ckPpxKcl75mO2N6a4cJXiZH43hvcHPpqc9dh1TmH1nc=
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07/go.mod h1:CipBxPfxPUME+BImx9MUYXCnAVLS3VJUr3mnSJwh40A=
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro=
Expand Down
2 changes: 2 additions & 0 deletions kv/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ const (
ReplicaReadAdjuster
// ScanBatchSize set the iter scan batch size.
ScanBatchSize
// TxnSource set the source of this transaction.
TxnSource
)

// ReplicaReadType is the type of replica to read data from
Expand Down
1 change: 1 addition & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,7 @@ func (s *session) doCommit(ctx context.Context) error {
if tables := sessVars.TxnCtx.TemporaryTables; len(tables) > 0 {
s.txn.SetOption(kv.KVFilter, temporaryTableKVFilter(tables))
}
s.txn.SetOption(kv.TxnSource, sessVars.CDCWriteSource)
if tables := sessVars.TxnCtx.CachedTables; len(tables) > 0 {
c := cachedTableRenewLease{tables: tables}
now := time.Now()
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,10 @@ type SessionVars struct {

// MetricSchemaStep indicates the step when query metric schema.
MetricSchemaStep int64

// CDCWriteSource indicates the following data is written by TiCDC if it is not 0.
CDCWriteSource uint64

// MetricSchemaRangeDuration indicates the step when query metric schema.
MetricSchemaRangeDuration int64

Expand Down
5 changes: 5 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,10 @@ var defaultSysVars = []*SysVar{
s.MetricSchemaStep = TidbOptInt64(val, DefTiDBMetricSchemaStep)
return nil
}},
{Scope: ScopeSession, Name: TiDBCDCWriteSource, Value: "0", Type: TypeInt, MinValue: 0, MaxValue: 15, SetSession: func(s *SessionVars, val string) error {
s.CDCWriteSource = uint64(TidbOptInt(val, 0))
return nil
}},
{Scope: ScopeSession, Name: TiDBMetricSchemaRangeDuration, Value: strconv.Itoa(DefTiDBMetricSchemaRangeDuration), skipInit: true, Type: TypeUnsigned, MinValue: 10, MaxValue: 60 * 60 * 60, SetSession: func(s *SessionVars, val string) error {
s.MetricSchemaRangeDuration = TidbOptInt64(val, DefTiDBMetricSchemaRangeDuration)
return nil
Expand Down Expand Up @@ -776,6 +780,7 @@ var defaultSysVars = []*SysVar{
// TopSQL enable only be controlled by TopSQL pub/sub sinker.
// This global variable only uses to update the global config which store in PD(ETCD).
{Scope: ScopeGlobal, Name: TiDBEnableTopSQL, Value: BoolToOnOff(topsqlstate.DefTiDBTopSQLEnable), Type: TypeBool, AllowEmpty: true, GlobalConfigName: GlobalConfigEnableTopSQL},
{Scope: ScopeGlobal, Name: TiDBSourceID, Value: "1", Type: TypeInt, MinValue: 1, MaxValue: 15, GlobalConfigName: GlobalConfigSourceID},
{Scope: ScopeGlobal, Name: TiDBTopSQLMaxTimeSeriesCount, Value: strconv.Itoa(topsqlstate.DefTiDBTopSQLMaxTimeSeriesCount), Type: TypeInt, MinValue: 1, MaxValue: 5000, GetGlobal: func(_ context.Context, s *SessionVars) (string, error) {
return strconv.FormatInt(topsqlstate.GlobalState.MaxStatementCount.Load(), 10), nil
}, SetGlobal: func(_ context.Context, vars *SessionVars, s string) error {
Expand Down
6 changes: 6 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,9 @@ const (
// TiDBMetricSchemaStep indicates the step when query metric schema.
TiDBMetricSchemaStep = "tidb_metric_query_step"

// TiDBCDCWriteSource indicates the following data is written by TiCDC if it is not 0.
TiDBCDCWriteSource = "tidb_cdc_write_source"

// TiDBMetricSchemaRangeDuration indicates the range duration when query metric schema.
TiDBMetricSchemaRangeDuration = "tidb_metric_query_range_duration"

Expand Down Expand Up @@ -627,6 +630,9 @@ const (
// TiDBEnableTopSQL indicates whether the top SQL is enabled.
TiDBEnableTopSQL = "tidb_enable_top_sql"

// TiDBSourceID indicates the source ID of the TiDB server.
TiDBSourceID = "tidb_source_id"

// TiDBTopSQLMaxTimeSeriesCount indicates the max number of statements been collected in each time series.
TiDBTopSQLMaxTimeSeriesCount = "tidb_top_sql_max_time_series_count"

Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/variable.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ const (
// Global config name list.
const (
GlobalConfigEnableTopSQL = "enable_resource_metering"
GlobalConfigSourceID = "source_id"
)

func (s ScopeFlag) String() string {
Expand Down
2 changes: 2 additions & 0 deletions store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) {
txn.KVTxn.SetRequestSourceType(val.(string))
case kv.ReplicaReadAdjuster:
txn.KVTxn.GetSnapshot().SetReplicaReadAdjuster(val.(txnkv.ReplicaReadAdjuster))
case kv.TxnSource:
txn.KVTxn.SetTxnSource(val.(uint64))
}
}

Expand Down