Skip to content

Commit

Permalink
Fix bug stuck in propose (#561)
Browse files Browse the repository at this point in the history
The leader after a restart would grab the leader token and wait for a non-empty batch.
In the new test the view thread of the leader after the restart is just about to call controller decide.
However, before the provided fix, the controller is busy waiting for a new batch.
Without any new transactions, the leader is stuck.
With the fix, if the next batch is empty, the controller reacquires the leader token and returns instead of waiting for a non-empty batch, thus allowing the view to call decide.

Signed-off-by: Фёдор Партанский <pfi79@mail.ru>
Co-authored-by: Hagar Meir <hagar.meir@ibm.com>
  • Loading branch information
pfi79 and HagarMeir authored Aug 26, 2023
1 parent 071648f commit 2a0557b
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 17 deletions.
23 changes: 6 additions & 17 deletions internal/bft/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,24 +468,13 @@ func (c *Controller) ViewChanged(newViewNumber uint64, newProposalSequence uint6
c.viewChange <- viewInfo{proposalSeq: newProposalSequence, viewNumber: newViewNumber}
}

// getNextBatch keeps asking the batcher for requests until it has collected
// a non-empty batch, and returns that batch.
//
// It returns nil if, after any call to NextBatch, the controller reports
// that it is stopped or the batcher reports that it is closed.
func (c *Controller) getNextBatch() [][]byte {
	var batch [][]byte
	for {
		requests := c.Batcher.NextBatch()
		// Check for shutdown only after NextBatch returns.
		if c.stopped() || c.Batcher.Closed() {
			return nil
		}
		batch = append(batch, requests...)
		if len(batch) > 0 {
			return batch
		}
		// Nothing collected yet; ask the batcher again.
	}
}

func (c *Controller) propose() {
nextBatch := c.getNextBatch()
if len(nextBatch) == 0 {
// If our next batch is empty,
// it can only be because
// the batcher is stopped and so are we.
if c.stopped() || c.Batcher.Closed() {
return
}
nextBatch := c.Batcher.NextBatch()
if len(nextBatch) == 0 { // no requests in this batch
c.acquireLeaderToken() // try again later
return
}
metadata := c.currView.GetMetadata()
Expand Down
1 change: 1 addition & 0 deletions internal/bft/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,7 @@ func TestControllerLeaderRequestHandling(t *testing.T) {

batcher := &mocks.Batcher{}
batcher.On("Close")
batcher.On("Closed").Return(false)
batcher.On("Reset")
batcher.On("NextBatch").Run(func(arguments mock.Arguments) {
time.Sleep(time.Hour)
Expand Down
64 changes: 64 additions & 0 deletions test/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1249,6 +1249,70 @@ func TestLeaderModifiesPreprepare(t *testing.T) {
}
}

// TestLeaderCatchUpWithoutSync starts four nodes with SyncOnStart disabled,
// submits a request through the first node and restarts that node (again
// without sync) once its log reports that the prepares for sequence 1 were
// processed. After the log reports that the proposal with sequence 1 was
// restored, every node is expected to deliver the same record, both for the
// first request and for a second one submitted afterwards.
func TestLeaderCatchUpWithoutSync(t *testing.T) {
	t.Parallel()
	network := NewNetwork()
	defer network.Shutdown()

	testDir, err := os.MkdirTemp("", t.Name())
	assert.NoErrorf(t, err, "generate temporary test dir")
	defer os.RemoveAll(testDir)

	const numberOfNodes = 4
	nodes := make([]*App, 0)
	for id := 1; id <= numberOfNodes; id++ {
		node := newNode(uint64(id), network, t.Name(), testDir, false, 0)
		node.Consensus.Config.SyncOnStart = false
		nodes = append(nodes, node)
	}

	// Each wait group is released by the log hook installed below.
	var restartWG, restoredWG sync.WaitGroup
	restartWG.Add(1)
	restoredWG.Add(1)

	logHook := func(entry zapcore.Entry) error {
		if strings.Contains(entry.Message, "Processed prepares for proposal with seq 1") {
			restartWG.Done()
		}
		if strings.Contains(entry.Message, "Restored proposal with sequence 1") {
			restoredWG.Done()
		}
		return nil
	}
	nodes[0].logger = nodes[0].logger.Desugar().WithOptions(zap.Hooks(logHook)).Sugar()
	nodes[0].Setup()

	startNodes(nodes, network)

	// expectSameDelivery takes one delivered record from every node, in
	// node order, and asserts that neighbouring records are equal.
	expectSameDelivery := func() {
		delivered := make([]*AppRecord, 0)
		for _, node := range nodes {
			delivered = append(delivered, <-node.Delivered)
		}
		for i := 1; i < len(delivered); i++ {
			assert.Equal(t, delivered[i-1], delivered[i])
		}
	}

	nodes[0].Submit(Request{ID: "1", ClientID: "alice"})

	// Restart the first node without sync once the prepares were processed,
	// then wait until it has restored the in-flight proposal.
	restartWG.Wait()
	nodes[0].RestartSync(false)
	restoredWG.Wait()

	expectSameDelivery()

	nodes[0].Submit(Request{ID: "2", ClientID: "alice"})

	expectSameDelivery()
}

func TestGradualStart(t *testing.T) {
// Scenario: initially the network has only one node
// a transaction is submitted and committed with that node
Expand Down
5 changes: 5 additions & 0 deletions test/test_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,15 @@ func (a *App) Sync() types.SyncResponse {

// Restart restarts the node by delegating to RestartSync with the sync
// flag set to true.
func (a *App) Restart() {
	const syncOnStart = true
	a.RestartSync(syncOnStart)
}

func (a *App) RestartSync(sync bool) {
a.Consensus.Stop()
a.Node.Lock()
defer a.Node.Unlock()
a.Setup()
a.Consensus.Config.SyncOnStart = sync
if err := a.Consensus.Start(); err != nil {
a.logger.Panicf("Consensus start returned an error : %v", err)
}
Expand Down

0 comments on commit 2a0557b

Please sign in to comment.