Skip to content

Commit

Permalink
server: fix memtracker leak with cursor (#44257) (#44280)
Browse files Browse the repository at this point in the history
close #44254
  • Loading branch information
YangKeao committed May 30, 2023
1 parent 8362916 commit b797290
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 8 deletions.
18 changes: 10 additions & 8 deletions server/conn_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,19 @@ func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stm
}
execStmt.SetText(charset.EncodingUTF8Impl, sql)
rs, err := (&cc.ctx).ExecuteStmt(ctx, execStmt)
if rs != nil {
defer terror.Call(rs.Close)
}
if err != nil {
// If error is returned during the planner phase or the executor.Open
// phase, the rs will be nil, and StmtCtx.MemTracker StmtCtx.DiskTracker
// will not be detached. We need to detach them manually.
if sv := cc.ctx.GetSessionVars(); sv != nil && sv.StmtCtx != nil {
sv.StmtCtx.DetachMemDiskTracker()
}
return true, errors.Annotate(err, cc.preparedStmt2String(uint32(stmt.ID())))
}

if rs == nil {
if useCursor {
vars.SetStatusFlag(mysql.ServerStatusCursorExists, false)
Expand Down Expand Up @@ -331,13 +341,6 @@ func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stm
cl.OnFetchReturned()
}

// as the `Next` of `ResultSet` will never be called, all rows have been cached inside it. We could close this
// `ResultSet`.
err = rs.Close()
if err != nil {
return false, err
}

stmt.SetCursorActive(true)

// explicitly flush columnInfo to client.
Expand All @@ -348,7 +351,6 @@ func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stm

return false, cc.flush(ctx)
}
defer terror.Call(rs.Close)
retryable, err := cc.writeResultset(ctx, rs, true, cc.ctx.Status(), 0)
if err != nil {
return retryable, errors.Annotate(err, cc.preparedStmt2String(uint32(stmt.ID())))
Expand Down
53 changes: 53 additions & 0 deletions server/conn_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"encoding/binary"
"fmt"
"testing"

"github.com/pingcap/tidb/expression"
Expand Down Expand Up @@ -372,3 +373,55 @@ func TestCursorWithParams(t *testing.T) {
0x0, 0x1, 0x0, 0x0,
)))
}

func TestCursorDetachMemTracker(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
srv := CreateMockServer(t, store)
srv.SetDomain(dom)
defer srv.Close()

appendUint32 := binary.LittleEndian.AppendUint32
ctx := context.Background()
c := CreateMockConn(t, srv).(*mockConn)

tk := testkit.NewTestKitWithSession(t, store, c.Context().Session)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(id_1 int, id_2 int)")
tk.MustExec("insert into t values (1, 1), (1, 2)")
tk.MustExec("set global tidb_mem_oom_action = 'CANCEL'")
defer tk.MustExec("set global tidb_mem_oom_action= DEFAULT")
// TODO: find whether it's expected to have one child at the beginning
require.Len(t, tk.Session().GetSessionVars().MemTracker.GetChildrenForTest(), 1)

// execute a normal statement, it'll success
stmt, _, _, err := c.Context().Prepare("select count(id_2) from t")
require.NoError(t, err)

require.NoError(t, c.Dispatch(ctx, append(
appendUint32([]byte{mysql.ComStmtExecute}, uint32(stmt.ID())),
mysql.CursorTypeReadOnly, 0x1, 0x0, 0x0, 0x0,
)))
maxConsumed := tk.Session().GetSessionVars().MemTracker.MaxConsumed()

// testkit also uses `PREPARE` related calls to run statement with arguments.
// format the SQL to avoid the interference from testkit.
tk.MustExec(fmt.Sprintf("set tidb_mem_quota_query=%d", maxConsumed/2))
require.Len(t, tk.Session().GetSessionVars().MemTracker.GetChildrenForTest(), 1)
// This query should exceed the memory limitation during `openExecutor`
require.Error(t, c.Dispatch(ctx, append(
appendUint32([]byte{mysql.ComStmtExecute}, uint32(stmt.ID())),
mysql.CursorTypeReadOnly, 0x1, 0x0, 0x0, 0x0,
)))
require.Len(t, tk.Session().GetSessionVars().MemTracker.GetChildrenForTest(), 1)

// The next query should succeed
tk.MustExec(fmt.Sprintf("set tidb_mem_quota_query=%d", maxConsumed+1))
require.Len(t, tk.Session().GetSessionVars().MemTracker.GetChildrenForTest(), 1)
// This query should succeed
require.NoError(t, c.Dispatch(ctx, append(
appendUint32([]byte{mysql.ComStmtExecute}, uint32(stmt.ID())),
mysql.CursorTypeReadOnly, 0x1, 0x0, 0x0, 0x0,
)))
require.Len(t, tk.Session().GetSessionVars().MemTracker.GetChildrenForTest(), 1)
}

0 comments on commit b797290

Please sign in to comment.