Skip to content

Commit

Permalink
BFT Block Puller: BFTDeliverer
Browse files Browse the repository at this point in the history
- A BFTDeliverer that fetches blocks and maintains a BFTCensorshipMonitor.
- Abstract the creation of a BFTCensorshipMonitor via an abstract factory, so that we can use a mock for it in testing.
- Add a "shuffledEndpoints" method to the connection source and test it.
- Unit testing of BFTDeliverer.

Signed-off-by: Yoav Tock <tock@il.ibm.com>
Change-Id: Ifead3f9e6c803c4d9fabc63acce11c6da472b88d
  • Loading branch information
tock-ibm committed Aug 27, 2023
1 parent 9e69515 commit 3fa1fff
Show file tree
Hide file tree
Showing 17 changed files with 1,784 additions and 243 deletions.
8 changes: 4 additions & 4 deletions internal/pkg/peer/blocksprovider/bft_censorship_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type DeliverClientRequester interface {
// We track the progress of header receivers against the block reception progress.
// If there is a header that is ahead of the last block, and a timeout had passed since that header was received, we
// declare that censorship was detected.
// When censorship is detected, errCensorship is sent to the errorCh which can be read by ErrorsChannel() method.
// When censorship is detected, ErrCensorship is sent to the errorCh which can be read by ErrorsChannel() method.
type BFTCensorshipMonitor struct {
chainID string
headerVerifier BlockVerifier
Expand Down Expand Up @@ -163,19 +163,19 @@ func (m *BFTCensorshipMonitor) Monitor() {
for {
if err := m.launchHeaderReceivers(); err != nil {
m.logger.Warningf("Failure while launching header receivers: %s", err)
m.errorCh <- &errFatal{message: err.Error()}
m.errorCh <- &ErrFatal{Message: err.Error()}
m.Stop()
return
}

select {
case <-m.stopCh:
m.errorCh <- &errStopping{message: "received a stop signal"}
m.errorCh <- &ErrStopping{Message: "received a stop signal"}
return
case <-time.After(m.timeoutConfig.BlockCensorshipTimeout / 100):
if m.detectBlockCensorship() {
m.logger.Warningf("Block censorship detected, block source endpoint: %s", m.fetchSources[m.blockSourceIndex])
m.errorCh <- &errCensorship{message: fmt.Sprintf("block censorship detected, endpoint: %s", m.fetchSources[m.blockSourceIndex])}
m.errorCh <- &ErrCensorship{Message: fmt.Sprintf("block censorship detected, endpoint: %s", m.fetchSources[m.blockSourceIndex])}
m.Stop()
return
}
Expand Down
25 changes: 25 additions & 0 deletions internal/pkg/peer/blocksprovider/bft_censorship_monitor_factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package blocksprovider

import "github.com/hyperledger/fabric/internal/pkg/peer/orderers"

// BFTCensorshipMonitorFactory creates an instance of a BFTCensorshipMonitor. It is an implementation of the
// CensorshipDetectorFactory interface which abstracts the creation of a BFTCensorshipMonitor.
type BFTCensorshipMonitorFactory struct{}

func (f *BFTCensorshipMonitorFactory) Create(
chainID string,
verifier BlockVerifier,
requester DeliverClientRequester,
progressReporter BlockProgressReporter,
fetchSources []*orderers.Endpoint,
blockSourceIndex int,
timeoutConf TimeoutConfig,
) CensorshipDetector {
return NewBFTCensorshipMonitor(chainID, verifier, requester, progressReporter, fetchSources, blockSourceIndex, timeoutConf)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package blocksprovider_test

import (
"testing"

"github.com/hyperledger/fabric/internal/pkg/peer/blocksprovider"
"github.com/stretchr/testify/require"
)

func TestNewBFTCensorshipMonitorFactory(t *testing.T) {
s := newMonitorTestSetup(t, 5)
f := &blocksprovider.BFTCensorshipMonitorFactory{}
mon := f.Create(s.channelID, s.fakeBlockVerifier, s.fakeRequester, s.fakeProgressReporter, s.sources, 0, blocksprovider.TimeoutConfig{})
require.NotNil(t, mon)
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func TestBFTCensorshipMonitor_NoHeadersNoBlocks(t *testing.T) {

require.Eventually(t, func() bool { return s.fakeRequester.SeekInfoHeadersFromCallCount() == 3 }, 5*time.Second, 10*time.Millisecond)
require.Eventually(t, func() bool { return s.fakeRequester.ConnectCallCount() == 3 }, 5*time.Second, 10*time.Millisecond)
require.Eventually(t, func() bool { return s.fakeProgressReporter.BlockProgressCallCount() == 9 }, 5*time.Second, 10*time.Millisecond)
require.Eventually(t, func() bool { return s.fakeProgressReporter.BlockProgressCallCount() >= 9 }, 5*time.Second, 10*time.Millisecond)
for i := 0; i < s.fakeRequester.ConnectCallCount(); i++ {
n := s.fakeRequester.SeekInfoHeadersFromArgsForCall(i)
require.Equal(t, uint64(0), n)
Expand Down
Loading

0 comments on commit 3fa1fff

Please sign in to comment.