Skip to content

Commit

Permalink
*: add a memdb memory tracker (#39372)
Browse files Browse the repository at this point in the history
ref #35203
  • Loading branch information
ekexium committed Nov 25, 2022
1 parent c34ee82 commit 15b523c
Show file tree
Hide file tree
Showing 19 changed files with 106 additions and 49 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -3519,8 +3519,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:5df3qAcxvdGAffe0aBVFYhwQwAvl3VrF/xSX+J8ueyI=",
version = "v2.0.3-0.20221121025013-e9db9e6a8a94",
sum = "h1:G44ccTqXvE3uZgA+8Y71RQmw/1gsst+wXtn2+qw5ykI=",
version = "v2.0.3-0.20221124031013-92f0a82e1a9f",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down
8 changes: 1 addition & 7 deletions executor/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,20 +234,14 @@ func (e *DeleteExec) removeRowsInTblRowMap(tblRowMap tableRowMapType) error {
}

func (e *DeleteExec) removeRow(ctx sessionctx.Context, t table.Table, h kv.Handle, data []types.Datum) error {
txnState, err := e.ctx.Txn(false)
if err != nil {
return err
}
memUsageOfTxnState := txnState.Size()
err = t.RemoveRecord(ctx, h, data)
err := t.RemoveRecord(ctx, h, data)
if err != nil {
return err
}
err = e.onRemoveRowForFK(ctx, t, data)
if err != nil {
return err
}
e.memTracker.Consume(int64(txnState.Size() - memUsageOfTxnState))
ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
return nil
}
Expand Down
3 changes: 1 addition & 2 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6237,8 +6237,7 @@ func TestSessionRootTrackerDetach(t *testing.T) {
tk.MustExec("create table t(a int, b int, index idx(a))")
tk.MustExec("create table t1(a int, c int, index idx(a))")
tk.MustExec("set tidb_mem_quota_query=10")
err := tk.ExecToErr("select /*+hash_join(t1)*/ t.a, t1.a from t use index(idx), t1 use index(idx) where t.a = t1.a")
require.Contains(t, err.Error(), "Out Of Memory Quota!")
tk.MustContainErrMsg("select /*+hash_join(t1)*/ t.a, t1.a from t use index(idx), t1 use index(idx) where t.a = t1.a", "Out Of Memory Quota!")
tk.MustExec("set tidb_mem_quota_query=1000")
rs, err := tk.Exec("select /*+hash_join(t1)*/ t.a, t1.a from t use index(idx), t1 use index(idx) where t.a = t1.a")
require.NoError(t, err)
Expand Down
12 changes: 11 additions & 1 deletion executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2607,7 +2607,17 @@ func (e *tidbTrxTableRetriever) retrieve(ctx context.Context, sctx sessionctx.Co
row = append(row, types.NewDatum(nil))
}
} else {
row = append(row, e.txnInfo[i].ToDatum(c.Name.O))
switch c.Name.O {
case txninfo.MemBufferBytesStr:
memDBFootprint := sctx.GetSessionVars().MemDBFootprint
var bytesConsumed int64
if memDBFootprint != nil {
bytesConsumed = memDBFootprint.BytesConsumed()
}
row = append(row, types.NewDatum(bytesConsumed))
default:
row = append(row, e.txnInfo[i].ToDatum(c.Name.O))
}
}
}
res = append(res, row)
Expand Down
2 changes: 0 additions & 2 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
return err
}
setOptionForTopSQL(sessVars.StmtCtx, txn)
txnSize := txn.Size()
sessVars.StmtCtx.AddRecordRows(uint64(len(rows)))
// If you use the IGNORE keyword, duplicate-key error that occurs while executing the INSERT statement are ignored.
// For example, without IGNORE, a row that duplicates an existing UNIQUE index or PRIMARY KEY value in
Expand Down Expand Up @@ -113,7 +112,6 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
e.stats.CheckInsertTime += time.Since(start)
}
}
e.memTracker.Consume(int64(txn.Size() - txnSize))
return nil
}

