store/tikv: recycle idle connection in tikv client (#10616) #10632

Merged: 3 commits, Jun 4, 2019
73 changes: 63 additions & 10 deletions store/tikv/client.go
@@ -83,6 +83,11 @@ type connArray struct {
batchCommandsCh chan *batchCommandsEntry
batchCommandsClients []*batchCommandsClient
tikvTransportLayerLoad uint64

// Notify rpcClient to check the idle flag
idleNotify *uint32
idle bool
idleDetect *time.Timer
}

type batchCommandsClient struct {
@@ -132,7 +137,6 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
// When `conn.Close()` is called, `client.Recv()` will return an error.
resp, err := c.client.Recv()
if err != nil {

now := time.Now()
for { // try to re-create the streaming in the loop.
if c.isStopped() {
@@ -198,8 +202,9 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
}
}

func newConnArray(maxSize uint, addr string, security config.Security) (*connArray, error) {
func newConnArray(maxSize uint, addr string, security config.Security, idleNotify *uint32) (*connArray, error) {
cfg := config.GetGlobalConfig()

a := &connArray{
index: 0,
v: make([]*grpc.ClientConn, maxSize),
@@ -208,6 +213,9 @@ func newConnArray(maxSize uint, addr string, security config.Security) (*connArr
batchCommandsCh: make(chan *batchCommandsEntry, cfg.TiKVClient.MaxBatchSize),
batchCommandsClients: make([]*batchCommandsClient, 0, maxSize),
tikvTransportLayerLoad: 0,

idleNotify: idleNotify,
idleDetect: time.NewTimer(idleTimeout),
}
if err := a.Init(addr, security); err != nil {
return nil, err
@@ -332,15 +340,29 @@ func (b *batchCommandsEntry) isCanceled() bool {
return atomic.LoadInt32(&b.canceled) == 1
}

const idleTimeout = 3 * time.Minute

// fetchAllPendingRequests fetches all pending requests from the channel.
func fetchAllPendingRequests(
ch chan *batchCommandsEntry,
func (a *connArray) fetchAllPendingRequests(
maxBatchSize int,
entries *[]*batchCommandsEntry,
requests *[]*tikvpb.BatchCommandsRequest_Request,
) {
// Block on the first element.
headEntry := <-ch
var headEntry *batchCommandsEntry
select {
case headEntry = <-a.batchCommandsCh:
if !a.idleDetect.Stop() {
<-a.idleDetect.C
}
a.idleDetect.Reset(idleTimeout)
case <-a.idleDetect.C:
a.idleDetect.Reset(idleTimeout)
a.idle = true
atomic.CompareAndSwapUint32(a.idleNotify, 0, 1)
// This connArray is to be recycled
return
}
if headEntry == nil {
return
}
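
The `Stop`/drain/`Reset` sequence on `idleDetect` above is the standard Go idiom for re-arming a `time.Timer` that may already have fired: if `Stop` returns false, a stale expiration is still sitting in the channel and must be drained before `Reset`, otherwise a later receive would see a spurious "idle" signal. A minimal, self-contained sketch of the same idiom (the `rearm` and `work` names are illustrative, not from this PR):

```go
package main

import (
	"fmt"
	"time"
)

// rearm safely resets a timer that is also consumed in a select loop.
// If Stop reports false, the timer already fired, so the value sitting
// in t.C must be drained before Reset; skipping the drain would let a
// later receive observe the stale expiration.
func rearm(t *time.Timer, d time.Duration) {
	if !t.Stop() {
		<-t.C
	}
	t.Reset(d)
}

func main() {
	work := make(chan int)
	idle := time.NewTimer(200 * time.Millisecond)

	go func() {
		work <- 1 // send one piece of work, then go quiet
	}()

	for {
		select {
		case v := <-work:
			fmt.Println("got work:", v)
			rearm(idle, 200*time.Millisecond)
		case <-idle.C:
			fmt.Println("idle: no work for 200ms")
			return
		}
	}
}
```
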
@@ -350,7 +372,7 @@ func fetchAllPendingRequests(
// This loop is for trying best to collect more requests.
for len(*entries) < maxBatchSize {
select {
case entry := <-ch:
case entry := <-a.batchCommandsCh:
if entry == nil {
return
}
@@ -435,7 +457,7 @@ func (a *connArray) batchSendLoop(cfg config.TiKVClient) {
requestIDs = requestIDs[:0]

metrics.TiKVPendingBatchRequests.Set(float64(len(a.batchCommandsCh)))
fetchAllPendingRequests(a.batchCommandsCh, int(cfg.MaxBatchSize), &entries, &requests)
a.fetchAllPendingRequests(int(cfg.MaxBatchSize), &entries, &requests)

if len(entries) < int(cfg.MaxBatchSize) && cfg.MaxBatchWaitTime > 0 {
tikvTransportLayerLoad := atomic.LoadUint64(batchCommandsClient.tikvTransportLayerLoad)
@@ -513,13 +535,15 @@ func removeCanceledRequests(
// TODO: Add flow control between RPC clients in TiDB and RPC servers in TiKV.
// Since we use shared client connection to communicate to the same TiKV, it's possible
// that there are too many concurrent requests which overload the service of TiKV.
// TODO: Implement background cleanup. It adds a background goroutine to periodically check
// whether there is any connection is idle and then close and remove these idle connections.
type rpcClient struct {
sync.RWMutex
isClosed bool
conns map[string]*connArray
security config.Security

// Implement background cleanup.
// Periodically check whether there are idle connections, then close and remove them.
idleNotify uint32
}
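
The single `idleNotify uint32` is shared with every `connArray` through the pointer passed into `newConnArray`: a batch-send loop that detects idleness publishes it with `CompareAndSwapUint32(flag, 0, 1)`, and the next `SendRequest` claims it with `CompareAndSwapUint32(flag, 1, 0)` before scanning for idle connections. A stripped-down sketch of that handshake, using hypothetical names and omitting the locking the real client needs around its connection map:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// conn stands in for a connArray: it marks itself idle and raises the
// shared flag so the owning client knows a scan is needed.
type conn struct {
	idle       bool
	idleNotify *uint32 // shared with the owning client
}

func (c *conn) markIdle() {
	c.idle = true
	// Raise the flag at most once; concurrent markIdle calls are harmless.
	atomic.CompareAndSwapUint32(c.idleNotify, 0, 1)
}

// client stands in for rpcClient: before serving a request it claims the
// flag (1 -> 0) and, only if it won the swap, recycles idle connections.
// (The real client guards conns with an RWMutex; omitted here for brevity.)
type client struct {
	idleNotify uint32
	conns      map[string]*conn
}

func (c *client) sendRequest(addr string) {
	if atomic.CompareAndSwapUint32(&c.idleNotify, 1, 0) {
		c.recycleIdle()
	}
	fmt.Println("send request to", addr)
}

func (c *client) recycleIdle() {
	for addr, cn := range c.conns {
		if cn.idle {
			delete(c.conns, addr)
			fmt.Println("recycled idle connection to", addr)
		}
	}
}

func main() {
	c := &client{conns: map[string]*conn{}}
	c.conns["tikv-1:20160"] = &conn{idleNotify: &c.idleNotify}
	c.conns["tikv-2:20160"] = &conn{idleNotify: &c.idleNotify}

	c.conns["tikv-1:20160"].markIdle() // batch loop saw no traffic
	c.sendRequest("tikv-2:20160")      // next request triggers the recycle
}
```
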

func newRPCClient(security config.Security) *rpcClient {
@@ -554,7 +578,7 @@ func (c *rpcClient) createConnArray(addr string) (*connArray, error) {
if !ok {
var err error
connCount := config.GetGlobalConfig().TiKVClient.GrpcConnectionCount
array, err = newConnArray(connCount, addr, c.security)
array, err = newConnArray(connCount, addr, c.security, &c.idleNotify)
if err != nil {
return nil, err
}
@@ -619,6 +643,10 @@ func (c *rpcClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
metrics.TiKVSendReqHistogram.WithLabelValues(reqType, storeID).Observe(time.Since(start).Seconds())
}()

if atomic.CompareAndSwapUint32(&c.idleNotify, 1, 0) {
c.recycleIdleConnArray()
}

connArray, err := c.getConnArray(addr)
if err != nil {
return nil, errors.Trace(err)
@@ -679,3 +707,28 @@ func (c *rpcClient) Close() error {
c.closeConns()
return nil
}

func (c *rpcClient) recycleIdleConnArray() {
var addrs []string
c.RLock()
for _, conn := range c.conns {
if conn.idle {
addrs = append(addrs, conn.target)
}
}
c.RUnlock()

for _, addr := range addrs {
c.Lock()
conn, ok := c.conns[addr]
if ok {
delete(c.conns, addr)
logutil.Logger(context.Background()).Info("recycle idle connection",
zap.String("target", addr))
}
c.Unlock()
if conn != nil {
conn.Close()
}
}
}
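
`recycleIdleConnArray` takes the read lock only to snapshot the idle targets, re-acquires the write lock per address to remove the entry (re-checking that it still exists), and calls `Close` outside any lock so a slow gRPC teardown cannot block other senders. A standalone sketch of that snapshot/remove/close ordering with placeholder types:

```go
package main

import (
	"fmt"
	"sync"
)

type conn struct {
	target string
	idle   bool
}

func (c *conn) Close() { fmt.Println("closed", c.target) }

type pool struct {
	sync.RWMutex
	conns map[string]*conn
}

// recycleIdle mirrors the ordering used in the PR: snapshot under RLock,
// remove under Lock, and close without holding any lock so other
// goroutines are never blocked by connection teardown.
func (p *pool) recycleIdle() {
	var addrs []string
	p.RLock()
	for _, c := range p.conns {
		if c.idle {
			addrs = append(addrs, c.target)
		}
	}
	p.RUnlock()

	for _, addr := range addrs {
		p.Lock()
		c, ok := p.conns[addr]
		if ok {
			delete(p.conns, addr)
		}
		p.Unlock()
		if c != nil {
			c.Close()
		}
	}
}

func main() {
	p := &pool{conns: map[string]*conn{
		"a:1": {target: "a:1", idle: true},
		"b:1": {target: "b:1"},
	}}
	p.recycleIdle()
	fmt.Println("remaining connections:", len(p.conns))
}
```
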
2 changes: 2 additions & 0 deletions store/tikv/client_test.go
@@ -46,6 +46,8 @@ func (s *testClientSuite) TestConn(c *C) {
c.Assert(err, IsNil)
c.Assert(conn2.Get(), Not(Equals), conn1.Get())

client.recycleIdleConnArray()

client.Close()
conn3, err := client.getConnArray(addr)
c.Assert(err, NotNil)