diff --git a/core/deliverservice/blocksprovider/blocksprovider.go b/core/deliverservice/blocksprovider/blocksprovider.go index bea8ca3044f..3853b37925b 100644 --- a/core/deliverservice/blocksprovider/blocksprovider.go +++ b/core/deliverservice/blocksprovider/blocksprovider.go @@ -183,7 +183,9 @@ func (b *blocksProviderImpl) DeliverBlocks() { logger.Debugf("[%s] Adding payload locally, buffer seqNum = [%d], peers number [%d]", b.chainID, seqNum, numberOfPeers) // Add payload to local state payloads buffer - b.gossip.AddPayload(b.chainID, payload) + if err := b.gossip.AddPayload(b.chainID, payload); err != nil { + logger.Warning("Failed adding payload of", seqNum, "because:", err) + } // Gossip messages with other nodes logger.Debugf("[%s] Gossiping block [%d], peers number [%d]", b.chainID, seqNum, numberOfPeers) diff --git a/gossip/state/state.go b/gossip/state/state.go index 1c7436c4aba..24647b8639a 100644 --- a/gossip/state/state.go +++ b/gossip/state/state.go @@ -19,6 +19,7 @@ package state import ( "bytes" "errors" + "fmt" "sync" "sync/atomic" "time" @@ -55,6 +56,8 @@ const ( defChannelBufferSize = 100 defAntiEntropyMaxRetries = 3 + + defMaxBlockDistance = 100 ) // GossipAdapter defines gossip/communication required interface for state provider @@ -404,10 +407,11 @@ func (s *GossipStateProviderImpl) queueNewMessage(msg *proto.GossipMessage) { dataMsg := msg.GetDataMsg() if dataMsg != nil { - // Add new payload to ordered set - + if err := s.AddPayload(dataMsg.GetPayload()); err != nil { + logger.Warning("Failed adding payload:", err) + return + } logger.Debugf("Received new payload with sequence number = [%d]", dataMsg.Payload.SeqNum) - s.payloads.Push(dataMsg.GetPayload()) } else { logger.Debug("Gossip message received is not of data message type, usually this should not happen.") } @@ -616,8 +620,19 @@ func (s *GossipStateProviderImpl) GetBlock(index uint64) *common.Block { // AddPayload add new payload into state func (s *GossipStateProviderImpl) AddPayload(payload *proto.Payload) error { - + if payload == nil { + return errors.New("Given payload is nil") + } logger.Debug("Adding new payload into the buffer, seqNum = ", payload.SeqNum) + height, err := s.committer.LedgerHeight() + if err != nil { + return fmt.Errorf("Failed obtaining ledger height: %v", err) + } + + if payload.SeqNum-height >= defMaxBlockDistance { + return fmt.Errorf("Ledger height is at %d, cannot enqueue block with sequence of %d", height, payload.SeqNum) + } + return s.payloads.Push(payload) } diff --git a/gossip/state/state_test.go b/gossip/state/state_test.go index fc4eeefbe4e..804dff63149 100644 --- a/gossip/state/state_test.go +++ b/gossip/state/state_test.go @@ -167,6 +167,7 @@ func (node *peerNode) shutdown() { type mockCommitter struct { mock.Mock + sync.Mutex } func (mc *mockCommitter) Commit(block *pcomm.Block) error { @@ -175,6 +176,8 @@ func (mc *mockCommitter) Commit(block *pcomm.Block) error { } func (mc *mockCommitter) LedgerHeight() (uint64, error) { + mc.Lock() + defer mc.Unlock() if mc.Called().Get(1) == nil { return mc.Called().Get(0).(uint64), nil } @@ -277,6 +280,110 @@ func TestNilDirectMsg(t *testing.T) { p.s.(*GossipStateProviderImpl).directMessage(req) } +func TestNilAddPayload(t *testing.T) { + mc := &mockCommitter{} + mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil) + g := &mocks.GossipMock{} + g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil) + g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage)) + p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g) + defer p.shutdown() + err := p.s.AddPayload(nil) + assert.Error(t, err) + assert.Contains(t, err.Error(), "nil") +} + +func TestAddPayloadLedgerUnavailable(t *testing.T) { + mc := &mockCommitter{} + mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil) + g := &mocks.GossipMock{} + g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil) + g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage)) + p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g) + defer p.shutdown() + // Simulate a problem in the ledger + failedLedger := mock.Mock{} + failedLedger.On("LedgerHeight", mock.Anything).Return(uint64(0), errors.New("cannot query ledger")) + mc.Lock() + mc.Mock = failedLedger + mc.Unlock() + + rawblock := pcomm.NewBlock(uint64(1), []byte{}) + b, _ := pb.Marshal(rawblock) + err := p.s.AddPayload(&proto.Payload{ + SeqNum: uint64(1), + Data: b, + }) + assert.Error(t, err) + assert.Contains(t, err.Error(), "Failed obtaining ledger height") + assert.Contains(t, err.Error(), "cannot query ledger") +} + +func TestOverPopulation(t *testing.T) { + // Scenario: Add to the state provider blocks + // with a gap in between, and ensure that the payload buffer + // rejects blocks starting if the distance between the ledger height to the latest + // block it contains is bigger than defMaxBlockDistance. + + mc := &mockCommitter{} + blocksPassedToLedger := make(chan uint64, 10) + mc.On("Commit", mock.Anything).Run(func(arg mock.Arguments) { + blocksPassedToLedger <- arg.Get(0).(*pcomm.Block).Header.Number + }) + mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil) + g := &mocks.GossipMock{} + g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil) + g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage)) + p := newPeerNode(newGossipConfig(0), mc, noopPeerIdentityAcceptor) + defer p.shutdown() + + // Add some blocks in a sequential manner and make sure it works + for i := 1; i <= 4; i++ { + rawblock := pcomm.NewBlock(uint64(i), []byte{}) + b, _ := pb.Marshal(rawblock) + assert.NoError(t, p.s.AddPayload(&proto.Payload{ + SeqNum: uint64(i), + Data: b, + })) + } + + // Add payloads from 10 to defMaxBlockDistance, while we're missing blocks [5,9] + // Should succeed + for i := 10; i <= defMaxBlockDistance; i++ { + rawblock := pcomm.NewBlock(uint64(i), []byte{}) + b, _ := pb.Marshal(rawblock) + assert.NoError(t, p.s.AddPayload(&proto.Payload{ + SeqNum: uint64(i), + Data: b, + })) + } + + // Add payloads from defMaxBlockDistance + 2 to defMaxBlockDistance * 10 + // Should fail. + for i := defMaxBlockDistance + 1; i <= defMaxBlockDistance*10; i++ { + rawblock := pcomm.NewBlock(uint64(i), []byte{}) + b, _ := pb.Marshal(rawblock) + assert.Error(t, p.s.AddPayload(&proto.Payload{ + SeqNum: uint64(i), + Data: b, + })) + } + + // Ensure only blocks 1-4 were passed to the ledger + close(blocksPassedToLedger) + i := 1 + for seq := range blocksPassedToLedger { + assert.Equal(t, uint64(i), seq) + i++ + } + assert.Equal(t, 5, i) + + // Ensure we don't store too many blocks in memory + sp := p.s.(*GossipStateProviderImpl) + assert.True(t, sp.payloads.Size() < defMaxBlockDistance) + +} + func TestFailures(t *testing.T) { mc := &mockCommitter{} mc.On("LedgerHeight", mock.Anything).Return(uint64(0), nil)