Skip to content

Commit

Permalink
Fix data race when calling get metadata (#562)
Browse files Browse the repository at this point in the history
* Fix bug stuck in propose

The leader after a restart would grab the leader token and wait for a non-empty batch.
In the new test the view thread of the leader after the restart is just about to call controller decide.
However, before the provided fix, the controller is busy waiting for a new batch.
Without any new transactions, the leader is stuck.
In the fix, if the new batch is empty, the controller continues while reacquiring the leader token, thus allowing the view to call decide.

Signed-off-by: Фёдор Партанский <pfi79@mail.ru>

* Fix data race when calling GetMetadata.
The leader after a restart would grab the leader token and propose while also calling GetMetadata.
In the test the view thread of the leader after the restart is just about to call startNextSeq.
This causes a data race.
In the fix, the leader token is acquired only when the view is not in the middle of a decisionFix data race when calling GetMetadata.

Signed-off-by: Фёдор Партанский <pfi79@mail.ru>

---------

Signed-off-by: Фёдор Партанский <pfi79@mail.ru>
Co-authored-by: Hagar Meir <hagar.meir@ibm.com>
  • Loading branch information
pfi79 and HagarMeir authored Aug 26, 2023
1 parent 2a0557b commit cd14de6
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 12 deletions.
15 changes: 8 additions & 7 deletions internal/bft/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type Proposer interface {
//
//go:generate mockery -dir . -name ProposerBuilder -case underscore -output ./mocks/
type ProposerBuilder interface {
NewProposer(leader, proposalSequence, viewNum, decisionsInView uint64, quorumSize int) Proposer
NewProposer(leader, proposalSequence, viewNum, decisionsInView uint64, quorumSize int) (Proposer, Phase)
}

// Controller controls the entire flow of the consensus
Expand Down Expand Up @@ -373,7 +373,7 @@ func (c *Controller) convertViewMessageToHeartbeat(m *protos.Message) *protos.Me
}

func (c *Controller) startView(proposalSequence uint64) {
view := c.ProposerBuilder.NewProposer(c.leaderID(), proposalSequence, c.currViewNumber, c.currDecisionsInView, c.quorum)
view, initPhase := c.ProposerBuilder.NewProposer(c.leaderID(), proposalSequence, c.currViewNumber, c.currDecisionsInView, c.quorum)

c.currViewLock.Lock()
c.currView = view
Expand All @@ -383,6 +383,12 @@ func (c *Controller) startView(proposalSequence uint64) {
role := Follower
leader, _ := c.iAmTheLeader()
if leader {
if initPhase == COMMITTED || initPhase == ABORT {
c.Logger.Debugf("Acquiring leader token when starting view with phase %s", initPhase.String())
c.acquireLeaderToken()
} else {
c.Logger.Debugf("Not acquiring leader token when starting view with phase %s", initPhase.String())
}
role = Leader
}
c.LeaderMonitor.ChangeRole(role, c.currViewNumber, c.leaderID())
Expand Down Expand Up @@ -414,10 +420,8 @@ func (c *Controller) changeView(newViewNumber uint64, newProposalSequence uint64
c.Logger.Debugf("Starting view after setting decisions in view to %d", newDecisionsInView)
c.startView(newProposalSequence)

// If I'm the leader, I can claim the leader token.
if iAm, _ := c.iAmTheLeader(); iAm {
c.Batcher.Reset()
c.acquireLeaderToken()
}
}

Expand Down Expand Up @@ -789,9 +793,6 @@ func (c *Controller) Start(startViewNumber uint64, startProposalSequence uint64,
c.currViewNumber = startViewNumber
c.currDecisionsInView = startDecisionsInView
c.startView(startProposalSequence)
if iAm, _ := c.iAmTheLeader(); iAm {
c.acquireLeaderToken()
}

go func() {
defer c.controllerDone.Done()
Expand Down
2 changes: 1 addition & 1 deletion internal/bft/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,7 @@ func configureProposerBuilder(controller *bft.Controller) *atomic.Value {
pb.On("NewProposer", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(func(a uint64, b uint64, c uint64, d uint64, e int) bft.Proposer {
return createView(controller, a, b, c, d, e, vs)
})
}, bft.Phase(bft.COMMITTED))
controller.ProposerBuilder = pb
return vs
}
Expand Down
13 changes: 11 additions & 2 deletions internal/bft/mocks/proposer_builder.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions internal/bft/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ type ProposalMaker struct {
}

// NewProposer returns a new view
func (pm *ProposalMaker) NewProposer(leader, proposalSequence, viewNum, decisionsInView uint64, quorumSize int) Proposer {
func (pm *ProposalMaker) NewProposer(leader, proposalSequence, viewNum, decisionsInView uint64, quorumSize int) (proposer Proposer, phase Phase) {
view := &View{
RetrieveCheckpoint: pm.Checkpoint.Get,
DecisionsPerLeader: pm.DecisionsPerLeader,
Expand Down Expand Up @@ -330,7 +330,7 @@ func (pm *ProposalMaker) NewProposer(leader, proposalSequence, viewNum, decision
view.MetricsView.DecisionsInView.Set(float64(view.DecisionsInView))
view.MetricsView.Phase.Set(float64(view.Phase))

return view
return view, view.Phase
}

// ViewSequence indicates if a view is currently active and its current proposal sequence
Expand Down
17 changes: 17 additions & 0 deletions internal/bft/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,21 @@ const (
ABORT
)

func (p Phase) String() string {
switch p {
case COMMITTED:
return "COMMITTED"
case PROPOSED:
return "PROPOSED"
case PREPARED:
return "PREPARED"
case ABORT:
return "ABORT"
default:
return "Invalid Phase"
}
}

// State can save and restore the state
//
//go:generate mockery -dir . -name State -case underscore -output ./mocks/
Expand Down Expand Up @@ -885,6 +900,8 @@ func (v *View) GetMetadata() []byte {
DecisionsInView: v.DecisionsInView,
}

v.Logger.Debugf("GetMetadata with view %d, seq %d, dec %d", metadata.ViewId, metadata.LatestSequence, metadata.DecisionsInView)

var (
prevSigs []*protos.Signature
prevProp *protos.Proposal
Expand Down
79 changes: 79 additions & 0 deletions test/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1313,6 +1313,85 @@ func TestLeaderCatchUpWithoutSync(t *testing.T) {
}
}

func TestLeaderProposeAfterRestartWithoutSync(t *testing.T) {
t.Parallel()
network := NewNetwork()
defer network.Shutdown()

testDir, err := os.MkdirTemp("", t.Name())
assert.NoErrorf(t, err, "generate temporary test dir")
defer os.RemoveAll(testDir)

numberOfNodes := 4
nodes := make([]*App, 0)
for i := 1; i <= numberOfNodes; i++ {
n := newNode(uint64(i), network, t.Name(), testDir, false, 0)
n.Consensus.Config.SyncOnStart = false
nodes = append(nodes, n)
}

restartWG := sync.WaitGroup{}
restartWG.Add(1)

restoredWG := sync.WaitGroup{}
restoredWG.Add(1)

contViewWG := sync.WaitGroup{}
contViewWG.Add(2)

baseLogger := nodes[0].logger.Desugar()
nodes[0].logger = baseLogger.WithOptions(zap.Hooks(func(entry zapcore.Entry) error {
if strings.Contains(entry.Message, "Processed prepares for proposal with seq 1") {
restartWG.Done()
}
if strings.Contains(entry.Message, "Restored proposal with sequence 1") {
restoredWG.Done()
}
if strings.Contains(entry.Message, "processed commits for proposal with seq 1") {
contViewWG.Wait()
}
if strings.Contains(entry.Message, "GetMetadata with view 0, seq 1") {
contViewWG.Done()
}
if strings.Contains(entry.Message, "Not acquiring leader token when starting view with phase PREPARED") {
contViewWG.Done()
}
if strings.Contains(entry.Message, "Expected proposal sequence 2 but got 1") {
panic("Expected proposal sequence 2 but got 1")
}
return nil
})).Sugar()
nodes[0].Setup()

startNodes(nodes, network)

nodes[0].Submit(Request{ID: "1", ClientID: "alice"})

restartWG.Wait()
nodes[0].RestartSync(false)
restoredWG.Wait()

nodes[0].Submit(Request{ID: "2", ClientID: "alice"})

data := make([]*AppRecord, 0)
for i := 0; i < numberOfNodes; i++ {
d := <-nodes[i].Delivered
data = append(data, d)
}
for i := 0; i < numberOfNodes-1; i++ {
assert.Equal(t, data[i], data[i+1])
}

data = make([]*AppRecord, 0)
for i := 0; i < numberOfNodes; i++ {
d := <-nodes[i].Delivered
data = append(data, d)
}
for i := 0; i < numberOfNodes-1; i++ {
assert.Equal(t, data[i], data[i+1])
}
}

func TestGradualStart(t *testing.T) {
// Scenario: initially the network has only one node
// a transaction is submitted and committed with that node
Expand Down

0 comments on commit cd14de6

Please sign in to comment.