Expand Down
6 changes: 2 additions & 4 deletions executor/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2307,16 +2307,14 @@ func TestIssue18070(t *testing.T) {
tk.MustExec("insert into t1 values(1),(2)")
tk.MustExec("insert into t2 values(1),(1),(2),(2)")
tk.MustExec("set @@tidb_mem_quota_query=1000")
err := tk.QueryToErr("select /*+ inl_hash_join(t1)*/ * from t1 join t2 on t1.a = t2.a;")
require.True(t, strings.Contains(err.Error(), "Out Of Memory Quota!"))
tk.MustContainErrMsg("select /*+ inl_hash_join(t1)*/ * from t1 join t2 on t1.a = t2.a;", "Out Of Memory Quota!")

fpName := "github.com/pingcap/tidb/executor/mockIndexMergeJoinOOMPanic"
require.NoError(t, failpoint.Enable(fpName, `panic("ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]")`))
defer func() {
require.NoError(t, failpoint.Disable(fpName))
}()
err = tk.QueryToErr("select /*+ inl_merge_join(t1)*/ * from t1 join t2 on t1.a = t2.a;")
require.True(t, strings.Contains(err.Error(), "Out Of Memory Quota!"))
tk.MustContainErrMsg("select /*+ inl_merge_join(t1)*/ * from t1 join t2 on t1.a = t2.a;", "Out Of Memory Quota!")
}

