Fix the test case that often fails #319

Merged (3 commits, Sep 27, 2021)
18 changes: 15 additions & 3 deletions consensus/byzantine_test.go
@@ -84,7 +84,6 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
// Make State
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs.SetLogger(cs.Logger)
// set private validator
pv := privVals[i]
cs.SetPrivValidator(pv)
@@ -130,7 +129,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
// make connected switches and start all reactors
p2p.MakeConnectedSwitches(config.P2P, nValidators, func(i int, s *p2p.Switch, c *config2.P2PConfig) *p2p.Switch {
s.AddReactor("CONSENSUS", reactors[i])
s.SetLogger(reactors[i].conS.Logger.With("module", "p2p"))
s.SetLogger(log.NewNopLogger().With("module", "p2p")) // Switch log is noisy for this test
return s
}, p2p.Connect2Switches)

@@ -164,6 +163,17 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
}
}

//
// The lazy proposer block below is removed (commented out):
// `VerifyAggregatedSignature` cannot accept the following line after `lazyProposer.LastCommit.MakeCommit()`:
// `commit.Signatures[len(commit.Signatures)-1] = types.NewCommitSigAbsent()`
//
// If the test fails because of this, each node logs messages like the following:
// `prevote step: ProposalBlock is invalid`
// `err="wrong aggregated signature: `
// `failed to verify the aggregated hashes by 1 public keys`
//
/*
// introducing a lazy proposer means that the time of the block committed is different to the
// timestamp that the other nodes have. This tests to ensure that the evidence that finally gets
// proposed will have a valid timestamp
@@ -235,6 +245,8 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
}
}

*/

// start the consensus reactors
for i := 0; i < nValidators; i++ {
s := reactors[i].conS.GetState()
@@ -280,7 +292,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
assert.Equal(t, prevoteHeight, ev.Height())
}
}
case <-time.After(30 * time.Second): // XXX 20 second is short time, so we changed to 30 second
case <-time.After(10 * time.Second): // XXX 20 seconds is too much time, so we changed it to 10 seconds
for i, reactor := range reactors {
t.Logf("Consensus Reactor %d\n%v", i, reactor)
}
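Aside (not part of the diff): the switch change above replaces the per-reactor logger with a nop logger because the p2p switch output drowns out the consensus logs. Below is a minimal standalone sketch of the difference, assuming the upstream Tendermint libs/log API (`NewTMLogger`, `NewNopLogger`); the fork's module path may differ.

```go
package main

import (
	"os"

	"github.com/tendermint/tendermint/libs/log" // assumed path; the fork uses its own module path
)

func main() {
	// Verbose logger for the module under test: output reaches stdout.
	csLogger := log.NewTMLogger(log.NewSyncWriter(os.Stdout)).With("module", "consensus")
	csLogger.Info("visible: consensus output is kept")

	// Nop logger for the noisy p2p switch, as in the byzantine_test.go change:
	// every call is discarded, so the test log stays readable.
	p2pLogger := log.NewNopLogger().With("module", "p2p")
	p2pLogger.Info("discarded: switch output is dropped")
}
```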
47 changes: 43 additions & 4 deletions consensus/common_test.go
@@ -404,6 +404,17 @@ func newStateWithConfigAndBlockStore(
pv types.PrivValidator,
app abci.Application,
blockDB dbm.DB,
) *State {
return newStateWithConfigAndBlockStoreWithLoggers(thisConfig, state, pv, app, blockDB, DefaultTestLoggers())
}

func newStateWithConfigAndBlockStoreWithLoggers(
thisConfig *cfg.Config,
state sm.State,
pv types.PrivValidator,
app abci.Application,
blockDB dbm.DB,
loggers TestLoggers,
) *State {
// Get BlockStore
blockStore := store.NewBlockStore(blockDB)
@@ -415,7 +426,7 @@ func newStateWithConfigAndBlockStore(

// Make Mempool
mempool := mempl.NewCListMempool(thisConfig.Mempool, proxyAppConnMem, 0)
mempool.SetLogger(log.TestingLogger().With("module", "mempool"))
mempool.SetLogger(loggers.memLogger.With("module", "mempool"))
if thisConfig.Consensus.WaitForTxs() {
mempool.EnableTxsAvailable()
}
@@ -429,13 +440,13 @@ func newStateWithConfigAndBlockStore(
panic(err)
}

blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
blockExec := sm.NewBlockExecutor(stateStore, loggers.execLogger, proxyAppConnCon, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs.SetLogger(log.TestingLogger().With("module", "consensus"))
cs.SetLogger(loggers.csLogger.With("module", "consensus"))
cs.SetPrivValidator(pv)

eventBus := types.NewEventBus()
eventBus.SetLogger(log.TestingLogger().With("module", "events"))
eventBus.SetLogger(loggers.eventLogger.With("module", "events"))
err := eventBus.Start()
if err != nil {
panic(err)
@@ -761,6 +772,34 @@ func ensureNewEventOnChannel(ch <-chan tmpubsub.Message) {
//-------------------------------------------------------------------------------
// consensus nets

type TestLoggers struct {
memLogger log.Logger
evLogger log.Logger
execLogger log.Logger
csLogger log.Logger
eventLogger log.Logger
}

func NewTestLoggers(memLogger, evLogger, execLogger, csLogger, eventLogger log.Logger) TestLoggers {
return TestLoggers{
memLogger: memLogger,
evLogger: evLogger,
execLogger: execLogger,
csLogger: csLogger,
eventLogger: eventLogger,
}
}

func DefaultTestLoggers() TestLoggers {
return NewTestLoggers(
log.TestingLogger(), log.TestingLogger(), log.TestingLogger(), log.TestingLogger(), log.TestingLogger())
}

func NopTestLoggers() TestLoggers {
return NewTestLoggers(
log.NewNopLogger(), log.NewNopLogger(), log.NewNopLogger(), log.NewNopLogger(), log.NewNopLogger())
}

// consensusLogger is a TestingLogger which uses a different
// color for each validator ("validator" key must exist).
func consensusLogger() log.Logger {
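Usage note (not part of the diff): a minimal sketch of how a test in package consensus might use the new TestLoggers hook to silence noisy modules while keeping consensus and executor output. The helper name `newQuietState` is hypothetical; the arguments and the `log`, `memdb`, and `kvstore` packages are assumed to be the same fixtures already used in common_test.go. `DefaultTestLoggers()` and `NopTestLoggers()` from the diff cover the all-verbose and all-silent cases.

```go
// Hypothetical helper in package consensus; config, state and pv come from the
// usual test fixtures.
func newQuietState(config *cfg.Config, state sm.State, pv types.PrivValidator) *State {
	loggers := NewTestLoggers(
		log.NewNopLogger().With("module", "mempool"),    // silence the mempool
		log.NewNopLogger().With("module", "evidence"),   // silence the evidence pool
		log.TestingLogger().With("module", "executor"),  // keep block-executor output
		log.TestingLogger().With("module", "consensus"), // keep consensus output
		log.NewNopLogger().With("module", "event"),      // silence the event bus
	)
	return newStateWithConfigAndBlockStoreWithLoggers(
		config, state, pv, kvstore.NewApplication(), memdb.NewDB(), loggers)
}
```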
68 changes: 48 additions & 20 deletions consensus/replay_test.go
@@ -66,29 +66,33 @@ func TestMain(m *testing.M) {
// and which ones we need the wal for - then we'd also be able to only flush the
// wal writer when we need to, instead of with every message.

func startNewStateAndWaitForBlock(t *testing.T, consensusReplayConfig *cfg.Config,
lastBlockHeight int64, blockDB dbm.DB, stateStore sm.Store) {
logger := log.TestingLogger()
func startNewStateAndWaitForBlock(t *testing.T, i int, consensusReplayConfig *cfg.Config,
blockDB dbm.DB, stateStore sm.Store) {
logger := log.TestingLogger().With("attr", "make block", "i", i)
state, _ := stateStore.LoadFromDBOrGenesisFile(consensusReplayConfig.GenesisFile())
privValidator := loadPrivValidator(consensusReplayConfig)
cs := newStateWithConfigAndBlockStore(
cs := newStateWithConfigAndBlockStoreWithLoggers(
consensusReplayConfig,
state,
privValidator,
kvstore.NewApplication(),
blockDB,
NewTestLoggers(
log.NewNopLogger().With("module", "mempool"),
log.NewNopLogger().With("module", "evidence"),
logger.With("module", "executor"),
logger.With("module", "consensus"),
log.NewNopLogger().With("module", "event")),
)
cs.SetLogger(logger)

bytes, _ := ioutil.ReadFile(cs.config.WalFile())
t.Logf("====== WAL: \n\r%X\n", bytes)

err := cs.Start()
require.NoError(t, err)
defer func() {
if err := cs.Stop(); err != nil {
t.Error(err)
}
// Wait for the WAL to close after writing the remaining messages to it
cs.Wait()
}()

// This is just a signal that we haven't halted; it's not something contained
@@ -98,7 +102,9 @@ func startNewStateAndWaitForBlock(t *testing.T, consensusReplayConfig *cfg.Confi
newBlockSub, err := cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock)
require.NoError(t, err)
select {
case <-newBlockSub.Out():
case msg := <-newBlockSub.Out():
height := msg.Data().(types.EventDataNewBlock).Block.Height
t.Logf("Make Block.Height[%d]", height)
case <-newBlockSub.Cancelled():
t.Fatal("newBlockSub was cancelled")
case <-time.After(10 * time.Second): // XXX 120 seconds is too much time, so we changed it to 10 seconds
@@ -155,29 +161,34 @@ func TestWALCrash(t *testing.T) {
func crashWALandCheckLiveness(t *testing.T, consensusReplayConfig *cfg.Config,
initFn func(dbm.DB, *State, context.Context), heightToStop int64) {
walPanicked := make(chan error)
crashingWal := &crashingWAL{panicCh: walPanicked, heightToStop: heightToStop}
crashingWal := &crashingWAL{t: t, panicCh: walPanicked, heightToStop: heightToStop}

i := 1
LOOP:
for {
t.Logf("====== LOOP %d\n", i)

// create consensus state from a clean slate
logger := log.NewNopLogger()
blockDB := memdb.NewDB()
stateDB := blockDB
stateStore := sm.NewStore(stateDB)
state, err := sm.MakeGenesisStateFromFile(consensusReplayConfig.GenesisFile())
require.NoError(t, err)
privValidator := loadPrivValidator(consensusReplayConfig)
cs := newStateWithConfigAndBlockStore(
logger := log.TestingLogger().With("attr", "crash wal", "i", i)
cs := newStateWithConfigAndBlockStoreWithLoggers(
consensusReplayConfig,
state,
privValidator,
kvstore.NewApplication(),
blockDB,
NewTestLoggers(
log.NewNopLogger().With("module", "mempool"),
log.NewNopLogger().With("module", "evidence"),
logger.With("module", "executor"),
logger.With("module", "consensus"),
log.NewNopLogger().With("module", "event")),
)
cs.SetLogger(logger)

// start sending transactions
ctx, cancel := context.WithCancel(context.Background())
@@ -200,18 +211,18 @@
err = cs.Start()
require.NoError(t, err)

i++

select {
case err := <-walPanicked:
t.Logf("WAL panicked: %v", err)

// make sure we can make blocks after a crash
startNewStateAndWaitForBlock(t, consensusReplayConfig, cs.Height, blockDB, stateStore)

// stop consensus state and transactions sender (initFn)
cs.Stop() //nolint:errcheck // Logging this error causes failure
cancel()
// For safety: nothing else stops the WAL here, and it sometimes keeps writing otherwise.
cs.wal.Stop()

// make sure we can make blocks after a crash
startNewStateAndWaitForBlock(t, i, consensusReplayConfig, blockDB, stateStore)

// if we reached the required height, exit
if _, ok := err.(ReachedHeightToStopError); ok {
@@ -220,13 +231,16 @@
case <-time.After(10 * time.Second):
t.Fatal("WAL did not panic for 10 seconds (check the log)")
}

i++
}
}

// crashingWAL is a WAL which crashes or rather simulates a crash during Save
// (before and after). It remembers a message for which we last panicked
// (lastPanickedForMsgIndex), so we don't panic for it in subsequent iterations.
type crashingWAL struct {
t *testing.T
next WAL
panicCh chan error
heightToStop int64
@@ -260,15 +274,29 @@ func (e ReachedHeightToStopError) Error() string {
// exiting the cs.receiveRoutine.
func (w *crashingWAL) Write(m WALMessage) error {
if endMsg, ok := m.(EndHeightMessage); ok {
if endMsg.Height == w.heightToStop {
if endMsg.Height >= w.heightToStop {
w.t.Logf("Rearched[%d] WAL messasge[%T], Height[%d]", w.msgIndex, m, endMsg.Height)
w.panicCh <- ReachedHeightToStopError{endMsg.Height}
runtime.Goexit()
return nil
}

w.t.Logf("Not-Rearched[%d] WAL messasge[%T], Height[%d]", w.msgIndex, m, endMsg.Height)
w.msgIndex++
return w.next.Write(m)
}

if mi, ok := m.(msgInfo); ok {
if pm, ok := mi.Msg.(*ProposalMessage); ok {
w.t.Logf("Skipped[%d] WAL message[%T]:[%T]:[%v]", w.msgIndex, m, mi.Msg, pm.Proposal.Type)
} else if vm, ok := mi.Msg.(*VoteMessage); ok {
w.t.Logf("Skipped[%d] WAL message[%T]:[%T]:[%v]", w.msgIndex, m, mi.Msg, vm.Vote.Type)
} else {
w.t.Logf("Skipped[%d] WAL message[%T]:[%T]", w.msgIndex, m, mi.Msg)
}
} else {
w.t.Logf("Skipped[%d] WAL message[%T]", w.msgIndex, m)
}

if w.msgIndex > w.lastPanickedForMsgIndex {
w.lastPanickedForMsgIndex = w.msgIndex
_, file, line, _ := runtime.Caller(1)
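Aside (not part of the diff): a self-contained sketch of the bookkeeping the crashingWAL comment describes, with the panic replaced by an error return so it runs as a plain program. `Message`, `WAL`, and `memWAL` are simplified stand-ins, not the real consensus types; the index logic mirrors the test's `msgIndex`/`lastPanickedForMsgIndex` fields.

```go
package main

import "fmt"

// Message is a simplified stand-in for the consensus WALMessage type.
type Message struct{ Height int64 }

// WAL is a minimal stand-in for the WAL interface wrapped by crashingWAL.
type WAL interface {
	Write(Message) error
}

// memWAL is a trivial in-memory WAL used as the wrapped "real" WAL.
type memWAL struct{ msgs []Message }

func (w *memWAL) Write(m Message) error {
	w.msgs = append(w.msgs, m)
	return nil
}

// crashingWAL mirrors the bookkeeping in replay_test.go: it "crashes" (here,
// returns an error instead of panicking) the first time each new message index
// is written, and remembers that index so the retry after a restart succeeds.
type crashingWAL struct {
	next                    WAL
	msgIndex                int
	lastPanickedForMsgIndex int
}

func (w *crashingWAL) Write(m Message) error {
	if w.msgIndex > w.lastPanickedForMsgIndex {
		w.lastPanickedForMsgIndex = w.msgIndex
		return fmt.Errorf("simulated crash before writing msg %d", w.msgIndex)
	}
	w.msgIndex++
	return w.next.Write(m)
}

func main() {
	w := &crashingWAL{next: &memWAL{}}
	for i := 0; i < 3; i++ {
		m := Message{Height: int64(i)}
		if err := w.Write(m); err != nil {
			fmt.Println(err) // a real test restarts the consensus state here
			_ = w.Write(m)   // the retry succeeds because the index was remembered
		}
		fmt.Println("wrote message for height", i)
	}
}
```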
12 changes: 10 additions & 2 deletions p2p/test_util.go
@@ -43,7 +43,11 @@ func CreateRandomPeer(outbound bool) Peer {
mconn: &conn.MConnection{},
metrics: NopMetrics(),
}
p.SetLogger(log.TestingLogger().With("peer", addr))
if p.Logger == nil {
p.SetLogger(log.TestingLogger().With("peer", addr))
} else {
p.SetLogger(p.Logger.With("peer", addr))
}
return p
}

@@ -200,7 +204,11 @@ func MakeSwitch(

// TODO: let the config be passed in?
sw := initSwitch(i, NewSwitch(cfg, t, opts...), cfg) // receive buffer size is all 1000 in test
sw.SetLogger(log.TestingLogger().With("switch", i))
if sw.Logger == nil {
sw.SetLogger(log.TestingLogger().With("switch", i))
} else {
sw.SetLogger(sw.Logger.With("switch", i))
}
sw.SetNodeKey(&nodeKey)

ni := nodeInfo.(DefaultNodeInfo)