Skip to content

Commit

Permalink
server: use leader lease to determine tso service validity #1676 (#2117)
Browse files Browse the repository at this point in the history
* server: use leader lease to determine tso service validity#1676

Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch committed Feb 14, 2020
1 parent 5cf2711 commit 1b67bbb
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 41 deletions.
9 changes: 7 additions & 2 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,14 @@ func (s *Server) Tso(stream pdpb.PD_TsoServer) error {
return errors.WithStack(err)
}
start := time.Now()
if err = s.validateRequest(request.GetHeader()); err != nil {
return err
// TSO uses leader lease to determine validity. No need to check leader here.
if s.isClosed() {
return status.Errorf(codes.Unknown, "server not started")
}
if request.GetHeader().GetClusterId() != s.clusterID {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, request.GetHeader().GetClusterId())
}

count := request.GetCount()
ts, err := s.getRespTS(count)
if err != nil {
Expand Down
75 changes: 39 additions & 36 deletions server/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ import (
"go.uber.org/zap"
)

// The timeout to wait transfer etcd leader to complete.
const moveLeaderTimeout = 5 * time.Second
const (
// The timeout to wait transfer etcd leader to complete.
moveLeaderTimeout = 5 * time.Second
leaderTickInterval = 50 * time.Millisecond
)

// IsLeader returns whether the server is leader or not.
func (s *Server) IsLeader() bool {
Expand Down Expand Up @@ -214,30 +217,22 @@ func (s *Server) memberInfo() (member *pdpb.Member, marshalStr string) {
func (s *Server) campaignLeader() error {
log.Info("start to campaign leader", zap.String("campaign-leader-name", s.Name()))

lessor := clientv3.NewLease(s.client)
lease := NewLeaderLease(s.client)
defer func() {
lessor.Close()
defer lease.Close()
log.Info("exit campaign leader")
}()

start := time.Now()
ctx, cancel := context.WithTimeout(s.client.Ctx(), requestTimeout)
leaseResp, err := lessor.Grant(ctx, s.cfg.LeaderLease)
cancel()

if cost := time.Since(start); cost > slowRequestTime {
log.Warn("lessor grants too slow", zap.Duration("cost", cost))
}

err := lease.Grant(s.cfg.LeaderLease)
if err != nil {
return errors.WithStack(err)
return err
}

leaderKey := s.getLeaderPath()
// The leader key must not exist, so the CreateRevision is 0.
resp, err := s.txn().
If(clientv3.Compare(clientv3.CreateRevision(leaderKey), "=", 0)).
Then(clientv3.OpPut(leaderKey, s.memberValue, clientv3.WithLease(leaseResp.ID))).
Then(clientv3.OpPut(leaderKey, s.memberValue, clientv3.WithLease(lease.ID))).
Commit()
if err != nil {
return errors.WithStack(err)
Expand All @@ -246,35 +241,40 @@ func (s *Server) campaignLeader() error {
return errors.New("failed to campaign leader, other server may campaign ok")
}

// Start keepalive and enable TSO service.
// TSO service is strictly enabled/disabled by leader lease for 2 reasons:
// 1. lease based approach is not affected by thread pause, slow runtime schedule, etc.
// 2. load region could be slow. Based on lease we can recover TSO service faster.
// Make the leader keepalived.
ctx, cancel = context.WithCancel(s.serverLoopCtx)
ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()

ch, err := lessor.KeepAlive(ctx, leaseResp.ID)
if err != nil {
return errors.WithStack(err)
}
go lease.KeepAlive(ctx)
log.Info("campaign leader ok", zap.String("campaign-leader-name", s.Name()))

// sync timestamp.
log.Debug("sync timestamp for tso")
if err = s.syncTimestamp(lease); err != nil {
return err
}

defer s.ts.Store(&atomicObject{
physical: zeroTime,
})

// reload config.
err = s.reloadConfigFromKV()
if err != nil {
return err
}

// Try to create raft cluster.
err = s.createRaftCluster()
if err != nil {
return err
}
defer s.stopRaftCluster()

log.Debug("sync timestamp for tso")
if err = s.syncTimestamp(); err != nil {
return err
}
defer s.ts.Store(&atomicObject{
physical: zeroTime,
})

s.enableLeader()
defer s.disableLeader()

Expand All @@ -284,23 +284,26 @@ func (s *Server) campaignLeader() error {
tsTicker := time.NewTicker(updateTimestampStep)
defer tsTicker.Stop()

leaderTicker := time.NewTicker(leaderTickInterval)
defer leaderTicker.Stop()
for {
select {
case _, ok := <-ch:
if !ok {
log.Info("keep alive channel is closed")
case <-leaderTicker.C:
if lease.IsExpired() {
log.Info("lease expired, leader step down")
return nil
}
case <-tsTicker.C:
if err = s.updateTimestamp(); err != nil {
log.Info("failed to update timestamp")
return err
}
etcdLeader := s.GetEtcdLeader()
if etcdLeader != s.ID() {
log.Info("etcd leader changed, resigns leadership", zap.String("old-leader-name", s.Name()))
return nil
}
case <-tsTicker.C:
if err = s.updateTimestamp(); err != nil {
log.Info("failed to update timestamp")
return err
}

case <-ctx.Done():
// Server is closed and it should return nil.
log.Info("server is closed")
Expand Down
140 changes: 140 additions & 0 deletions server/lease.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"context"
"sync/atomic"
"time"

"github.com/pingcap/log"
"github.com/pkg/errors"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

// LeaderLease is used for renewing leadership of PD server.
type LeaderLease struct {
client *clientv3.Client
lease clientv3.Lease
ID clientv3.LeaseID
leaseTimeout time.Duration

expireTime atomic.Value
}

// NewLeaderLease creates a lease.
func NewLeaderLease(client *clientv3.Client) *LeaderLease {
return &LeaderLease{
client: client,
lease: clientv3.NewLease(client),
}
}

// Grant uses `lease.Grant` to initialize the lease and expireTime.
func (l *LeaderLease) Grant(leaseTimeout int64) error {
start := time.Now()
ctx, cancel := context.WithTimeout(l.client.Ctx(), requestTimeout)
leaseResp, err := l.lease.Grant(ctx, leaseTimeout)
cancel()
if err != nil {
return errors.WithStack(err)
}
if cost := time.Since(start); cost > slowRequestTime {
log.Warn("lease grants too slow", zap.Duration("cost", cost))
}
l.ID = leaseResp.ID
l.leaseTimeout = time.Duration(leaseTimeout) * time.Second
l.expireTime.Store(start.Add(time.Duration(leaseResp.TTL) * time.Second))
return nil
}

const revokeLeaseTimeout = time.Second

// Close releases the lease.
func (l *LeaderLease) Close() error {
// Reset expire time.
l.expireTime.Store(time.Time{})
// Try to revoke lease to make subsequent elections faster.
ctx, cancel := context.WithTimeout(l.client.Ctx(), revokeLeaseTimeout)
defer cancel()
l.lease.Revoke(ctx, l.ID)
return l.lease.Close()
}

// IsExpired checks if the lease is expired. If it returns true, current PD
// server should step down and try to re-elect again.
func (l *LeaderLease) IsExpired() bool {
return time.Now().After(l.expireTime.Load().(time.Time))
}

// KeepAlive auto renews the lease and update expireTime.
func (l *LeaderLease) KeepAlive(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
timeCh := l.keepAliveWorker(ctx, l.leaseTimeout/3)

var maxExpire time.Time
for {
select {
case t := <-timeCh:
if t.After(maxExpire) {
maxExpire = t
l.expireTime.Store(t)
}
case <-time.After(l.leaseTimeout):
return
case <-ctx.Done():
return
}
}
}

// Periodically call `lease.KeepAliveOnce` and post back latest received expire time into the channel.
func (l *LeaderLease) keepAliveWorker(ctx context.Context, interval time.Duration) <-chan time.Time {
ch := make(chan time.Time)

go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
go func() {
start := time.Now()
ctx1, cancel := context.WithTimeout(ctx, time.Duration(l.leaseTimeout))
defer cancel()
res, err := l.lease.KeepAliveOnce(ctx1, l.ID)
if err != nil {
log.Warn("leader lease keep alive failed", zap.Error(err))
return
}
if res.TTL > 0 {
expire := start.Add(time.Duration(res.TTL) * time.Second)
select {
case ch <- expire:
case <-ctx1.Done():
}
}
}()

select {
case <-ctx.Done():
return
case <-ticker.C:
}
}
}()

return ch
}
4 changes: 3 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ type Server struct {
// for raft cluster
cluster *RaftCluster
// For tso, set after pd becomes leader.
ts atomic.Value
ts atomic.Value
// lease is leadership of PD server.
lease *LeaderLease
lastSavedTime time.Time
// For async region heartbeat.
hbStreams *heartbeatStreams
Expand Down
6 changes: 5 additions & 1 deletion server/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (s *Server) saveTimestamp(ts time.Time) error {
return nil
}

func (s *Server) syncTimestamp() error {
func (s *Server) syncTimestamp(lease *LeaderLease) error {
tsoCounter.WithLabelValues("sync").Inc()

last, err := s.loadTimestamp()
Expand Down Expand Up @@ -107,6 +107,7 @@ func (s *Server) syncTimestamp() error {
current := &atomicObject{
physical: next,
}
s.lease = lease
s.ts.Store(current)

return nil
Expand Down Expand Up @@ -205,6 +206,9 @@ func (s *Server) getRespTS(count uint32) (pdpb.Timestamp, error) {
time.Sleep(updateTimestampStep)
continue
}
if s.lease == nil || s.lease.IsExpired() {
return pdpb.Timestamp{}, errors.New("alloc timestamp failed, lease expired")
}
return resp, nil
}
return resp, errors.New("can not get timestamp")
Expand Down
2 changes: 1 addition & 1 deletion tests/server/region_syncer/region_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (s *serverTestSuite) TestRegionSyncer(c *C) {
regionLen = len(regions)

// ensure flush to region kv
time.Sleep(3 * time.Second)
time.Sleep(4 * time.Second)
err = leaderServer.Stop()
c.Assert(err, IsNil)
cluster.WaitLeader()
Expand Down

0 comments on commit 1b67bbb

Please sign in to comment.