func TestIssue18564(t *testing.T) {
Expand Down
8 changes: 1 addition & 7 deletions executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,6 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
txn, err := sctx.Txn(false)
if err != nil {
return false, err
}
memUsageOfTxnState := txn.Size()
defer memTracker.Consume(int64(txn.Size() - memUsageOfTxnState))
sc := sctx.GetSessionVars().StmtCtx
changed, handleChanged := false, false
// onUpdateSpecified is for "UPDATE SET ts_field = old_value", the
Expand Down Expand Up @@ -207,7 +201,7 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old
}
} else {
// Update record to new value and update index.
if err = t.UpdateRecord(ctx, sctx, h, oldData, newData, modified); err != nil {
if err := t.UpdateRecord(ctx, sctx, h, oldData, newData, modified); err != nil {
if terr, ok := errors.Cause(err).(*terror.Error); sctx.GetSessionVars().StmtCtx.IgnoreNoPartition && ok && terr.Code() == errno.ErrNoPartitionForGivenValue {
return false, nil
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ require (
github.com/stretchr/testify v1.8.0
github.com/tdakkota/asciicheck v0.1.1
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.3-0.20221121025013-e9db9e6a8a94
github.com/tikv/client-go/v2 v2.0.3-0.20221124031013-92f0a82e1a9f
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144
github.com/twmb/murmur3 v1.1.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -928,8 +928,8 @@ github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3 h1:f+jULpR
github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3/go.mod h1:ON8b8w4BN/kE1EOhwT0o+d62W65a6aPw1nouo9LMgyY=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tikv/client-go/v2 v2.0.3-0.20221121025013-e9db9e6a8a94 h1:5df3qAcxvdGAffe0aBVFYhwQwAvl3VrF/xSX+J8ueyI=
github.com/tikv/client-go/v2 v2.0.3-0.20221121025013-e9db9e6a8a94/go.mod h1:mQQhAIZ2uJwWXOG2UEz9s9oLGRcNKGGGtDOk4b13Bos=
github.com/tikv/client-go/v2 v2.0.3-0.20221124031013-92f0a82e1a9f h1:G44ccTqXvE3uZgA+8Y71RQmw/1gsst+wXtn2+qw5ykI=
github.com/tikv/client-go/v2 v2.0.3-0.20221124031013-92f0a82e1a9f/go.mod h1:mQQhAIZ2uJwWXOG2UEz9s9oLGRcNKGGGtDOk4b13Bos=
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 h1:ckPpxKcl75mO2N6a4cJXiZH43hvcHPpqc9dh1TmH1nc=
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07/go.mod h1:CipBxPfxPUME+BImx9MUYXCnAVLS3VJUr3mnSJwh40A=
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro=
Expand Down
11 changes: 7 additions & 4 deletions infoschema/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,12 +588,12 @@ INSERT INTO ...;
defer func() { require.NoError(t, os.Remove(slowLogFileName)) }()
tk := testkit.NewTestKit(t, store)

//check schema
// check schema
tk.MustQuery(`select COUNT(*) from information_schema.columns
WHERE table_name = 'slow_query' and column_name = '` + columnName + `'`).
Check(testkit.Rows("1"))

//check select
// check select
tk.MustQuery(`select ` + columnName +
` from information_schema.slow_query`).Check(testkit.Rows("1"))
}
Expand Down Expand Up @@ -1393,16 +1393,19 @@ func TestTiDBTrx(t *testing.T) {
tk.MustExec("update test_tidb_trx set i = i + 1")
_, digest := parser.NormalizeDigest("update test_tidb_trx set i = i + 1")
sm := &testkit.MockSessionManager{TxnInfo: make([]*txninfo.TxnInfo, 2)}
memDBTracker := memory.NewTracker(memory.LabelForMemDB, -1)
memDBTracker.Consume(19)
tk.Session().GetSessionVars().MemDBFootprint = memDBTracker
sm.TxnInfo[0] = &txninfo.TxnInfo{
StartTS: 424768545227014155,
CurrentSQLDigest: digest.String(),
State: txninfo.TxnIdle,
EntriesCount: 1,
EntriesSize: 19,
ConnectionID: 2,
Username: "root",
CurrentDB: "test",
}

blockTime2 := time.Date(2021, 05, 20, 13, 18, 30, 123456000, time.Local)
sm.TxnInfo[1] = &txninfo.TxnInfo{
StartTS: 425070846483628033,
Expand All @@ -1419,7 +1422,7 @@ func TestTiDBTrx(t *testing.T) {

tk.MustQuery("select * from information_schema.TIDB_TRX;").Check(testkit.Rows(
"424768545227014155 2021-05-07 12:56:48.001000 "+digest.String()+" update `test_tidb_trx` set `i` = `i` + ? Idle <nil> 1 19 2 root test [] ",
"425070846483628033 2021-05-20 21:16:35.778000 <nil> <nil> LockWaiting 2021-05-20 13:18:30.123456 0 0 10 user1 db1 [\"sql1\",\"sql2\",\""+digest.String()+"\"] "))
"425070846483628033 2021-05-20 21:16:35.778000 <nil> <nil> LockWaiting 2021-05-20 13:18:30.123456 0 19 10 user1 db1 [\"sql1\",\"sql2\",\""+digest.String()+"\"] "))

// Test the all_sql_digests column can be directly passed to the tidb_decode_sql_digests function.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/sqlDigestRetrieverSkipRetrieveGlobal", "return"))
Expand Down
8 changes: 8 additions & 0 deletions kv/interface_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,14 @@ func (t *mockTxn) UpdateMemBufferFlags(_ []byte, _ ...FlagsOp) {

}

func (t *mockTxn) SetMemoryFootprintChangeHook(func(uint64)) {

}

func (t *mockTxn) Mem() uint64 {
return 0
}

// newMockTxn new a mockTxn.
func newMockTxn() Transaction {
return &mockTxn{
Expand Down
4 changes: 4 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ type Transaction interface {
AssertionProto
// Size returns sum of keys and values length.
Size() int
// Mem returns the memory consumption of the transaction.
Mem() uint64
// SetMemoryFootprintChangeHook sets the hook that will be called when the memory footprint changes.
SetMemoryFootprintChangeHook(func(uint64))
// Len returns the number of entries in the DB.
Len() int
// Reset reset the Transaction to initial states.
Expand Down
17 changes: 17 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ import (
"github.com/pingcap/tidb/util/kvcache"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/logutil/consistency"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/sem"
"github.com/pingcap/tidb/util/sli"
"github.com/pingcap/tidb/util/sqlexec"
Expand Down Expand Up @@ -569,6 +570,7 @@ func (s *session) FieldList(tableName string) ([]*ast.ResultField, error) {
return fields, nil
}

// TxnInfo returns a pointer to a *copy* of the internal TxnInfo, thus is *read only*
func (s *session) TxnInfo() *txninfo.TxnInfo {
s.txn.mu.RLock()
// Copy on read to get a snapshot, this API shouldn't be frequently called.
Expand Down Expand Up @@ -2507,6 +2509,7 @@ func (s *session) Txn(active bool) (kv.Transaction, error) {
return &s.txn, nil
}
_, err := sessiontxn.GetTxnManager(s).ActivateTxn()
s.SetMemoryFootprintChangeHook()
return &s.txn, err
}

Expand Down Expand Up @@ -3660,6 +3663,20 @@ func (s *session) GetStmtStats() *stmtstats.StatementStats {
return s.stmtStats
}

// SetMemoryFootprintChangeHook sets the hook that is called when the memdb changes its size.
// Call this after s.txn becomes valid, since TxnInfo is initialized when the txn becomes valid.
func (s *session) SetMemoryFootprintChangeHook() {
hook := func(mem uint64) {
if s.sessionVars.MemDBFootprint == nil {
tracker := memory.NewTracker(memory.LabelForMemDB, -1)
tracker.AttachTo(s.sessionVars.MemTracker)
s.sessionVars.MemDBFootprint = tracker
}
s.sessionVars.MemDBFootprint.ReplaceBytesUsed(int64(mem))
}
s.txn.SetMemoryFootprintChangeHook(hook)
}

// EncodeSessionStates implements SessionStatesHandler.EncodeSessionStates interface.
func (s *session) EncodeSessionStates(ctx context.Context, sctx sessionctx.Context, sessionStates *sessionstates.SessionStates) error {
// Transaction status is hard to encode, so we do not support it.
Expand Down
24 changes: 18 additions & 6 deletions session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,16 +151,14 @@ func (txn *LazyTxn) cleanupStmtBuf() {
txn.mu.Lock()
defer txn.mu.Unlock()
txn.mu.TxnInfo.EntriesCount = uint64(txn.Transaction.Len())
txn.mu.TxnInfo.EntriesSize = uint64(txn.Transaction.Size())
}

// resetTxnInfo resets the transaction info.
// Note: call it under lock!
func (txn *LazyTxn) resetTxnInfo(
startTS uint64,
state txninfo.TxnRunningState,
entriesCount,
entriesSize uint64,
entriesCount uint64,
currentSQLDigest string,
allSQLDigests []string,
) {
Expand All @@ -178,7 +176,7 @@ func (txn *LazyTxn) resetTxnInfo(
txninfo.TxnStatusEnteringCounter(state).Inc()
txn.mu.TxnInfo.LastStateChangeTime = time.Now()
txn.mu.TxnInfo.EntriesCount = entriesCount
txn.mu.TxnInfo.EntriesSize = entriesSize

txn.mu.TxnInfo.CurrentSQLDigest = currentSQLDigest
txn.mu.TxnInfo.AllSQLDigests = allSQLDigests
}
Expand All @@ -191,6 +189,22 @@ func (txn *LazyTxn) Size() int {
return txn.Transaction.Size()
}

// Mem implements the MemBuffer interface.
func (txn *LazyTxn) Mem() uint64 {
if txn.Transaction == nil {
return 0
}
return txn.Transaction.Mem()
}

// SetMemoryFootprintChangeHook sets the hook to be called when the memory footprint of this transaction changes.
func (txn *LazyTxn) SetMemoryFootprintChangeHook(hook func(uint64)) {
if txn.Transaction == nil {
return
}
txn.Transaction.SetMemoryFootprintChangeHook(hook)
}

// Valid implements the kv.Transaction interface.
func (txn *LazyTxn) Valid() bool {
return txn.Transaction != nil && txn.Transaction.Valid()
Expand Down Expand Up @@ -275,7 +289,6 @@ func (txn *LazyTxn) changePendingToValid(ctx context.Context) error {
t.StartTS(),
txninfo.TxnIdle,
uint64(txn.Transaction.Len()),
uint64(txn.Transaction.Size()),
txn.mu.TxnInfo.CurrentSQLDigest,
txn.mu.TxnInfo.AllSQLDigests)

Expand Down Expand Up @@ -433,7 +446,6 @@ func (txn *LazyTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keys ...k
txn.updateState(originState)
txn.mu.TxnInfo.BlockStartTime.Valid = false
txn.mu.TxnInfo.EntriesCount = uint64(txn.Transaction.Len())
txn.mu.TxnInfo.EntriesSize = uint64(txn.Transaction.Size())
return err
}

Expand Down
5 changes: 0 additions & 5 deletions session/txninfo/txn_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,6 @@ type TxnInfo struct {
}
// How many entries are in MemDB
EntriesCount uint64
// MemDB used memory
EntriesSize uint64

// The following fields will be filled in `session` instead of `LazyTxn`

Expand Down Expand Up @@ -208,9 +206,6 @@ var columnValueGetterMap = map[string]func(*TxnInfo) types.Datum{
MemBufferKeysStr: func(info *TxnInfo) types.Datum {
return types.NewDatum(info.EntriesCount)
},
MemBufferBytesStr: func(info *TxnInfo) types.Datum {
return types.NewDatum(info.EntriesSize)
},
SessionIDStr: func(info *TxnInfo) types.Datum {
return types.NewDatum(info.ConnectionID)
},
Expand Down
6 changes: 4 additions & 2 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1294,8 +1294,10 @@ type SessionVars struct {
HookContext

// MemTracker indicates the memory tracker of current session.
MemTracker *memory.Tracker
DiskTracker *memory.Tracker
MemTracker *memory.Tracker
// MemDBDBFootprint tracks the memory footprint of memdb, and is attached to `MemTracker`
MemDBFootprint *memory.Tracker
DiskTracker *memory.Tracker

// OptPrefixIndexSingleScan indicates whether to do some optimizations to avoid double scan for prefix index.
// When set to true, `col is (not) null`(`col` is index prefix column) is regarded as index filter rather than table filter.
Expand Down
4 changes: 4 additions & 0 deletions sessiontxn/isolation/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,10 @@ func (p *baseTxnContextProvider) ActivateTxn() (kv.Transaction, error) {

sessVars := p.sctx.GetSessionVars()
sessVars.TxnCtx.StartTS = txn.StartTS()
if sessVars.MemDBFootprint != nil {
sessVars.MemDBFootprint.Detach()
}
sessVars.MemDBFootprint = nil

if p.enterNewTxnType == sessiontxn.EnterNewTxnBeforeStmt && !sessVars.IsAutocommit() && sessVars.SnapshotTS == 0 {
sessVars.SetInTxn(true)
Expand Down
20 changes: 18 additions & 2 deletions tests/realtikvtest/txntest/txn_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,30 @@ func TestEntriesCountAndSize(t *testing.T) {
tk.MustExec("insert into t(a) values (1);")
info := tk.Session().TxnInfo()
require.Equal(t, uint64(1), info.EntriesCount)
require.Equal(t, uint64(29), info.EntriesSize)
tk.MustExec("insert into t(a) values (2);")
info = tk.Session().TxnInfo()
require.Equal(t, uint64(2), info.EntriesCount)
require.Equal(t, uint64(58), info.EntriesSize)
tk.MustExec("commit;")
}

func TestMemDBTracker(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
session := tk.Session()
tk.MustExec("use test")
tk.MustExec("create table t (id int)")
tk.MustExec("begin")
for i := 0; i < (1 << 10); i++ {
tk.MustExec("insert t (id) values (1)")
}
require.Less(t, int64(1<<(10+4)), session.GetSessionVars().MemDBFootprint.BytesConsumed())
require.Greater(t, int64(1<<(14+4)), session.GetSessionVars().MemDBFootprint.BytesConsumed())
for i := 0; i < (1 << 14); i++ {
tk.MustExec("insert t (id) values (1)")
}
require.Less(t, int64(1<<(14+4)), session.GetSessionVars().MemDBFootprint.BytesConsumed())
}

func TestRunning(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)

Expand Down
Loading

0 comments on commit 15b523c

Please sign in to comment.