Skip to content

Commit

Permalink
Revert "fix two twice delivery on sync (#560)"
Browse files Browse the repository at this point in the history
This reverts commit 154c866.
  • Loading branch information
pfi79 committed Sep 7, 2023
1 parent fcbfcfa commit 31f018a
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 167 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: Build

on:
pull_request:

jobs:

build:
Expand Down
7 changes: 3 additions & 4 deletions internal/bft/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,10 +623,6 @@ func (c *Controller) sync() (viewNum uint64, seq uint64, decisions uint64) {

latestSequence := c.latestSeq()

c.Logger.Debugf("Node %d is setting the checkpoint after sync to view %d and seq %d", c.ID, md.ViewId, md.LatestSequence)
c.Checkpoint.Set(decision.Proposal, decision.Signatures)
c.verificationSequence = uint64(decision.Proposal.VerificationSequence)

if md.ViewId < c.currViewNumber {
c.Logger.Infof("Synchronizer returned with view number %d but the controller is at view number %d", md.ViewId, c.currViewNumber)
response := c.fetchState()
Expand Down Expand Up @@ -683,6 +679,9 @@ func (c *Controller) sync() (viewNum uint64, seq uint64, decisions uint64) {
}
}

c.Logger.Debugf("Node %d is setting the checkpoint after sync to view %d and seq %d", c.ID, md.ViewId, md.LatestSequence)
c.Checkpoint.Set(decision.Proposal, decision.Signatures)
c.verificationSequence = uint64(decision.Proposal.VerificationSequence)
c.Logger.Debugf("Node %d is informing the view changer after sync of view %d and seq %d", c.ID, md.ViewId, md.LatestSequence)
c.ViewChanger.InformNewView(view)
if md.LatestSequence == 0 || newView {
Expand Down
162 changes: 0 additions & 162 deletions internal/bft/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,168 +545,6 @@ func TestSyncPrevView(t *testing.T) {
wal.Close()
}

func TestSyncPrevViewAnd2Twice(t *testing.T) {
basicLog, err := zap.NewDevelopment()
assert.NoError(t, err)
log := basicLog.Sugar()
app := &mocks.ApplicationMock{}
appWG := sync.WaitGroup{}
app.On("Deliver", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
appWG.Done()
}).Return(types.Reconfig{InLatestDecision: false})
batcher := &mocks.Batcher{}
batcher.On("Close")
pool := &mocks.RequestPool{}
pool.On("Close")
pool.On("Prune", mock.Anything)
leaderMon := &mocks.LeaderMonitor{}
leaderMon.On("InjectArtificialHeartbeat", mock.Anything, mock.Anything)
leaderMonWG := sync.WaitGroup{}
leaderMon.On("ChangeRole", mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
leaderMonWG.Done()
})
leaderMon.On("Close")
comm := &mocks.CommMock{}
comm.On("SendConsensus", mock.Anything, mock.Anything)
verifier := &mocks.VerifierMock{}
verifier.On("VerifyProposal", mock.Anything, mock.Anything).Return(nil, nil)
verifier.On("VerificationSequence").Return(uint64(1))
verifier.On("VerifyConsenterSig", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
signer := &mocks.SignerMock{}
signer.On("SignProposal", mock.Anything, mock.Anything).Return(&types.Signature{
ID: 4,
Value: []byte{4},
})
fd := &mocks.FailureDetector{}
fd.On("Complain", mock.Anything, mock.Anything)
var numCall int
synchronizer := &mocks.SynchronizerMock{}
synchronizerWG := sync.WaitGroup{}
synchronizer.On("Sync").Run(func(args mock.Arguments) {
numCall++
if numCall == 1 {
synchronizerWG.Done()
}
if numCall == 2 {
appWG.Done()
}
}).Return(types.SyncResponse{Latest: types.Decision{
Proposal: types.Proposal{
Metadata: bft.MarshalOrPanic(&protos.ViewMetadata{
LatestSequence: 1,
ViewId: 0, // previous view number
}),
VerificationSequence: 1,
},
Signatures: nil,
}, Reconfig: types.ReconfigSync{InReplicatedDecisions: false}})

startedWG := sync.WaitGroup{}
startedWG.Add(1)

testDir, err := os.MkdirTemp("", "controller-unittest")
assert.NoErrorf(t, err, "generate temporary test dir")
defer os.RemoveAll(testDir)
wal, err := wal.Create(log, testDir, nil)
assert.NoError(t, err)

collector := bft.StateCollector{
SelfID: 0,
N: 4,
Logger: log,
CollectTimeout: 100 * time.Millisecond,
}
collector.Start()
defer collector.Stop()

controller := &bft.Controller{
Collector: &collector,
InFlight: &bft.InFlightData{},
Batcher: batcher,
RequestPool: pool,
LeaderMonitor: leaderMon,
ID: 4, // not the leader
N: 4,
NodesList: []uint64{1, 2, 3, 4},
Logger: log,
Application: app,
Comm: comm,
ViewChanger: &bft.ViewChanger{},
Checkpoint: &types.Checkpoint{},
FailureDetector: fd,
Synchronizer: synchronizer,
Verifier: verifier,
Signer: signer,
WAL: wal,
StartedWG: &startedWG,
MetricsView: api.NewMetricsView(&disabled.Provider{}),
}
controller.Deliver = &bft.MutuallyExclusiveDeliver{C: controller}

vs := configureProposerBuilder(controller)
controller.ViewSequences = vs

leaderMonWG.Add(1)
controller.Start(1, 1, 0, false)
leaderMonWG.Wait()

synchronizerWG.Add(1)
leaderMonWG.Add(1)
// send a message with view 2 to trigger sync
wrongViewMsg := proto.Clone(prePrepare).(*protos.Message)
wrongViewMsgGet := wrongViewMsg.GetPrePrepare()
wrongViewMsgGet.View = 2
controller.ProcessMessages(2, wrongViewMsg)
// waiting for a synchronization that returned blocks with LatestSequence: 1,
synchronizerWG.Wait()
leaderMonWG.Wait() // wait for view to start before sending messages

prePrepareNext := proto.Clone(prePrepare).(*protos.Message)
prePrepareNextGet := prePrepareNext.GetPrePrepare()
prePrepareNextGet.Seq = 1
prePrepareNextGet.GetProposal().Metadata = bft.MarshalOrPanic(&protos.ViewMetadata{
DecisionsInView: 0,
LatestSequence: 1,
ViewId: 1,
})
controller.ProcessMessages(2, prePrepareNext)

nextProp := types.Proposal{
Header: prePrepareNextGet.Proposal.Header,
Payload: prePrepareNextGet.Proposal.Payload,
Metadata: prePrepareNextGet.Proposal.Metadata,
VerificationSequence: 1,
}
prepareNext := proto.Clone(prepare).(*protos.Message)
prepareNextGet := prepareNext.GetPrepare()
prepareNextGet.Seq = 1
prepareNextGet.Digest = nextProp.Digest()
controller.ProcessMessages(2, prepareNext)
controller.ProcessMessages(3, prepareNext)

commit2Next := proto.Clone(commit2).(*protos.Message)
commit2NextGet := commit2Next.GetCommit()
commit2NextGet.Seq = 1
commit2NextGet.Digest = nextProp.Digest()

commit3Next := proto.Clone(commit3).(*protos.Message)
commit3NextGet := commit3Next.GetCommit()
commit3NextGet.Seq = 1
commit3NextGet.Digest = nextProp.Digest()

appWG.Add(1)
controller.ProcessMessages(2, commit2Next)
controller.ProcessMessages(3, commit3Next)

// send a message with seq 1, but have already received a block with seq 1
// therefore no delivery will occur
appWG.Wait()
app.AssertNumberOfCalls(t, "Deliver", 0)

controller.Stop()
wal.Close()
}

func TestControllerLeaderRequestHandling(t *testing.T) {
for _, testCase := range []struct {
description string
Expand Down

0 comments on commit 31f018a

Please sign in to comment.