From 3fa1fff7816265e0e09d9145c2a1d4d3f061e902 Mon Sep 17 00:00:00 2001 From: Yoav Tock Date: Tue, 8 Aug 2023 11:43:57 +0300 Subject: [PATCH] BFT Block Puller: BFTDeliverer - A BFTDeliverer that fetches blocks and maintains a BFTCensorshipMonitor. - Abstract the creation of a BFTCensorshipMonitor via an abstract factory, so that we can use a mock for it in testing. - Add a "shuffledEndpoints" method to the connection source and test it. - Unit testing of BFTDeliverer. Signed-off-by: Yoav Tock Change-Id: Ifead3f9e6c803c4d9fabc63acce11c6da472b88d --- .../blocksprovider/bft_censorship_monitor.go | 8 +- .../bft_censorship_monitor_factory.go | 25 + .../bft_censorship_monitor_factory_test.go | 21 + .../bft_censorship_monitor_test.go | 2 +- .../pkg/peer/blocksprovider/bft_deliverer.go | 236 +++- .../peer/blocksprovider/bft_deliverer_test.go | 1115 +++++++++++++++-- .../pkg/peer/blocksprovider/block_receiver.go | 4 +- internal/pkg/peer/blocksprovider/deliverer.go | 4 +- .../pkg/peer/blocksprovider/deliverer_test.go | 2 +- .../fake/censorship_detector.go | 162 +++ .../fake/censorship_detector_factory.go | 129 ++ .../fake/duration_exceeded_handler.go | 102 ++ .../fake/orderer_connection_source.go | 126 +- internal/pkg/peer/blocksprovider/util.go | 38 +- internal/pkg/peer/blocksprovider/util_test.go | 4 + internal/pkg/peer/orderers/connection.go | 15 + internal/pkg/peer/orderers/connection_test.go | 34 + 17 files changed, 1784 insertions(+), 243 deletions(-) create mode 100644 internal/pkg/peer/blocksprovider/bft_censorship_monitor_factory.go create mode 100644 internal/pkg/peer/blocksprovider/bft_censorship_monitor_factory_test.go create mode 100644 internal/pkg/peer/blocksprovider/fake/censorship_detector.go create mode 100644 internal/pkg/peer/blocksprovider/fake/censorship_detector_factory.go create mode 100644 internal/pkg/peer/blocksprovider/fake/duration_exceeded_handler.go diff --git a/internal/pkg/peer/blocksprovider/bft_censorship_monitor.go b/internal/pkg/peer/blocksprovider/bft_censorship_monitor.go index ab2cf8be9fa..3ad302fbd21 100644 --- a/internal/pkg/peer/blocksprovider/bft_censorship_monitor.go +++ b/internal/pkg/peer/blocksprovider/bft_censorship_monitor.go @@ -43,7 +43,7 @@ type DeliverClientRequester interface { // We track the progress of header receivers against the block reception progress. // If there is a header that is ahead of the last block, and a timeout had passed since that header was received, we // declare that censorship was detected. -// When censorship is detected, errCensorship is sent to the errorCh which can be read by ErrorsChannel() method. +// When censorship is detected, ErrCensorship is sent to the errorCh which can be read by ErrorsChannel() method. 
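For illustration only (a sketch, not part of this patch): how a consumer of the CensorshipDetector interface, introduced later in this patch, might react to the typed errors the monitor emits on ErrorsChannel(). The real dispatch lives in BFTDeliverer.DeliverBlocks below; the function watchMonitor and the switchToNextSource callback are hypothetical names used only here.

package example

import "github.com/hyperledger/fabric/internal/pkg/peer/blocksprovider"

// watchMonitor runs a censorship monitor and reacts to the errors it reports.
func watchMonitor(mon blocksprovider.CensorshipDetector, doneC <-chan struct{}, switchToNextSource func()) {
	go mon.Monitor()
	defer mon.Stop()

	select {
	case err := <-mon.ErrorsChannel():
		switch err.(type) {
		case *blocksprovider.ErrCensorship:
			// a header receiver is ahead of the block receiver for longer than
			// BlockCensorshipTimeout: switch to another block source and retry
			switchToNextSource()
		case *blocksprovider.ErrStopping:
			// orderly shutdown, nothing to do
		case *blocksprovider.ErrFatal:
			// non-recoverable failure; the caller should exit its delivery loop
		}
	case <-doneC:
		// the caller is shutting down
	}
}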
type BFTCensorshipMonitor struct { chainID string headerVerifier BlockVerifier @@ -163,19 +163,19 @@ func (m *BFTCensorshipMonitor) Monitor() { for { if err := m.launchHeaderReceivers(); err != nil { m.logger.Warningf("Failure while launching header receivers: %s", err) - m.errorCh <- &errFatal{message: err.Error()} + m.errorCh <- &ErrFatal{Message: err.Error()} m.Stop() return } select { case <-m.stopCh: - m.errorCh <- &errStopping{message: "received a stop signal"} + m.errorCh <- &ErrStopping{Message: "received a stop signal"} return case <-time.After(m.timeoutConfig.BlockCensorshipTimeout / 100): if m.detectBlockCensorship() { m.logger.Warningf("Block censorship detected, block source endpoint: %s", m.fetchSources[m.blockSourceIndex]) - m.errorCh <- &errCensorship{message: fmt.Sprintf("block censorship detected, endpoint: %s", m.fetchSources[m.blockSourceIndex])} + m.errorCh <- &ErrCensorship{Message: fmt.Sprintf("block censorship detected, endpoint: %s", m.fetchSources[m.blockSourceIndex])} m.Stop() return } diff --git a/internal/pkg/peer/blocksprovider/bft_censorship_monitor_factory.go b/internal/pkg/peer/blocksprovider/bft_censorship_monitor_factory.go new file mode 100644 index 00000000000..b8762587651 --- /dev/null +++ b/internal/pkg/peer/blocksprovider/bft_censorship_monitor_factory.go @@ -0,0 +1,25 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package blocksprovider + +import "github.com/hyperledger/fabric/internal/pkg/peer/orderers" + +// BFTCensorshipMonitorFactory creates an instance of a BFTCensorshipMonitor. It is an implementation of the +// CensorshipDetectorFactory interface which abstracts the creation of a BFTCensorshipMonitor. +type BFTCensorshipMonitorFactory struct{} + +func (f *BFTCensorshipMonitorFactory) Create( + chainID string, + verifier BlockVerifier, + requester DeliverClientRequester, + progressReporter BlockProgressReporter, + fetchSources []*orderers.Endpoint, + blockSourceIndex int, + timeoutConf TimeoutConfig, +) CensorshipDetector { + return NewBFTCensorshipMonitor(chainID, verifier, requester, progressReporter, fetchSources, blockSourceIndex, timeoutConf) +} diff --git a/internal/pkg/peer/blocksprovider/bft_censorship_monitor_factory_test.go b/internal/pkg/peer/blocksprovider/bft_censorship_monitor_factory_test.go new file mode 100644 index 00000000000..5f5a247e88d --- /dev/null +++ b/internal/pkg/peer/blocksprovider/bft_censorship_monitor_factory_test.go @@ -0,0 +1,21 @@ +/* +Copyright IBM Corp. All Rights Reserved. 
+ +SPDX-License-Identifier: Apache-2.0 +*/ + +package blocksprovider_test + +import ( + "testing" + + "github.com/hyperledger/fabric/internal/pkg/peer/blocksprovider" + "github.com/stretchr/testify/require" +) + +func TestNewBFTCensorshipMonitorFactory(t *testing.T) { + s := newMonitorTestSetup(t, 5) + f := &blocksprovider.BFTCensorshipMonitorFactory{} + mon := f.Create(s.channelID, s.fakeBlockVerifier, s.fakeRequester, s.fakeProgressReporter, s.sources, 0, blocksprovider.TimeoutConfig{}) + require.NotNil(t, mon) +} diff --git a/internal/pkg/peer/blocksprovider/bft_censorship_monitor_test.go b/internal/pkg/peer/blocksprovider/bft_censorship_monitor_test.go index 829aaaf9ad1..5b08b2090eb 100644 --- a/internal/pkg/peer/blocksprovider/bft_censorship_monitor_test.go +++ b/internal/pkg/peer/blocksprovider/bft_censorship_monitor_test.go @@ -171,7 +171,7 @@ func TestBFTCensorshipMonitor_NoHeadersNoBlocks(t *testing.T) { require.Eventually(t, func() bool { return s.fakeRequester.SeekInfoHeadersFromCallCount() == 3 }, 5*time.Second, 10*time.Millisecond) require.Eventually(t, func() bool { return s.fakeRequester.ConnectCallCount() == 3 }, 5*time.Second, 10*time.Millisecond) - require.Eventually(t, func() bool { return s.fakeProgressReporter.BlockProgressCallCount() == 9 }, 5*time.Second, 10*time.Millisecond) + require.Eventually(t, func() bool { return s.fakeProgressReporter.BlockProgressCallCount() >= 9 }, 5*time.Second, 10*time.Millisecond) for i := 0; i < s.fakeRequester.ConnectCallCount(); i++ { n := s.fakeRequester.SeekInfoHeadersFromArgsForCall(i) require.Equal(t, uint64(0), n) diff --git a/internal/pkg/peer/blocksprovider/bft_deliverer.go b/internal/pkg/peer/blocksprovider/bft_deliverer.go index faba681550e..f328019dd45 100644 --- a/internal/pkg/peer/blocksprovider/bft_deliverer.go +++ b/internal/pkg/peer/blocksprovider/bft_deliverer.go @@ -18,23 +18,52 @@ import ( "github.com/pkg/errors" ) +//go:generate counterfeiter -o fake/censorship_detector.go --fake-name CensorshipDetector . CensorshipDetector +type CensorshipDetector interface { + Monitor() + Stop() + ErrorsChannel() <-chan error +} + +//go:generate counterfeiter -o fake/censorship_detector_factory.go --fake-name CensorshipDetectorFactory . CensorshipDetectorFactory +type CensorshipDetectorFactory interface { + Create( + chainID string, + verifier BlockVerifier, + requester DeliverClientRequester, + progressReporter BlockProgressReporter, + fetchSources []*orderers.Endpoint, + blockSourceIndex int, + timeoutConf TimeoutConfig, + ) CensorshipDetector +} + +//go:generate counterfeiter -o fake/duration_exceeded_handler.go --fake-name DurationExceededHandler . 
DurationExceededHandler +type DurationExceededHandler interface { + DurationExceededHandler() (stopRetries bool) +} + // BFTDeliverer TODO this is a skeleton type BFTDeliverer struct { // TODO - ChannelID string - BlockHandler BlockHandler - Ledger LedgerInfo - BlockVerifier BlockVerifier - Dialer Dialer - Orderers OrdererConnectionSource - DoneC chan struct{} - Signer identity.SignerSerializer - DeliverStreamer DeliverStreamer - Logger *flogging.FabricLogger + ChannelID string + BlockHandler BlockHandler + Ledger LedgerInfo + BlockVerifier BlockVerifier + Dialer Dialer + Orderers OrdererConnectionSource + DoneC chan struct{} + Signer identity.SignerSerializer + DeliverStreamer DeliverStreamer + CensorshipDetectorFactory CensorshipDetectorFactory + Logger *flogging.FabricLogger - // The maximal value of the actual retry interval, which cannot increase beyond this value - MaxRetryInterval time.Duration // The initial value of the actual retry interval, which is increased on every failed retry InitialRetryInterval time.Duration + // The maximal value of the actual retry interval, which cannot increase beyond this value + MaxRetryInterval time.Duration + // If a certain header from a header receiver is in front of the block receiver for more that this time, a + // censorship event is declared and the block source is changed. + BlockCensorshipTimeout time.Duration // After this duration, the MaxRetryDurationExceededHandler is called to decide whether to keep trying MaxRetryDuration time.Duration // This function is called after MaxRetryDuration of failed retries to decide whether to keep trying @@ -47,22 +76,20 @@ type BFTDeliverer struct { // TODO requester *DeliveryRequester - mutex sync.Mutex // mutex protects the following fields - stopFlag bool // mark the Deliverer as stopped - nextBlockNumber uint64 // next block number - lastBlockTime time.Time // last block time - lastBlockSourceIndex int // the source index of the last block we got, or -1 - fetchFailureCounter int // counts the number of consecutive failures to fetch a block + mutex sync.Mutex // mutex protects the following fields + stopFlag bool // mark the Deliverer as stopped + nextBlockNumber uint64 // next block number + lastBlockTime time.Time // last block time + lastBlockSourceIndex int // the source index of the last block we got, or -1 + fetchFailureCounter int // counts the number of consecutive failures to fetch a block + fetchFailureTotalSleepDuration time.Duration // the cumulative sleep time from when fetchFailureCounter goes 0->1 fetchSources []*orderers.Endpoint fetchSourceIndex int fetchErrorsC chan error - blockReceiver *BlockReceiver - - // TODO here we'll have a CensorshipMonitor component that detects block censorship. - // When it suspects censorship, it will emit an error to this channel. 
- monitorErrorsC chan error + blockReceiver *BlockReceiver + censorshipMonitor CensorshipDetector } func (d *BFTDeliverer) Initialize() { @@ -75,6 +102,17 @@ func (d *BFTDeliverer) Initialize() { ) } +func (d *BFTDeliverer) BlockProgress() (uint64, time.Time) { + d.mutex.Lock() + defer d.mutex.Unlock() + + if d.nextBlockNumber == 0 { + return 0, time.Time{} + } + + return d.nextBlockNumber - 1, d.lastBlockTime +} + func (d *BFTDeliverer) DeliverBlocks() { var err error @@ -85,9 +123,19 @@ func (d *BFTDeliverer) DeliverBlocks() { d.Logger.Error("Did not return ledger height, something is critically wrong", err) return } - d.Logger.Infof("Starting DeliverBlocks on channel `%s`, block height=%d", d.ChannelID, d.nextBlockNumber) - // select an initial source randomly + d.Logger.Infof("Starting to DeliverBlocks on channel `%s`, block height=%d", d.ChannelID, d.nextBlockNumber) + defer func() { + d.Logger.Infof("Stopping to DeliverBlocks on channel `%s`, block height=%d", d.ChannelID, d.nextBlockNumber) + }() + + timeoutConfig := TimeoutConfig{ + MinRetryInterval: d.InitialRetryInterval, + MaxRetryInterval: d.MaxRetryInterval, + BlockCensorshipTimeout: d.BlockCensorshipTimeout, + } + + // Refresh and randomize the sources, selects a random initial source, and incurs a random iteration order. d.refreshSources() FetchAndMonitorLoop: @@ -95,66 +143,103 @@ FetchAndMonitorLoop: // The backoff duration is doubled with every failed round. // A failed round is when we had moved through all the endpoints without success. // If we get a block successfully from a source, or endpoints are refreshed, the failure count is reset. - if count := d.getFetchFailureCounter(); count > 0 { - rounds := uint(count) + failureCounter, failureTotalSleepDuration := d.getFetchFailureStats() + if failureCounter > 0 { + rounds := uint(failureCounter) if l := len(d.fetchSources); l > 0 { - rounds = uint(count / len(d.fetchSources)) + rounds = uint(failureCounter / l) + } + + if failureTotalSleepDuration > d.MaxRetryDuration { + if d.MaxRetryDurationExceededHandler() { + d.Logger.Warningf("Attempted to retry block delivery for more than MaxRetryDuration (%s), giving up", d.MaxRetryDuration) + break FetchAndMonitorLoop + } + d.Logger.Debugf("Attempted to retry block delivery for more than MaxRetryDuration (%s), but handler decided to continue retrying", d.MaxRetryDuration) } - dur := backOffDuration(2.0, rounds, BftMinRetryInterval, BftMaxRetryInterval) + dur := backOffDuration(2.0, rounds, d.InitialRetryInterval, d.MaxRetryInterval) + d.Logger.Warningf("Failed to fetch blocks, count=%d, round=%d, going to retry in %s", failureCounter, rounds, dur) d.sleeper.Sleep(dur, d.DoneC) + d.addFetchFailureSleepDuration(dur) } - // assign other endpoints to the monitor - + // No endpoints is a non-recoverable error, as new endpoints are a result of fetching new blocks from an orderer. + if len(d.fetchSources) == 0 { + d.Logger.Error("Failure in DeliverBlocks, no orderer endpoints, something is critically wrong") + break FetchAndMonitorLoop + } // start a block fetcher and a monitor // a buffered channel so that the fetcher goroutine can send an error and exit w/o waiting for it to be consumed. 
d.fetchErrorsC = make(chan error, 1) source := d.fetchSources[d.fetchSourceIndex] go d.FetchBlocks(source) - // TODO start a censorship monitor + // create and start a censorship monitor + d.censorshipMonitor = d.CensorshipDetectorFactory.Create( + d.ChannelID, d.BlockVerifier, d.requester, d, d.fetchSources, d.fetchSourceIndex, timeoutConfig) + go d.censorshipMonitor.Monitor() // wait for fetch errors, censorship suspicions, or a stop signal. select { case <-d.DoneC: break FetchAndMonitorLoop case errFetch := <-d.fetchErrorsC: - if errFetch != nil { - switch errFetch.(type) { - case *errStopping: - // nothing to do - case *errRefreshEndpoint: - // get new endpoints and reassign fetcher and monitor - d.refreshSources() - d.resetFetchFailureCounter() - default: - d.fetchSourceIndex = (d.fetchSourceIndex + 1) % len(d.fetchSources) - d.incFetchFailureCounter() - } - // TODO can it be nil? + d.Logger.Debugf("Error received from fetchErrorsC channel: %s", errFetch) + + switch errFetch.(type) { + case *ErrStopping: + // nothing to do + break FetchAndMonitorLoop + case *errRefreshEndpoint: + // get new endpoints and reassign fetcher and monitor + d.refreshSources() + d.resetFetchFailureCounter() + case *ErrFatal: + d.Logger.Errorf("Failure in FetchBlocks, something is critically wrong: %s", errFetch) + break FetchAndMonitorLoop + default: + d.fetchSourceIndex = (d.fetchSourceIndex + 1) % len(d.fetchSources) + d.incFetchFailureCounter() } - case errMonitor := <-d.monitorErrorsC: - // TODO until we implement the censorship monitor this nil channel is blocked - if errMonitor != nil { - d.Logger.Warningf("Censorship suspicion: %s", err) - // TODO close the block receiver, increment the index - // TODO + case errMonitor := <-d.censorshipMonitor.ErrorsChannel(): + d.Logger.Debugf("Error received from censorshipMonitor.ErrorsChannel: %s", errMonitor) + + switch errMonitor.(type) { + case *ErrStopping: + // nothing to do + break FetchAndMonitorLoop + case *ErrCensorship: + d.Logger.Warningf("Censorship suspicion: %s", errMonitor) + d.mutex.Lock() + d.blockReceiver.Stop() + d.mutex.Unlock() + d.fetchSourceIndex = (d.fetchSourceIndex + 1) % len(d.fetchSources) + d.incFetchFailureCounter() + case *ErrFatal: + d.Logger.Errorf("Failure in CensorshipMonitor, something is critically wrong: %s", errMonitor) + break FetchAndMonitorLoop + default: + d.Logger.Errorf("Unexpected error from CensorshipMonitor, something is critically wrong: %s", errMonitor) + break FetchAndMonitorLoop } - // TODO can it be nil? 
} + + d.censorshipMonitor.Stop() } // clean up everything because we are closing d.mutex.Lock() defer d.mutex.Unlock() d.blockReceiver.Stop() - // TODO stop the monitor + if d.censorshipMonitor != nil { + d.censorshipMonitor.Stop() + } } func (d *BFTDeliverer) refreshSources() { // select an initial source randomly - d.fetchSources = shuffle(d.Orderers.Endpoints()) + d.fetchSources = d.Orderers.ShuffledEndpoints() d.Logger.Infof("Refreshed endpoints: %s", d.fetchSources) d.fetchSourceIndex = 0 } @@ -179,7 +264,7 @@ func (d *BFTDeliverer) FetchBlocks(source *orderers.Endpoint) { for { select { case <-d.DoneC: - d.fetchErrorsC <- &errStopping{message: "stopping"} + d.fetchErrorsC <- &ErrStopping{Message: "stopping"} return default: } @@ -187,7 +272,7 @@ func (d *BFTDeliverer) FetchBlocks(source *orderers.Endpoint) { seekInfoEnv, err := d.requester.SeekInfoBlocksFrom(d.getNextBlockNumber()) if err != nil { d.Logger.Errorf("Could not create a signed Deliver SeekInfo message, something is critically wrong: %s", err) - d.fetchErrorsC <- &errFatal{message: fmt.Sprintf("could not create a signed Deliver SeekInfo message: %s", err)} + d.fetchErrorsC <- &ErrFatal{Message: fmt.Sprintf("could not create a signed Deliver SeekInfo message: %s", err)} return } @@ -207,29 +292,46 @@ func (d *BFTDeliverer) FetchBlocks(source *orderers.Endpoint) { recvC: make(chan *orderer.DeliverResponse), stopC: make(chan struct{}), endpoint: source, - logger: d.Logger.With("orderer-address", source.Address), + logger: flogging.MustGetLogger("BlockReceiver").With("orderer-address", source.Address), } d.mutex.Lock() d.blockReceiver = blockRcv d.mutex.Unlock() + // Starts a goroutine that receives blocks from the stream client and places them in the `recvC` channel blockRcv.Start() - if err := blockRcv.ProcessIncoming(d.onBlockProcessingSuccess); err != nil { - d.Logger.Warningf("failure while processing incoming blocks: %s", err) - d.fetchErrorsC <- errors.Wrapf(err, "failure while processing incoming blocks, orderer-address: %s", source.Address) + // Consume blocks fom the `recvC` channel + if errProc := blockRcv.ProcessIncoming(d.onBlockProcessingSuccess); errProc != nil { + switch errProc.(type) { + case *ErrStopping: + // nothing to do + d.Logger.Debugf("BlockReceiver stopped while processing incoming blocks: %s", errProc) + case *errRefreshEndpoint: + d.Logger.Infof("Endpoint refreshed while processing incoming blocks: %s", errProc) + d.fetchErrorsC <- errProc + default: + d.Logger.Warningf("Failure while processing incoming blocks: %s", errProc) + d.fetchErrorsC <- errProc + } + return } } } func (d *BFTDeliverer) onBlockProcessingSuccess(blockNum uint64) { + d.Logger.Debugf("blockNum: %d", blockNum) + d.mutex.Lock() + defer d.mutex.Unlock() + d.fetchFailureCounter = 0 + d.fetchFailureTotalSleepDuration = 0 + d.nextBlockNumber = blockNum + 1 d.lastBlockTime = time.Now() - d.mutex.Unlock() } func (d *BFTDeliverer) resetFetchFailureCounter() { @@ -237,13 +339,14 @@ func (d *BFTDeliverer) resetFetchFailureCounter() { defer d.mutex.Unlock() d.fetchFailureCounter = 0 + d.fetchFailureTotalSleepDuration = 0 } -func (d *BFTDeliverer) getFetchFailureCounter() int { +func (d *BFTDeliverer) getFetchFailureStats() (int, time.Duration) { d.mutex.Lock() defer d.mutex.Unlock() - return d.fetchFailureCounter + return d.fetchFailureCounter, d.fetchFailureTotalSleepDuration } func (d *BFTDeliverer) incFetchFailureCounter() { @@ -253,6 +356,13 @@ func (d *BFTDeliverer) incFetchFailureCounter() { d.fetchFailureCounter++ } +func (d 
*BFTDeliverer) addFetchFailureSleepDuration(dur time.Duration) { + d.mutex.Lock() + defer d.mutex.Unlock() + + d.fetchFailureTotalSleepDuration += dur +} + func (d *BFTDeliverer) getNextBlockNumber() uint64 { d.mutex.Lock() defer d.mutex.Unlock() diff --git a/internal/pkg/peer/blocksprovider/bft_deliverer_test.go b/internal/pkg/peer/blocksprovider/bft_deliverer_test.go index 9c9370f3d4e..770a20ad35f 100644 --- a/internal/pkg/peer/blocksprovider/bft_deliverer_test.go +++ b/internal/pkg/peer/blocksprovider/bft_deliverer_test.go @@ -7,31 +7,35 @@ SPDX-License-Identifier: Apache-2.0 package blocksprovider_test import ( + "bytes" "fmt" + "math" "sync" "testing" "time" + "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric-protos-go/common" "github.com/hyperledger/fabric-protos-go/orderer" "github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/internal/pkg/peer/blocksprovider" "github.com/hyperledger/fabric/internal/pkg/peer/blocksprovider/fake" "github.com/hyperledger/fabric/internal/pkg/peer/orderers" + "github.com/hyperledger/fabric/protoutil" . "github.com/onsi/gomega" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" ) type bftDelivererTestSetup struct { - withT *WithT - d *blocksprovider.BFTDeliverer - - mutex sync.Mutex // protects the following fields - ccs []*grpc.ClientConn + gWithT *WithT + d *blocksprovider.BFTDeliverer fakeDialer *fake.Dialer - fakeGossipServiceAdapter *fake.GossipServiceAdapter fakeBlockHandler *fake.BlockHandler fakeOrdererConnectionSource *fake.OrdererConnectionSource fakeLedgerInfo *fake.LedgerInfo @@ -39,17 +43,28 @@ type bftDelivererTestSetup struct { fakeSigner *fake.Signer fakeDeliverStreamer *fake.DeliverStreamer fakeDeliverClient *fake.DeliverClient + fakeCensorshipMonFactory *fake.CensorshipDetectorFactory + fakeCensorshipMon *fake.CensorshipDetector fakeSleeper *fake.Sleeper - doneC chan struct{} - recvStep chan struct{} - endC chan struct{} + fakeDurationExceededHandler *fake.DurationExceededHandler + + deliverClientDoneC chan struct{} // signals the deliverClient to exit + recvStepC chan *orderer.DeliverResponse + endC chan struct{} + + mutex sync.Mutex // protects the following fields + clientConnSet []*grpc.ClientConn // client connection set + monitorSet []*fake.CensorshipDetector // monitor set + monEndCSet []chan struct{} // monitor end set + monErrC chan error // the monitor errors channel, where it emits (fake) censorship events + monDoneC chan struct{} // signal the monitor to stop + monEndC chan struct{} // when the monitor stops, it closes this channel } func newBFTDelivererTestSetup(t *testing.T) *bftDelivererTestSetup { s := &bftDelivererTestSetup{ - withT: NewWithT(t), + gWithT: NewWithT(t), fakeDialer: &fake.Dialer{}, - fakeGossipServiceAdapter: &fake.GossipServiceAdapter{}, fakeBlockHandler: &fake.BlockHandler{}, fakeOrdererConnectionSource: &fake.OrdererConnectionSource{}, fakeLedgerInfo: &fake.LedgerInfo{}, @@ -57,23 +72,27 @@ func newBFTDelivererTestSetup(t *testing.T) *bftDelivererTestSetup { fakeSigner: &fake.Signer{}, fakeDeliverStreamer: &fake.DeliverStreamer{}, fakeDeliverClient: &fake.DeliverClient{}, + fakeCensorshipMonFactory: &fake.CensorshipDetectorFactory{}, fakeSleeper: &fake.Sleeper{}, - doneC: make(chan struct{}), - recvStep: make(chan struct{}), + fakeDurationExceededHandler: &fake.DurationExceededHandler{}, + 
deliverClientDoneC: make(chan struct{}), + recvStepC: make(chan *orderer.DeliverResponse), endC: make(chan struct{}), } return s } -func (s *bftDelivererTestSetup) beforeEach() { +func (s *bftDelivererTestSetup) initialize(t *testing.T) { s.fakeDialer.DialStub = func(string, [][]byte) (*grpc.ClientConn, error) { s.mutex.Lock() defer s.mutex.Unlock() + cc, err := grpc.Dial("localhost", grpc.WithTransportCredentials(insecure.NewCredentials())) - s.ccs = append(s.ccs, cc) - s.withT.Expect(err).NotTo(HaveOccurred()) - s.withT.Expect(cc.GetState()).NotTo(Equal(connectivity.Shutdown)) + s.clientConnSet = append(s.clientConnSet, cc) + require.NoError(t, err) + require.NotEqual(t, connectivity.Shutdown, cc.GetState()) + return cc, nil } @@ -81,63 +100,120 @@ func (s *bftDelivererTestSetup) beforeEach() { s.fakeOrdererConnectionSource.RandomEndpointReturns(&orderers.Endpoint{ Address: "orderer-address-1", }, nil) - s.fakeOrdererConnectionSource.EndpointsReturns( - []*orderers.Endpoint{ - { - Address: "orderer-address-1", - RootCerts: nil, - Refreshed: make(chan struct{}), - }, - { - Address: "orderer-address-2", - RootCerts: nil, - Refreshed: make(chan struct{}), - }, - { - Address: "orderer-address-3", - RootCerts: nil, - Refreshed: make(chan struct{}), - }, - { - Address: "orderer-address-4", - RootCerts: nil, - Refreshed: make(chan struct{}), - }, - }) + sources := []*orderers.Endpoint{ + { + Address: "orderer-address-1", + RootCerts: nil, + Refreshed: make(chan struct{}), + }, + { + Address: "orderer-address-2", + RootCerts: nil, + Refreshed: make(chan struct{}), + }, + { + Address: "orderer-address-3", + RootCerts: nil, + Refreshed: make(chan struct{}), + }, + { + Address: "orderer-address-4", + RootCerts: nil, + Refreshed: make(chan struct{}), + }, + } + s.fakeOrdererConnectionSource.ShuffledEndpointsReturns(sources) + + s.fakeSigner.SignReturns([]byte("good-sig"), nil) + s.fakeDeliverClient.RecvStub = func() (*orderer.DeliverResponse, error) { select { - case <-s.recvStep: - return nil, fmt.Errorf("fake-recv-step-error") - case <-s.doneC: + case r := <-s.recvStepC: + if r == nil { + return nil, fmt.Errorf("fake-recv-step-error") + } + return r, nil + case <-s.deliverClientDoneC: return nil, nil } } s.fakeDeliverClient.CloseSendStub = func() error { select { - case s.recvStep <- struct{}{}: - case <-s.doneC: + case s.recvStepC <- nil: + case <-s.deliverClientDoneC: } + return nil } s.fakeDeliverStreamer.DeliverReturns(s.fakeDeliverClient, nil) + // Censorship monitor creation. + // The monitor can be created multiple times during a test. + // The monitor allows to send error events to the BFTDeliverer, be stopped, and block the monitor goroutine. 
+ s.fakeCensorshipMonFactory.CreateCalls( + func( + chID string, + verifier blocksprovider.BlockVerifier, + requester blocksprovider.DeliverClientRequester, + reporter blocksprovider.BlockProgressReporter, + endpoints []*orderers.Endpoint, + index int, + config blocksprovider.TimeoutConfig, + ) blocksprovider.CensorshipDetector { + monErrC := make(chan error, 1) + monDoneC := make(chan struct{}) + monEndC := make(chan struct{}) + + mon := &fake.CensorshipDetector{} + mon.ErrorsChannelCalls(func() <-chan error { + return monErrC + }) + mon.MonitorCalls(func() { + <-monDoneC + close(monEndC) + }, + ) + mon.StopCalls(func() { + select { + case <-monDoneC: + default: + close(monDoneC) + } + }) + + s.mutex.Lock() + defer s.mutex.Unlock() + + s.fakeCensorshipMon = mon + s.monitorSet = append(s.monitorSet, s.fakeCensorshipMon) + s.monEndCSet = append(s.monEndCSet, monEndC) + s.monErrC = monErrC + s.monDoneC = monDoneC + s.monEndC = monEndC + + return mon + }) + s.d = &blocksprovider.BFTDeliverer{ - ChannelID: "channel-id", - BlockHandler: s.fakeBlockHandler, - Ledger: s.fakeLedgerInfo, - BlockVerifier: s.fakeBlockVerifier, - Dialer: s.fakeDialer, - Orderers: s.fakeOrdererConnectionSource, - DoneC: make(chan struct{}), - Signer: s.fakeSigner, - DeliverStreamer: s.fakeDeliverStreamer, - Logger: flogging.MustGetLogger("blocksprovider"), - TLSCertHash: []byte("tls-cert-hash"), - MaxRetryDuration: time.Hour, - MaxRetryInterval: 10 * time.Second, - InitialRetryInterval: 100 * time.Millisecond, + ChannelID: "channel-id", + BlockHandler: s.fakeBlockHandler, + Ledger: s.fakeLedgerInfo, + BlockVerifier: s.fakeBlockVerifier, + Dialer: s.fakeDialer, + Orderers: s.fakeOrdererConnectionSource, + DoneC: make(chan struct{}), + Signer: s.fakeSigner, + DeliverStreamer: s.fakeDeliverStreamer, + CensorshipDetectorFactory: s.fakeCensorshipMonFactory, + Logger: flogging.MustGetLogger("BFTDeliverer.test"), + TLSCertHash: []byte("tls-cert-hash"), + MaxRetryInterval: 10 * time.Second, + InitialRetryInterval: 100 * time.Millisecond, + BlockCensorshipTimeout: 20 * time.Second, + MaxRetryDuration: 600 * time.Second, + MaxRetryDurationExceededHandler: s.fakeDurationExceededHandler.DurationExceededHandler, } s.d.Initialize() @@ -146,55 +222,932 @@ func (s *bftDelivererTestSetup) beforeEach() { blocksprovider.SetSleeper(s.d, s.fakeSleeper) } -func (s *bftDelivererTestSetup) justBeforeEach() { +func (s *bftDelivererTestSetup) start() { go func() { s.d.DeliverBlocks() close(s.endC) }() } -func (s *bftDelivererTestSetup) afterEach() { +func (s *bftDelivererTestSetup) stop() { s.d.Stop() - close(s.doneC) + + select { + case <-s.deliverClientDoneC: + default: + close(s.deliverClientDoneC) + } + <-s.endC } -func TestBFTDeliverer(t *testing.T) { - t.Run("waits patiently for new blocks from the orderer", func(t *testing.T) { +func (s *bftDelivererTestSetup) assertEventuallyMonitorCallCount(n int) { + s.gWithT.Eventually( + func() int { + s.mutex.Lock() + defer s.mutex.Unlock() + + return s.fakeCensorshipMon.MonitorCallCount() + }).Should(Equal(n)) +} + +func TestBFTDeliverer_NoBlocks(t *testing.T) { + setup := newBFTDelivererTestSetup(t) + setup.initialize(t) + startTime := time.Now() + setup.start() + + t.Log("Checks the ledger height") + require.Eventually(t, func() bool { + return setup.fakeLedgerInfo.LedgerHeightCallCount() == 1 + }, time.Second, 10*time.Millisecond) + + t.Log("Get the endpoints") + setup.gWithT.Eventually(setup.fakeOrdererConnectionSource.ShuffledEndpointsCallCount).Should(Equal(1)) + + t.Log("Signs the seek 
request") + setup.gWithT.Eventually(setup.fakeSigner.SignCallCount).Should(Equal(1)) + + t.Log("Seeks the correct block") + setup.gWithT.Eventually(setup.fakeDeliverClient.SendCallCount).Should(Equal(1)) + env := setup.fakeDeliverClient.SendArgsForCall(0) + require.True(t, bytes.Equal(env.GetSignature(), []byte("good-sig"))) + payload, err := protoutil.UnmarshalPayload(env.GetPayload()) + require.NoError(t, err) + seekInfo := &orderer.SeekInfo{} + err = proto.Unmarshal(payload.Data, seekInfo) + require.NoError(t, err) + require.Equal(t, uint64(7), seekInfo.GetStart().GetSpecified().GetNumber()) + + t.Log("Creates and starts the monitor") + setup.gWithT.Eventually(setup.fakeCensorshipMonFactory.CreateCallCount).Should(Equal(1)) + setup.assertEventuallyMonitorCallCount(1) + + t.Log("Dials to an orderer from the shuffled endpoints") + setup.gWithT.Eventually(setup.fakeDialer.DialCallCount).Should(Equal(1)) + addr, tlsCerts := setup.fakeDialer.DialArgsForCall(0) + require.Equal(t, "orderer-address-1", addr) + require.Nil(t, tlsCerts) // TODO add tests that verify this + + t.Log("waits patiently for new blocks from the orderer") + require.Condition(t, func() (success bool) { + select { + case <-setup.endC: + return false + case <-setup.monEndC: + return false + case <-time.After(100 * time.Millisecond): + return true + } + }, "channels wrongly closed") + + t.Log("block progress is reported correctly") + bNum, bTime := setup.d.BlockProgress() + require.Equal(t, uint64(6), bNum) + require.True(t, bTime.After(startTime)) + + t.Log("client connection is active") + func() { + setup.mutex.Lock() + defer setup.mutex.Unlock() + + require.NotEqual(t, connectivity.Shutdown, setup.clientConnSet[0].GetState(), + "client connection unexpectedly shut down") + }() + + setup.stop() +} + +func TestBFTDeliverer_FatalErrors(t *testing.T) { + t.Run("Ledger height returns an error", func(t *testing.T) { setup := newBFTDelivererTestSetup(t) - setup.beforeEach() - setup.justBeforeEach() + setup.initialize(t) + setup.fakeLedgerInfo.LedgerHeightReturns(0, fmt.Errorf("fake-ledger-error")) + setup.start() + + t.Log("Exits the DeliverBlocks loop") + setup.gWithT.Eventually(setup.endC).Should(BeClosed()) + require.Equal(t, 0, setup.fakeCensorshipMonFactory.CreateCallCount(), "monitor was not created") + + setup.stop() + }) + + t.Run("Fails to sign seek request", func(t *testing.T) { + setup := newBFTDelivererTestSetup(t) + setup.initialize(t) + + setup.fakeSigner.SignReturns(nil, fmt.Errorf("fake-ledger-error")) + setup.start() + + t.Log("Starts the DeliverBlocks and Monitor loop") + setup.gWithT.Eventually(setup.fakeCensorshipMonFactory.CreateCallCount).Should(Equal(1)) + setup.assertEventuallyMonitorCallCount(1) + + t.Log("Exits the DeliverBlocks and Monitor loop") + setup.gWithT.Eventually(setup.endC).Should(BeClosed()) + setup.gWithT.Eventually(setup.monEndC).Should(BeClosed()) + + setup.stop() + }) + + t.Run("No endpoints", func(t *testing.T) { + setup := newBFTDelivererTestSetup(t) + setup.initialize(t) + setup.fakeOrdererConnectionSource.ShuffledEndpointsReturns(nil) + setup.start() + + t.Log("Starts the DeliverBlocks and Monitor loop") + setup.gWithT.Eventually(setup.fakeOrdererConnectionSource.ShuffledEndpointsCallCount).Should(Equal(1)) + t.Log("Exits the DeliverBlocks loop") + setup.gWithT.Eventually(setup.endC).Should(BeClosed()) + setup.gWithT.Eventually(setup.fakeCensorshipMonFactory.CreateCallCount).Should(Equal(0)) + require.Nil(t, setup.fakeCensorshipMon) + + setup.stop() + }) +} + +func 
TestBFTDeliverer_DialRetries(t *testing.T) { + t.Run("Dial returns error, then succeeds", func(t *testing.T) { + flogging.ActivateSpec("debug") + setup := newBFTDelivererTestSetup(t) + setup.initialize(t) + + setup.fakeDialer.DialReturnsOnCall(0, nil, fmt.Errorf("fake-dial-error")) + cc, err := grpc.Dial("localhost", grpc.WithTransportCredentials(insecure.NewCredentials())) + setup.gWithT.Expect(err).NotTo(HaveOccurred()) + setup.fakeDialer.DialReturnsOnCall(1, cc, nil) + + setup.start() + + setup.gWithT.Eventually(setup.fakeCensorshipMonFactory.CreateCallCount).Should(Equal(2)) + + setup.gWithT.Eventually(setup.fakeDialer.DialCallCount).Should(Equal(2)) + setup.gWithT.Expect(setup.fakeSleeper.SleepCallCount()).To(Equal(1)) + setup.gWithT.Expect(setup.fakeSleeper.SleepArgsForCall(0)).To(Equal(100 * time.Millisecond)) + + setup.stop() + + setup.mutex.Lock() + defer setup.mutex.Unlock() + require.Len(t, setup.monitorSet, 2) + for i, mon := range setup.monitorSet { + require.Equal(t, 1, mon.MonitorCallCount()) + require.Equal(t, 1, mon.StopCallCount()) + <-setup.monEndCSet[i] + } + }) + + t.Run("Dial returns several consecutive errors, exponential backoff, then succeeds", func(t *testing.T) { + flogging.ActivateSpec("debug") + setup := newBFTDelivererTestSetup(t) + setup.initialize(t) + + // 6 rounds + for i := 0; i < 24; i++ { + setup.fakeDialer.DialReturnsOnCall(i, nil, fmt.Errorf("fake-dial-error")) + } - setup.withT.Consistently(setup.endC).ShouldNot(BeClosed()) + cc, err := grpc.Dial("localhost", grpc.WithTransportCredentials(insecure.NewCredentials())) + setup.gWithT.Expect(err).NotTo(HaveOccurred()) + setup.fakeDialer.DialReturnsOnCall(24, cc, nil) + + setup.start() + + setup.gWithT.Eventually(setup.fakeCensorshipMonFactory.CreateCallCount).Should(Equal(25)) + setup.gWithT.Eventually(setup.fakeDialer.DialCallCount).Should(Equal(25)) + setup.gWithT.Expect(setup.fakeSleeper.SleepCallCount()).To(Equal(24)) + + t.Log("Exponential backoff after every round") + minDur := 100 * time.Millisecond + for i := 0; i < 24; i++ { + round := (i + 1) / 4 + fDur := math.Min(float64(minDur.Nanoseconds())*math.Pow(2.0, float64(round)), float64(10*time.Second)) + dur := time.Duration(fDur) + assert.Equal(t, dur, setup.fakeSleeper.SleepArgsForCall(i), fmt.Sprintf("i=%d", i)) + } + + setup.stop() + + setup.mutex.Lock() + defer setup.mutex.Unlock() + require.Len(t, setup.monitorSet, 25) + for i, mon := range setup.monitorSet { + require.Equal(t, 1, mon.MonitorCallCount()) + require.Equal(t, 1, mon.StopCallCount()) + <-setup.monEndCSet[i] + } + + t.Log("Cycles through all sources") + addresses := make(map[string]bool) + addr1, _ := setup.fakeDialer.DialArgsForCall(0) + for i := 1; i < setup.fakeDialer.DialCallCount(); i++ { + addr2, _ := setup.fakeDialer.DialArgsForCall(i) + require.NotEqual(t, addr1, addr2) + addresses[addr1] = true + addr1 = addr2 + } + require.Len(t, addresses, 4) + }) + + t.Run("Dial returns repeated consecutive errors, exponential backoff saturates", func(t *testing.T) { + flogging.ActivateSpec("debug") + setup := newBFTDelivererTestSetup(t) + setup.initialize(t) + + setup.fakeDialer.DialReturns(nil, fmt.Errorf("fake-dial-error")) + + setup.start() + setup.gWithT.Eventually(setup.fakeDialer.DialCallCount).Should(BeNumerically(">=", 100)) + t.Log("Calls the handler but does not stop") + setup.gWithT.Eventually(setup.fakeDurationExceededHandler.DurationExceededHandlerCallCount).Should(BeNumerically(">", 5)) + setup.gWithT.Consistently(setup.endC).ShouldNot(BeClosed()) + setup.stop() + + 
t.Log("Exponential backoff after every round, with saturation of 10s") + minDur := 100 * time.Millisecond + for i := 0; i < setup.fakeSleeper.SleepCallCount(); i++ { + round := (i + 1) / 4 + fDur := math.Min(float64(minDur.Nanoseconds())*math.Pow(2.0, float64(round)), float64(10*time.Second)) + dur := time.Duration(fDur) + assert.Equal(t, dur, setup.fakeSleeper.SleepArgsForCall(i), fmt.Sprintf("i=%d", i)) + } + + var monSet []*fake.CensorshipDetector func() { setup.mutex.Lock() defer setup.mutex.Unlock() - setup.withT.Expect(setup.ccs[0].GetState()).NotTo(Equal(connectivity.Shutdown)) + monSet = setup.monitorSet }() - setup.afterEach() + for i, mon := range monSet { + <-setup.monEndCSet[i] + require.Equal(t, 1, mon.MonitorCallCount(), fmt.Sprintf("i=%d", i)) + require.Equal(t, 1, mon.StopCallCount(), fmt.Sprintf("i=%d", i)) + } }) - t.Run("checks the ledger height", func(t *testing.T) { + t.Run("Dial returns repeated consecutive errors, total sleep larger than MaxRetryDuration", func(t *testing.T) { + flogging.ActivateSpec("debug") setup := newBFTDelivererTestSetup(t) - setup.beforeEach() - setup.justBeforeEach() + setup.initialize(t) - setup.withT.Eventually(setup.fakeLedgerInfo.LedgerHeightCallCount).Should(Equal(1)) + setup.fakeDurationExceededHandler.DurationExceededHandlerReturns(true) - setup.afterEach() + setup.fakeDialer.DialReturns(nil, fmt.Errorf("fake-dial-error")) + + setup.start() + t.Log("Calls handler and stops") + setup.gWithT.Eventually(setup.fakeDurationExceededHandler.DurationExceededHandlerCallCount).Should(Equal(1)) + setup.gWithT.Eventually(setup.endC).Should(BeClosed()) + + t.Log("Exponential backoff after every round, with saturation of 10s") + minDur := 100 * time.Millisecond + totalDur := time.Duration(0) + for i := 0; i < setup.fakeSleeper.SleepCallCount(); i++ { + round := (i + 1) / 4 + fDur := math.Min(float64(minDur.Nanoseconds())*math.Pow(2.0, float64(round)), float64(10*time.Second)) + dur := time.Duration(fDur) + assert.Equal(t, dur, setup.fakeSleeper.SleepArgsForCall(i), fmt.Sprintf("i=%d", i)) + totalDur += dur + } + + require.True(t, totalDur > setup.d.MaxRetryDuration) + require.Equal(t, 82, setup.fakeSleeper.SleepCallCount()) + + var monSet []*fake.CensorshipDetector + func() { + setup.mutex.Lock() + defer setup.mutex.Unlock() + monSet = setup.monitorSet + }() + + for i, mon := range monSet { + <-setup.monEndCSet[i] + require.Equal(t, 1, mon.MonitorCallCount(), fmt.Sprintf("i=%d", i)) + if i == 82 { + require.Equal(t, 2, mon.StopCallCount(), fmt.Sprintf("i=%d", i)) + } else { + require.Equal(t, 1, mon.StopCallCount(), fmt.Sprintf("i=%d", i)) + } + } }) +} - t.Run("when the ledger returns an error", func(t *testing.T) { +func TestBFTDeliverer_DeliverRetries(t *testing.T) { + t.Run("Deliver returns error, then succeeds", func(t *testing.T) { + flogging.ActivateSpec("debug") setup := newBFTDelivererTestSetup(t) - setup.beforeEach() - setup.fakeLedgerInfo.LedgerHeightReturns(0, fmt.Errorf("fake-ledger-error")) - setup.justBeforeEach() + setup.initialize(t) + + setup.fakeDeliverStreamer.DeliverReturnsOnCall(0, nil, fmt.Errorf("deliver-error")) + setup.fakeDeliverStreamer.DeliverReturnsOnCall(1, setup.fakeDeliverClient, nil) + + setup.start() + + setup.gWithT.Eventually(setup.fakeCensorshipMonFactory.CreateCallCount).Should(Equal(2)) + + setup.gWithT.Eventually(setup.fakeDialer.DialCallCount).Should(Equal(2)) + setup.gWithT.Expect(setup.fakeSleeper.SleepCallCount()).To(Equal(1)) + setup.gWithT.Expect(setup.fakeSleeper.SleepArgsForCall(0)).To(Equal(100 * 
time.Millisecond)) + + setup.stop() - setup.withT.Eventually(setup.endC).Should(BeClosed()) + setup.mutex.Lock() + defer setup.mutex.Unlock() + require.Len(t, setup.monitorSet, 2) + for i, mon := range setup.monitorSet { + require.Equal(t, 1, mon.MonitorCallCount()) + require.Equal(t, 1, mon.StopCallCount()) + <-setup.monEndCSet[i] + } + }) + + t.Run("Deliver returns several consecutive errors, exponential backoff, then succeeds", func(t *testing.T) { + flogging.ActivateSpec("debug") + setup := newBFTDelivererTestSetup(t) + setup.initialize(t) + + // 6 rounds + for i := 0; i < 24; i++ { + setup.fakeDeliverStreamer.DeliverReturnsOnCall(i, nil, fmt.Errorf("deliver-error")) + } + setup.fakeDeliverStreamer.DeliverReturnsOnCall(24, setup.fakeDeliverClient, nil) + + setup.start() + + setup.gWithT.Eventually(setup.fakeCensorshipMonFactory.CreateCallCount).Should(Equal(25)) + setup.gWithT.Eventually(setup.fakeDialer.DialCallCount).Should(Equal(25)) + setup.gWithT.Expect(setup.fakeSleeper.SleepCallCount()).To(Equal(24)) + + t.Log("Exponential backoff after every round") + minDur := 100 * time.Millisecond + for i := 0; i < 24; i++ { + round := (i + 1) / 4 + fDur := math.Min(float64(minDur.Nanoseconds())*math.Pow(2.0, float64(round)), float64(10*time.Second)) + dur := time.Duration(fDur) + assert.Equal(t, dur, setup.fakeSleeper.SleepArgsForCall(i), fmt.Sprintf("i=%d", i)) + } - setup.afterEach() + setup.stop() + + setup.mutex.Lock() + defer setup.mutex.Unlock() + require.Len(t, setup.monitorSet, 25) + for i, mon := range setup.monitorSet { + require.Equal(t, 1, mon.MonitorCallCount()) + require.Equal(t, 1, mon.StopCallCount()) + <-setup.monEndCSet[i] + } + + t.Log("Cycles through all sources") + addresses := make(map[string]bool) + addr1, _ := setup.fakeDialer.DialArgsForCall(0) + for i := 1; i < setup.fakeDialer.DialCallCount(); i++ { + addr2, _ := setup.fakeDialer.DialArgsForCall(i) + require.NotEqual(t, addr1, addr2) + addresses[addr1] = true + addr1 = addr2 + } + require.Len(t, addresses, 4) }) - // TODO more tests + t.Run("Deliver returns repeated consecutive errors, exponential backoff saturates", func(t *testing.T) { + flogging.ActivateSpec("debug") + setup := newBFTDelivererTestSetup(t) + setup.initialize(t) + + setup.fakeDeliverStreamer.DeliverReturns(nil, fmt.Errorf("deliver-error")) + + setup.start() + setup.gWithT.Eventually(setup.fakeDialer.DialCallCount).Should(BeNumerically(">=", 40)) + setup.stop() + + t.Log("Exponential backoff after every round, with saturation of 10s") + minDur := 100 * time.Millisecond + for i := 0; i < setup.fakeSleeper.SleepCallCount(); i++ { + round := (i + 1) / 4 + fDur := math.Min(float64(minDur.Nanoseconds())*math.Pow(2.0, float64(round)), float64(10*time.Second)) + dur := time.Duration(fDur) + assert.Equal(t, dur, setup.fakeSleeper.SleepArgsForCall(i), fmt.Sprintf("i=%d", i)) + } + + var monSet []*fake.CensorshipDetector + func() { + setup.mutex.Lock() + defer setup.mutex.Unlock() + monSet = setup.monitorSet + }() + + for i, mon := range monSet { + <-setup.monEndCSet[i] + require.Equal(t, 1, mon.MonitorCallCount(), fmt.Sprintf("i=%d", i)) + require.Equal(t, 1, mon.StopCallCount(), fmt.Sprintf("i=%d", i)) + } + }) +} + +func TestBFTDeliverer_BlockReception(t *testing.T) { + t.Run("Block is valid", func(t *testing.T) { + flogging.ActivateSpec("debug") + setup := newBFTDelivererTestSetup(t) + setup.initialize(t) + startTime := time.Now() + + t.Log("block progress is reported correctly before start") + bNum, bTime := setup.d.BlockProgress() + require.Equal(t, 
uint64(0), bNum) + require.True(t, bTime.IsZero()) + + setup.start() + + setup.gWithT.Eventually(setup.fakeLedgerInfo.LedgerHeightCallCount).Should(Equal(1)) + bNum, bTime = setup.d.BlockProgress() + require.Equal(t, uint64(6), bNum) + require.True(t, bTime.After(startTime)) + + t.Log("Recv() returns a single block, num: 7") + setup.recvStepC <- &orderer.DeliverResponse{ + Type: &orderer.DeliverResponse_Block{ + Block: &common.Block{Header: &common.BlockHeader{Number: 7}}, + }, + } + + t.Log("receives the block and loops, not sleeping") + setup.gWithT.Eventually(setup.fakeDeliverClient.RecvCallCount).Should(Equal(2)) + require.Equal(t, 0, setup.fakeSleeper.SleepCallCount()) + + t.Log("checks the validity of the block") + setup.gWithT.Eventually(setup.fakeBlockVerifier.VerifyBlockCallCount).Should(Equal(1)) + channelID, blockNum, block := setup.fakeBlockVerifier.VerifyBlockArgsForCall(0) + require.Equal(t, "channel-id", channelID.String()) + require.Equal(t, uint64(7), blockNum) + require.True(t, proto.Equal(block, &common.Block{Header: &common.BlockHeader{Number: 7}})) + + t.Log("handle the block") + setup.gWithT.Eventually(setup.fakeBlockHandler.HandleBlockCallCount).Should(Equal(1)) + channelName, block2 := setup.fakeBlockHandler.HandleBlockArgsForCall(0) + require.Equal(t, "channel-id", channelName) + require.True(t, proto.Equal(block2, &common.Block{Header: &common.BlockHeader{Number: 7}})) + + t.Log("block progress is reported correctly") + bNum2, bTime2 := setup.d.BlockProgress() + require.Equal(t, uint64(7), bNum2) + require.True(t, bTime2.After(bTime)) + + setup.stop() + }) + + t.Run("Block is invalid", func(t *testing.T) { + flogging.ActivateSpec("debug") + setup := newBFTDelivererTestSetup(t) + setup.initialize(t) + + t.Log("block verification fails") + setup.fakeBlockVerifier.VerifyBlockReturns(fmt.Errorf("fake-verify-error")) + + startTime := time.Now() + setup.start() + + t.Log("Recv() returns a single block, num: 7") + setup.recvStepC <- &orderer.DeliverResponse{ + Type: &orderer.DeliverResponse_Block{ + Block: &common.Block{Header: &common.BlockHeader{Number: 7}}, + }, + } + + t.Log("disconnects, sleeps, and tries again") + setup.gWithT.Eventually(setup.fakeBlockVerifier.VerifyBlockCallCount).Should(Equal(1)) + setup.gWithT.Eventually(setup.fakeSleeper.SleepCallCount).Should(Equal(1)) + require.Equal(t, 1, setup.fakeDeliverClient.CloseSendCallCount()) + setup.gWithT.Eventually(setup.fakeDialer.DialCallCount).Should(Equal(2)) + addr1, _ := setup.fakeDialer.DialArgsForCall(0) + addr2, _ := setup.fakeDialer.DialArgsForCall(1) + require.NotEqual(t, addr1, addr2) + + func() { + setup.mutex.Lock() + defer setup.mutex.Unlock() + + require.Len(t, setup.clientConnSet, 2) + require.Len(t, setup.monitorSet, 2) + }() + + t.Log("does not handle the block") + require.Equal(t, 0, setup.fakeBlockHandler.HandleBlockCallCount()) + + t.Log("block progress is reported correctly") + bNum, bTime := setup.d.BlockProgress() + require.Equal(t, uint64(6), bNum) + require.True(t, bTime.After(startTime)) + + setup.stop() + }) + + t.Run("Block handling fails", func(t *testing.T) { + flogging.ActivateSpec("debug") + setup := newBFTDelivererTestSetup(t) + setup.initialize(t) + + t.Log("block verification fails") + setup.fakeBlockHandler.HandleBlockReturns(fmt.Errorf("block-handling-error")) + + startTime := time.Now() + setup.start() + + t.Log("Recv() returns a single block, num: 7") + setup.recvStepC <- &orderer.DeliverResponse{ + Type: &orderer.DeliverResponse_Block{ + Block: &common.Block{Header: 
&common.BlockHeader{Number: 7}}, + }, + } + + t.Log("disconnects, sleeps, and tries again") + setup.gWithT.Eventually(setup.fakeBlockVerifier.VerifyBlockCallCount).Should(Equal(1)) + setup.gWithT.Eventually(setup.fakeSleeper.SleepCallCount).Should(Equal(1)) + require.Equal(t, 1, setup.fakeDeliverClient.CloseSendCallCount()) + setup.gWithT.Eventually(setup.fakeDialer.DialCallCount).Should(Equal(2)) + + addr1, _ := setup.fakeDialer.DialArgsForCall(0) + addr2, _ := setup.fakeDialer.DialArgsForCall(1) + require.NotEqual(t, addr1, addr2) + + func() { + setup.mutex.Lock() + defer setup.mutex.Unlock() + + require.Len(t, setup.clientConnSet, 2) + require.Len(t, setup.monitorSet, 2) + }() + + t.Log("handle the block") + require.Equal(t, 1, setup.fakeBlockHandler.HandleBlockCallCount()) + + t.Log("block progress is reported correctly") + bNum, bTime := setup.d.BlockProgress() + require.Equal(t, uint64(6), bNum) + require.True(t, bTime.After(startTime) || bTime.Equal(startTime)) + + setup.stop() + }) + + t.Run("Block reception resets failure counter", func(t *testing.T) { + flogging.ActivateSpec("debug") + setup := newBFTDelivererTestSetup(t) + setup.initialize(t) + + // 6 failed rounds, creates exponential backoff + for i := 0; i < 24; i++ { + setup.fakeDialer.DialReturnsOnCall(i, nil, fmt.Errorf("fake-dial-error")) + } + // success + cc, err := grpc.Dial("localhost", grpc.WithTransportCredentials(insecure.NewCredentials())) + setup.gWithT.Expect(err).NotTo(HaveOccurred()) + require.NotNil(t, cc) + setup.fakeDialer.DialReturns(cc, nil) + + startTime := time.Now() + setup.start() + + setup.gWithT.Eventually(setup.fakeCensorshipMonFactory.CreateCallCount).Should(Equal(25)) + setup.gWithT.Eventually(setup.fakeDialer.DialCallCount).Should(Equal(25)) + setup.gWithT.Expect(setup.fakeSleeper.SleepCallCount()).To(Equal(24)) + + t.Log("Recv() returns a single block, num: 7") + setup.recvStepC <- &orderer.DeliverResponse{ + Type: &orderer.DeliverResponse_Block{ + Block: &common.Block{Header: &common.BlockHeader{Number: 7}}, + }, + } + + t.Log("receives the block and loops, not sleeping") + setup.gWithT.Eventually(setup.fakeDeliverClient.RecvCallCount).Should(Equal(2)) + require.Equal(t, 24, setup.fakeSleeper.SleepCallCount()) + + t.Log("checks the validity of the block") + setup.gWithT.Eventually(setup.fakeBlockVerifier.VerifyBlockCallCount).Should(Equal(1)) + channelID, blockNum, block := setup.fakeBlockVerifier.VerifyBlockArgsForCall(0) + require.Equal(t, "channel-id", channelID.String()) + require.Equal(t, uint64(7), blockNum) + require.True(t, proto.Equal(block, &common.Block{Header: &common.BlockHeader{Number: 7}})) + + t.Log("handle the block") + setup.gWithT.Eventually(setup.fakeBlockHandler.HandleBlockCallCount).Should(Equal(1)) + channelName, block2 := setup.fakeBlockHandler.HandleBlockArgsForCall(0) + require.Equal(t, "channel-id", channelName) + require.True(t, proto.Equal(block2, &common.Block{Header: &common.BlockHeader{Number: 7}})) + + t.Log("block progress is reported correctly") + bNum, bTime := setup.d.BlockProgress() + require.Equal(t, uint64(7), bNum) + require.True(t, bTime.After(startTime)) + + setup.gWithT.Expect(setup.fakeDialer.DialCallCount()).Should(Equal(25)) + + t.Log("a Recv() error occurs") + setup.fakeDeliverClient.CloseSendStub = nil + setup.recvStepC <- nil + + setup.gWithT.Eventually(setup.fakeCensorshipMonFactory.CreateCallCount).Should(Equal(26)) + setup.gWithT.Eventually(setup.fakeDialer.DialCallCount).Should(Equal(26)) + 
setup.gWithT.Expect(setup.fakeSleeper.SleepCallCount()).To(Equal(25)) + + t.Log("failure count was reset, sleep duration returned to minimum") + require.Equal(t, 6400*time.Millisecond, setup.fakeSleeper.SleepArgsForCall(23)) + require.Equal(t, 100*time.Millisecond, setup.fakeSleeper.SleepArgsForCall(24)) + + setup.stop() + }) + + t.Run("Block reception resets total sleep time", func(t *testing.T) { // TODO + flogging.ActivateSpec("debug") + setup := newBFTDelivererTestSetup(t) + setup.initialize(t) + setup.fakeDurationExceededHandler.DurationExceededHandlerReturns(true) + + // 20 failed rounds, no enough to exceed MaxRetryDuration (it takes 81 calls to go over 10m) + for i := 0; i < 80; i++ { + setup.fakeDialer.DialReturnsOnCall(i, nil, fmt.Errorf("fake-dial-error")) + } + + // another 20 failed rounds, together, it is enough to exceed MaxRetryDuration + for i := 81; i < 160; i++ { + setup.fakeDialer.DialReturnsOnCall(i, nil, fmt.Errorf("fake-dial-error")) + } + + // another 20 failed rounds, together, it is enough to exceed MaxRetryDuration + for i := 161; i < 240; i++ { + setup.fakeDialer.DialReturnsOnCall(i, nil, fmt.Errorf("fake-dial-error")) + } + + // success at attempt 80, 160 and >=240, should reset total sleep time + cc, err := grpc.Dial("localhost", grpc.WithTransportCredentials(insecure.NewCredentials())) + setup.gWithT.Expect(err).NotTo(HaveOccurred()) + require.NotNil(t, cc) + setup.fakeDialer.DialReturns(cc, nil) + + setup.start() + + setup.gWithT.Eventually(setup.fakeDialer.DialCallCount).Should(Equal(81)) + setup.gWithT.Expect(setup.fakeSleeper.SleepCallCount()).To(Equal(80)) + + t.Log("Recv() returns a single block, num: 7") + setup.recvStepC <- &orderer.DeliverResponse{ + Type: &orderer.DeliverResponse_Block{ + Block: &common.Block{Header: &common.BlockHeader{Number: 7}}, + }, + } + + t.Log("receives the block and loops, not sleeping") + setup.gWithT.Eventually(setup.fakeDeliverClient.RecvCallCount).Should(Equal(2)) + require.Equal(t, 80, setup.fakeSleeper.SleepCallCount()) + + t.Log("a Recv() error occurs, more dial attempts") + setup.fakeDeliverClient.CloseSendStub = nil + setup.recvStepC <- nil + setup.gWithT.Eventually(setup.fakeDialer.DialCallCount).Should(Equal(161)) + setup.gWithT.Expect(setup.fakeSleeper.SleepCallCount()).To(Equal(160)) + + t.Log("Recv() returns a single block, num: 8") + setup.recvStepC <- &orderer.DeliverResponse{ + Type: &orderer.DeliverResponse_Block{ + Block: &common.Block{Header: &common.BlockHeader{Number: 8}}, + }, + } + + t.Log("receives the block and loops, not sleeping") + setup.gWithT.Eventually(setup.fakeDeliverClient.RecvCallCount).Should(Equal(4)) + require.Equal(t, 160, setup.fakeSleeper.SleepCallCount()) + + t.Log("a Recv() error occurs, more dial attempts") + setup.fakeDeliverClient.CloseSendStub = nil + setup.recvStepC <- nil + setup.gWithT.Eventually(setup.fakeDialer.DialCallCount).Should(Equal(241)) + setup.gWithT.Expect(setup.fakeSleeper.SleepCallCount()).To(Equal(240)) + + t.Log("DurationExceededHandler handler is never called, DeliverBlocks() does not stop") + setup.gWithT.Expect(setup.fakeDurationExceededHandler.DurationExceededHandlerCallCount()).To(Equal(0)) + setup.gWithT.Consistently(setup.endC).ShouldNot(BeClosed()) + + setup.stop() + }) +} + +func TestBFTDeliverer_CensorshipMonitorEvents(t *testing.T) { + for _, errVal := range []error{nil, errors.New("some error"), &blocksprovider.ErrFatal{Message: "some fatal error"}, &blocksprovider.ErrStopping{Message: "stopping"}} { + t.Run("unexpected error or value: 
"+fmt.Sprintf("%v", errVal), func(t *testing.T) { + flogging.ActivateSpec("debug") + + setup := newBFTDelivererTestSetup(t) + setup.initialize(t) + setup.start() + + setup.gWithT.Eventually(setup.fakeCensorshipMonFactory.CreateCallCount).Should(Equal(1)) + setup.gWithT.Eventually(setup.fakeDialer.DialCallCount).Should(Equal(1)) + + // var mon *fake.CensorshipDetector + t.Logf("monitor error channel returns unexpected value: %v", errVal) + func() { + setup.mutex.Lock() + defer setup.mutex.Unlock() + + setup.monErrC <- errVal + }() + + t.Logf("monitor and deliverer exit the loop") + <-setup.endC + <-setup.monEndC + + setup.stop() + }) + } + + t.Run("censorship", func(t *testing.T) { + flogging.ActivateSpec("debug") + + setup := newBFTDelivererTestSetup(t) + setup.initialize(t) + setup.start() + + setup.gWithT.Eventually(setup.fakeCensorshipMonFactory.CreateCallCount).Should(Equal(1)) + setup.gWithT.Eventually(setup.fakeDialer.DialCallCount).Should(Equal(1)) + + t.Log("monitor error channel returns censorship error") + func() { + setup.mutex.Lock() + defer setup.mutex.Unlock() + + setup.monErrC <- &blocksprovider.ErrCensorship{Message: "censorship"} + }() + + setup.gWithT.Eventually(setup.fakeCensorshipMonFactory.CreateCallCount).Should(Equal(2)) + + setup.gWithT.Eventually(setup.fakeDialer.DialCallCount).Should(Equal(2)) + setup.gWithT.Expect(setup.fakeSleeper.SleepCallCount()).To(Equal(1)) + setup.gWithT.Expect(setup.fakeSleeper.SleepArgsForCall(0)).To(Equal(100 * time.Millisecond)) + + setup.stop() + }) + + t.Run("repeated censorship events, with exponential backoff", func(t *testing.T) { + flogging.ActivateSpec("debug") + + setup := newBFTDelivererTestSetup(t) + setup.initialize(t) + setup.start() + + for n := 1; n <= 40; n++ { + setup.gWithT.Eventually(setup.fakeCensorshipMonFactory.CreateCallCount).Should(Equal(n)) + setup.gWithT.Eventually(setup.fakeDialer.DialCallCount).Should(Equal(n)) + + t.Logf("monitor error channel returns censorship error num: %d", n) + func() { + setup.mutex.Lock() + defer setup.mutex.Unlock() + + setup.monErrC <- &blocksprovider.ErrCensorship{Message: fmt.Sprintf("censorship %d", n)} + }() + + setup.gWithT.Eventually( + func() int { + setup.mutex.Lock() + defer setup.mutex.Unlock() + + return len(setup.monitorSet) + }).Should(Equal(n + 1)) + + setup.gWithT.Eventually(setup.fakeDialer.DialCallCount).Should(Equal(n + 1)) + setup.gWithT.Expect(setup.fakeSleeper.SleepCallCount()).To(Equal(n)) + setup.gWithT.Eventually(setup.fakeCensorshipMonFactory.CreateCallCount).Should(Equal(n + 1)) + + } + + t.Log("Exponential backoff after every round, with saturation") + minDur := 100 * time.Millisecond + for i := 0; i < 40; i++ { + round := (i + 1) / 4 + dur := time.Duration(minDur.Nanoseconds() * int64(math.Pow(2.0, float64(round)))) + if dur > 10*time.Second { + dur = 10 * time.Second + } + assert.Equal(t, dur, setup.fakeSleeper.SleepArgsForCall(i), fmt.Sprintf("i=%d", i)) + } + + setup.stop() + + setup.mutex.Lock() + defer setup.mutex.Unlock() + require.Len(t, setup.monitorSet, 41) + for i, mon := range setup.monitorSet { + require.Equal(t, 1, mon.MonitorCallCount()) + require.Equal(t, 1, mon.StopCallCount()) + <-setup.monEndCSet[i] + } + + t.Log("Cycles through all sources") + addresses := make(map[string]bool) + addr1, _ := setup.fakeDialer.DialArgsForCall(0) + for i := 1; i < setup.fakeDialer.DialCallCount(); i++ { + addr2, _ := setup.fakeDialer.DialArgsForCall(i) + require.NotEqual(t, addr1, addr2) + addresses[addr1] = true + addr1 = addr2 + } + require.Len(t, 
addresses, 4) + }) +} + +func TestBFTDeliverer_RefreshEndpoints(t *testing.T) { + flogging.ActivateSpec("debug") + setup := newBFTDelivererTestSetup(t) + setup.initialize(t) + + sources1 := []*orderers.Endpoint{ + { + Address: "orderer-address-1", + RootCerts: nil, + Refreshed: make(chan struct{}), + }, + { + Address: "orderer-address-2", + RootCerts: nil, + Refreshed: make(chan struct{}), + }, + { + Address: "orderer-address-3", + RootCerts: nil, + Refreshed: make(chan struct{}), + }, + { + Address: "orderer-address-4", + RootCerts: nil, + Refreshed: make(chan struct{}), + }, + } + sources2 := []*orderers.Endpoint{ + { + Address: "orderer-address-5", + RootCerts: nil, + Refreshed: make(chan struct{}), + }, + { + Address: "orderer-address-6", + RootCerts: nil, + Refreshed: make(chan struct{}), + }, + { + Address: "orderer-address-7", + RootCerts: nil, + Refreshed: make(chan struct{}), + }, + { + Address: "orderer-address-8", + RootCerts: nil, + Refreshed: make(chan struct{}), + }, + } + setup.fakeOrdererConnectionSource.ShuffledEndpointsReturnsOnCall(0, sources1) + setup.fakeOrdererConnectionSource.ShuffledEndpointsReturnsOnCall(1, sources2) + + setup.start() + + t.Log("Get the endpoints") + setup.gWithT.Eventually(setup.fakeOrdererConnectionSource.ShuffledEndpointsCallCount).Should(Equal(1)) + + t.Log("Creates and starts the monitor") + setup.gWithT.Eventually(setup.fakeCensorshipMonFactory.CreateCallCount).Should(Equal(1)) + setup.assertEventuallyMonitorCallCount(1) + + t.Log("Dials to an orderer from the shuffled endpoints of the first set") + setup.gWithT.Eventually(setup.fakeDialer.DialCallCount).Should(Equal(1)) + addr, _ := setup.fakeDialer.DialArgsForCall(0) + require.Equal(t, "orderer-address-1", addr) + + t.Log("Closing the refresh channel (always on all endpoints)") + for _, s := range sources1 { + close(s.Refreshed) + } + + t.Log("Get the endpoints again") + setup.gWithT.Eventually(setup.fakeOrdererConnectionSource.ShuffledEndpointsCallCount).Should(Equal(2)) + + t.Log("Creates and starts the monitor") + setup.gWithT.Eventually(setup.fakeCensorshipMonFactory.CreateCallCount).Should(Equal(2)) + func() { + setup.mutex.Lock() + defer setup.mutex.Unlock() + + setup.gWithT.Eventually(func() int { return len(setup.monitorSet) }).Should(Equal(2)) + setup.gWithT.Eventually(setup.monitorSet[1].MonitorCallCount).Should(Equal(1)) + }() + + t.Log("Dials to an orderer from the shuffled endpoints of the second set") + setup.gWithT.Eventually(setup.fakeDialer.DialCallCount).Should(Equal(2)) + addr, _ = setup.fakeDialer.DialArgsForCall(1) + require.Equal(t, "orderer-address-5", addr) + + t.Log("Does not sleep") + require.Equal(t, 0, setup.fakeSleeper.SleepCallCount()) + + setup.stop() } diff --git a/internal/pkg/peer/blocksprovider/block_receiver.go b/internal/pkg/peer/blocksprovider/block_receiver.go index 197d3a57427..0ffb7dba6d4 100644 --- a/internal/pkg/peer/blocksprovider/block_receiver.go +++ b/internal/pkg/peer/blocksprovider/block_receiver.go @@ -112,12 +112,12 @@ RecvLoop: // Loop until the endpoint is refreshed, or there is an error on the c onSuccess(blockNum) case <-br.stopC: br.logger.Infof("BlockReceiver got a signal to stop") - err = &errStopping{message: "got a signal to stop"} + err = &ErrStopping{Message: "got a signal to stop"} break RecvLoop } } - // cancel the sending side and wait for the start goroutine to exit + // cancel the sending side and wait for the `Start` goroutine to exit br.cancelSendFunc() <-br.recvC diff --git a/internal/pkg/peer/blocksprovider/deliverer.go 
b/internal/pkg/peer/blocksprovider/deliverer.go index 6c3ca607cff..b9288a8642b 100644 --- a/internal/pkg/peer/blocksprovider/deliverer.go +++ b/internal/pkg/peer/blocksprovider/deliverer.go @@ -56,7 +56,7 @@ type BlockVerifier interface { //go:generate counterfeiter -o fake/orderer_connection_source.go --fake-name OrdererConnectionSource . OrdererConnectionSource type OrdererConnectionSource interface { RandomEndpoint() (*orderers.Endpoint, error) - Endpoints() []*orderers.Endpoint + ShuffledEndpoints() []*orderers.Endpoint } //go:generate counterfeiter -o fake/dialer.go --fake-name Dialer . Dialer @@ -208,7 +208,7 @@ func (d *Deliverer) DeliverBlocks() { switch err.(type) { case *errRefreshEndpoint: // Don't count it as an error, we'll reconnect immediately. - case *errStopping: + case *ErrStopping: // Don't count it as an error, it is a signal to stop. default: failureCounter++ diff --git a/internal/pkg/peer/blocksprovider/deliverer_test.go b/internal/pkg/peer/blocksprovider/deliverer_test.go index 5907bbd45e4..e9474ff5722 100644 --- a/internal/pkg/peer/blocksprovider/deliverer_test.go +++ b/internal/pkg/peer/blocksprovider/deliverer_test.go @@ -302,9 +302,9 @@ var _ = Describe("CFT-Deliverer", func() { When("an error occurs, then a block is successfully delivered", func() { BeforeEach(func() { fakeDeliverStreamer.DeliverReturnsOnCall(0, nil, fmt.Errorf("deliver-error")) - fakeDeliverStreamer.DeliverReturnsOnCall(1, fakeDeliverClient, nil) fakeDeliverStreamer.DeliverReturnsOnCall(1, nil, fmt.Errorf("deliver-error")) fakeDeliverStreamer.DeliverReturnsOnCall(2, nil, fmt.Errorf("deliver-error")) + fakeDeliverStreamer.DeliverReturnsOnCall(3, fakeDeliverClient, nil) }) It("sleeps in an exponential fashion and retries until dial is successful", func() { diff --git a/internal/pkg/peer/blocksprovider/fake/censorship_detector.go b/internal/pkg/peer/blocksprovider/fake/censorship_detector.go new file mode 100644 index 00000000000..a241be9d20a --- /dev/null +++ b/internal/pkg/peer/blocksprovider/fake/censorship_detector.go @@ -0,0 +1,162 @@ +// Code generated by counterfeiter. DO NOT EDIT. 
+package fake + +import ( + "sync" + + "github.com/hyperledger/fabric/internal/pkg/peer/blocksprovider" +) + +type CensorshipDetector struct { + ErrorsChannelStub func() <-chan error + errorsChannelMutex sync.RWMutex + errorsChannelArgsForCall []struct { + } + errorsChannelReturns struct { + result1 <-chan error + } + errorsChannelReturnsOnCall map[int]struct { + result1 <-chan error + } + MonitorStub func() + monitorMutex sync.RWMutex + monitorArgsForCall []struct { + } + StopStub func() + stopMutex sync.RWMutex + stopArgsForCall []struct { + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *CensorshipDetector) ErrorsChannel() <-chan error { + fake.errorsChannelMutex.Lock() + ret, specificReturn := fake.errorsChannelReturnsOnCall[len(fake.errorsChannelArgsForCall)] + fake.errorsChannelArgsForCall = append(fake.errorsChannelArgsForCall, struct { + }{}) + stub := fake.ErrorsChannelStub + fakeReturns := fake.errorsChannelReturns + fake.recordInvocation("ErrorsChannel", []interface{}{}) + fake.errorsChannelMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *CensorshipDetector) ErrorsChannelCallCount() int { + fake.errorsChannelMutex.RLock() + defer fake.errorsChannelMutex.RUnlock() + return len(fake.errorsChannelArgsForCall) +} + +func (fake *CensorshipDetector) ErrorsChannelCalls(stub func() <-chan error) { + fake.errorsChannelMutex.Lock() + defer fake.errorsChannelMutex.Unlock() + fake.ErrorsChannelStub = stub +} + +func (fake *CensorshipDetector) ErrorsChannelReturns(result1 <-chan error) { + fake.errorsChannelMutex.Lock() + defer fake.errorsChannelMutex.Unlock() + fake.ErrorsChannelStub = nil + fake.errorsChannelReturns = struct { + result1 <-chan error + }{result1} +} + +func (fake *CensorshipDetector) ErrorsChannelReturnsOnCall(i int, result1 <-chan error) { + fake.errorsChannelMutex.Lock() + defer fake.errorsChannelMutex.Unlock() + fake.ErrorsChannelStub = nil + if fake.errorsChannelReturnsOnCall == nil { + fake.errorsChannelReturnsOnCall = make(map[int]struct { + result1 <-chan error + }) + } + fake.errorsChannelReturnsOnCall[i] = struct { + result1 <-chan error + }{result1} +} + +func (fake *CensorshipDetector) Monitor() { + fake.monitorMutex.Lock() + fake.monitorArgsForCall = append(fake.monitorArgsForCall, struct { + }{}) + stub := fake.MonitorStub + fake.recordInvocation("Monitor", []interface{}{}) + fake.monitorMutex.Unlock() + if stub != nil { + fake.MonitorStub() + } +} + +func (fake *CensorshipDetector) MonitorCallCount() int { + fake.monitorMutex.RLock() + defer fake.monitorMutex.RUnlock() + return len(fake.monitorArgsForCall) +} + +func (fake *CensorshipDetector) MonitorCalls(stub func()) { + fake.monitorMutex.Lock() + defer fake.monitorMutex.Unlock() + fake.MonitorStub = stub +} + +func (fake *CensorshipDetector) Stop() { + fake.stopMutex.Lock() + fake.stopArgsForCall = append(fake.stopArgsForCall, struct { + }{}) + stub := fake.StopStub + fake.recordInvocation("Stop", []interface{}{}) + fake.stopMutex.Unlock() + if stub != nil { + fake.StopStub() + } +} + +func (fake *CensorshipDetector) StopCallCount() int { + fake.stopMutex.RLock() + defer fake.stopMutex.RUnlock() + return len(fake.stopArgsForCall) +} + +func (fake *CensorshipDetector) StopCalls(stub func()) { + fake.stopMutex.Lock() + defer fake.stopMutex.Unlock() + fake.StopStub = stub +} + +func (fake *CensorshipDetector) Invocations() map[string][][]interface{} { + 
fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.errorsChannelMutex.RLock() + defer fake.errorsChannelMutex.RUnlock() + fake.monitorMutex.RLock() + defer fake.monitorMutex.RUnlock() + fake.stopMutex.RLock() + defer fake.stopMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *CensorshipDetector) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ blocksprovider.CensorshipDetector = new(CensorshipDetector) diff --git a/internal/pkg/peer/blocksprovider/fake/censorship_detector_factory.go b/internal/pkg/peer/blocksprovider/fake/censorship_detector_factory.go new file mode 100644 index 00000000000..40fd224aff0 --- /dev/null +++ b/internal/pkg/peer/blocksprovider/fake/censorship_detector_factory.go @@ -0,0 +1,129 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package fake + +import ( + "sync" + + "github.com/hyperledger/fabric/internal/pkg/peer/blocksprovider" + "github.com/hyperledger/fabric/internal/pkg/peer/orderers" +) + +type CensorshipDetectorFactory struct { + CreateStub func(string, blocksprovider.BlockVerifier, blocksprovider.DeliverClientRequester, blocksprovider.BlockProgressReporter, []*orderers.Endpoint, int, blocksprovider.TimeoutConfig) blocksprovider.CensorshipDetector + createMutex sync.RWMutex + createArgsForCall []struct { + arg1 string + arg2 blocksprovider.BlockVerifier + arg3 blocksprovider.DeliverClientRequester + arg4 blocksprovider.BlockProgressReporter + arg5 []*orderers.Endpoint + arg6 int + arg7 blocksprovider.TimeoutConfig + } + createReturns struct { + result1 blocksprovider.CensorshipDetector + } + createReturnsOnCall map[int]struct { + result1 blocksprovider.CensorshipDetector + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *CensorshipDetectorFactory) Create(arg1 string, arg2 blocksprovider.BlockVerifier, arg3 blocksprovider.DeliverClientRequester, arg4 blocksprovider.BlockProgressReporter, arg5 []*orderers.Endpoint, arg6 int, arg7 blocksprovider.TimeoutConfig) blocksprovider.CensorshipDetector { + var arg5Copy []*orderers.Endpoint + if arg5 != nil { + arg5Copy = make([]*orderers.Endpoint, len(arg5)) + copy(arg5Copy, arg5) + } + fake.createMutex.Lock() + ret, specificReturn := fake.createReturnsOnCall[len(fake.createArgsForCall)] + fake.createArgsForCall = append(fake.createArgsForCall, struct { + arg1 string + arg2 blocksprovider.BlockVerifier + arg3 blocksprovider.DeliverClientRequester + arg4 blocksprovider.BlockProgressReporter + arg5 []*orderers.Endpoint + arg6 int + arg7 blocksprovider.TimeoutConfig + }{arg1, arg2, arg3, arg4, arg5Copy, arg6, arg7}) + stub := fake.CreateStub + fakeReturns := fake.createReturns + fake.recordInvocation("Create", []interface{}{arg1, arg2, arg3, arg4, arg5Copy, arg6, arg7}) + fake.createMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3, arg4, arg5, arg6, arg7) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *CensorshipDetectorFactory) CreateCallCount() int { + fake.createMutex.RLock() + defer fake.createMutex.RUnlock() + return 
len(fake.createArgsForCall) +} + +func (fake *CensorshipDetectorFactory) CreateCalls(stub func(string, blocksprovider.BlockVerifier, blocksprovider.DeliverClientRequester, blocksprovider.BlockProgressReporter, []*orderers.Endpoint, int, blocksprovider.TimeoutConfig) blocksprovider.CensorshipDetector) { + fake.createMutex.Lock() + defer fake.createMutex.Unlock() + fake.CreateStub = stub +} + +func (fake *CensorshipDetectorFactory) CreateArgsForCall(i int) (string, blocksprovider.BlockVerifier, blocksprovider.DeliverClientRequester, blocksprovider.BlockProgressReporter, []*orderers.Endpoint, int, blocksprovider.TimeoutConfig) { + fake.createMutex.RLock() + defer fake.createMutex.RUnlock() + argsForCall := fake.createArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4, argsForCall.arg5, argsForCall.arg6, argsForCall.arg7 +} + +func (fake *CensorshipDetectorFactory) CreateReturns(result1 blocksprovider.CensorshipDetector) { + fake.createMutex.Lock() + defer fake.createMutex.Unlock() + fake.CreateStub = nil + fake.createReturns = struct { + result1 blocksprovider.CensorshipDetector + }{result1} +} + +func (fake *CensorshipDetectorFactory) CreateReturnsOnCall(i int, result1 blocksprovider.CensorshipDetector) { + fake.createMutex.Lock() + defer fake.createMutex.Unlock() + fake.CreateStub = nil + if fake.createReturnsOnCall == nil { + fake.createReturnsOnCall = make(map[int]struct { + result1 blocksprovider.CensorshipDetector + }) + } + fake.createReturnsOnCall[i] = struct { + result1 blocksprovider.CensorshipDetector + }{result1} +} + +func (fake *CensorshipDetectorFactory) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.createMutex.RLock() + defer fake.createMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *CensorshipDetectorFactory) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ blocksprovider.CensorshipDetectorFactory = new(CensorshipDetectorFactory) diff --git a/internal/pkg/peer/blocksprovider/fake/duration_exceeded_handler.go b/internal/pkg/peer/blocksprovider/fake/duration_exceeded_handler.go new file mode 100644 index 00000000000..0cc04616a5c --- /dev/null +++ b/internal/pkg/peer/blocksprovider/fake/duration_exceeded_handler.go @@ -0,0 +1,102 @@ +// Code generated by counterfeiter. DO NOT EDIT. 
+package fake + +import ( + "sync" + + "github.com/hyperledger/fabric/internal/pkg/peer/blocksprovider" +) + +type DurationExceededHandler struct { + DurationExceededHandlerStub func() bool + durationExceededHandlerMutex sync.RWMutex + durationExceededHandlerArgsForCall []struct { + } + durationExceededHandlerReturns struct { + result1 bool + } + durationExceededHandlerReturnsOnCall map[int]struct { + result1 bool + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *DurationExceededHandler) DurationExceededHandler() bool { + fake.durationExceededHandlerMutex.Lock() + ret, specificReturn := fake.durationExceededHandlerReturnsOnCall[len(fake.durationExceededHandlerArgsForCall)] + fake.durationExceededHandlerArgsForCall = append(fake.durationExceededHandlerArgsForCall, struct { + }{}) + stub := fake.DurationExceededHandlerStub + fakeReturns := fake.durationExceededHandlerReturns + fake.recordInvocation("DurationExceededHandler", []interface{}{}) + fake.durationExceededHandlerMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *DurationExceededHandler) DurationExceededHandlerCallCount() int { + fake.durationExceededHandlerMutex.RLock() + defer fake.durationExceededHandlerMutex.RUnlock() + return len(fake.durationExceededHandlerArgsForCall) +} + +func (fake *DurationExceededHandler) DurationExceededHandlerCalls(stub func() bool) { + fake.durationExceededHandlerMutex.Lock() + defer fake.durationExceededHandlerMutex.Unlock() + fake.DurationExceededHandlerStub = stub +} + +func (fake *DurationExceededHandler) DurationExceededHandlerReturns(result1 bool) { + fake.durationExceededHandlerMutex.Lock() + defer fake.durationExceededHandlerMutex.Unlock() + fake.DurationExceededHandlerStub = nil + fake.durationExceededHandlerReturns = struct { + result1 bool + }{result1} +} + +func (fake *DurationExceededHandler) DurationExceededHandlerReturnsOnCall(i int, result1 bool) { + fake.durationExceededHandlerMutex.Lock() + defer fake.durationExceededHandlerMutex.Unlock() + fake.DurationExceededHandlerStub = nil + if fake.durationExceededHandlerReturnsOnCall == nil { + fake.durationExceededHandlerReturnsOnCall = make(map[int]struct { + result1 bool + }) + } + fake.durationExceededHandlerReturnsOnCall[i] = struct { + result1 bool + }{result1} +} + +func (fake *DurationExceededHandler) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.durationExceededHandlerMutex.RLock() + defer fake.durationExceededHandlerMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *DurationExceededHandler) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ blocksprovider.DurationExceededHandler = new(DurationExceededHandler) diff --git a/internal/pkg/peer/blocksprovider/fake/orderer_connection_source.go b/internal/pkg/peer/blocksprovider/fake/orderer_connection_source.go index 97b1f60da14..a8c3487a5a4 100644 --- a/internal/pkg/peer/blocksprovider/fake/orderer_connection_source.go +++ 
b/internal/pkg/peer/blocksprovider/fake/orderer_connection_source.go @@ -9,16 +9,6 @@ import ( ) type OrdererConnectionSource struct { - EndpointsStub func() []*orderers.Endpoint - endpointsMutex sync.RWMutex - endpointsArgsForCall []struct { - } - endpointsReturns struct { - result1 []*orderers.Endpoint - } - endpointsReturnsOnCall map[int]struct { - result1 []*orderers.Endpoint - } RandomEndpointStub func() (*orderers.Endpoint, error) randomEndpointMutex sync.RWMutex randomEndpointArgsForCall []struct { @@ -31,61 +21,18 @@ type OrdererConnectionSource struct { result1 *orderers.Endpoint result2 error } - invocations map[string][][]interface{} - invocationsMutex sync.RWMutex -} - -func (fake *OrdererConnectionSource) Endpoints() []*orderers.Endpoint { - fake.endpointsMutex.Lock() - ret, specificReturn := fake.endpointsReturnsOnCall[len(fake.endpointsArgsForCall)] - fake.endpointsArgsForCall = append(fake.endpointsArgsForCall, struct { - }{}) - stub := fake.EndpointsStub - fakeReturns := fake.endpointsReturns - fake.recordInvocation("Endpoints", []interface{}{}) - fake.endpointsMutex.Unlock() - if stub != nil { - return stub() - } - if specificReturn { - return ret.result1 + ShuffledEndpointsStub func() []*orderers.Endpoint + shuffledEndpointsMutex sync.RWMutex + shuffledEndpointsArgsForCall []struct { } - return fakeReturns.result1 -} - -func (fake *OrdererConnectionSource) EndpointsCallCount() int { - fake.endpointsMutex.RLock() - defer fake.endpointsMutex.RUnlock() - return len(fake.endpointsArgsForCall) -} - -func (fake *OrdererConnectionSource) EndpointsCalls(stub func() []*orderers.Endpoint) { - fake.endpointsMutex.Lock() - defer fake.endpointsMutex.Unlock() - fake.EndpointsStub = stub -} - -func (fake *OrdererConnectionSource) EndpointsReturns(result1 []*orderers.Endpoint) { - fake.endpointsMutex.Lock() - defer fake.endpointsMutex.Unlock() - fake.EndpointsStub = nil - fake.endpointsReturns = struct { + shuffledEndpointsReturns struct { result1 []*orderers.Endpoint - }{result1} -} - -func (fake *OrdererConnectionSource) EndpointsReturnsOnCall(i int, result1 []*orderers.Endpoint) { - fake.endpointsMutex.Lock() - defer fake.endpointsMutex.Unlock() - fake.EndpointsStub = nil - if fake.endpointsReturnsOnCall == nil { - fake.endpointsReturnsOnCall = make(map[int]struct { - result1 []*orderers.Endpoint - }) } - fake.endpointsReturnsOnCall[i] = struct { + shuffledEndpointsReturnsOnCall map[int]struct { result1 []*orderers.Endpoint - }{result1} + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex } func (fake *OrdererConnectionSource) RandomEndpoint() (*orderers.Endpoint, error) { @@ -144,13 +91,66 @@ func (fake *OrdererConnectionSource) RandomEndpointReturnsOnCall(i int, result1 }{result1, result2} } +func (fake *OrdererConnectionSource) ShuffledEndpoints() []*orderers.Endpoint { + fake.shuffledEndpointsMutex.Lock() + ret, specificReturn := fake.shuffledEndpointsReturnsOnCall[len(fake.shuffledEndpointsArgsForCall)] + fake.shuffledEndpointsArgsForCall = append(fake.shuffledEndpointsArgsForCall, struct { + }{}) + stub := fake.ShuffledEndpointsStub + fakeReturns := fake.shuffledEndpointsReturns + fake.recordInvocation("ShuffledEndpoints", []interface{}{}) + fake.shuffledEndpointsMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *OrdererConnectionSource) ShuffledEndpointsCallCount() int { + fake.shuffledEndpointsMutex.RLock() + defer fake.shuffledEndpointsMutex.RUnlock() + 
return len(fake.shuffledEndpointsArgsForCall) +} + +func (fake *OrdererConnectionSource) ShuffledEndpointsCalls(stub func() []*orderers.Endpoint) { + fake.shuffledEndpointsMutex.Lock() + defer fake.shuffledEndpointsMutex.Unlock() + fake.ShuffledEndpointsStub = stub +} + +func (fake *OrdererConnectionSource) ShuffledEndpointsReturns(result1 []*orderers.Endpoint) { + fake.shuffledEndpointsMutex.Lock() + defer fake.shuffledEndpointsMutex.Unlock() + fake.ShuffledEndpointsStub = nil + fake.shuffledEndpointsReturns = struct { + result1 []*orderers.Endpoint + }{result1} +} + +func (fake *OrdererConnectionSource) ShuffledEndpointsReturnsOnCall(i int, result1 []*orderers.Endpoint) { + fake.shuffledEndpointsMutex.Lock() + defer fake.shuffledEndpointsMutex.Unlock() + fake.ShuffledEndpointsStub = nil + if fake.shuffledEndpointsReturnsOnCall == nil { + fake.shuffledEndpointsReturnsOnCall = make(map[int]struct { + result1 []*orderers.Endpoint + }) + } + fake.shuffledEndpointsReturnsOnCall[i] = struct { + result1 []*orderers.Endpoint + }{result1} +} + func (fake *OrdererConnectionSource) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() - fake.endpointsMutex.RLock() - defer fake.endpointsMutex.RUnlock() fake.randomEndpointMutex.RLock() defer fake.randomEndpointMutex.RUnlock() + fake.shuffledEndpointsMutex.RLock() + defer fake.shuffledEndpointsMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/internal/pkg/peer/blocksprovider/util.go b/internal/pkg/peer/blocksprovider/util.go index c32590f09b7..96c42d7473d 100644 --- a/internal/pkg/peer/blocksprovider/util.go +++ b/internal/pkg/peer/blocksprovider/util.go @@ -8,10 +8,7 @@ package blocksprovider import ( "math" - "math/rand" "time" - - "github.com/hyperledger/fabric/internal/pkg/peer/orderers" ) type errRefreshEndpoint struct { @@ -22,28 +19,28 @@ func (e *errRefreshEndpoint) Error() string { return e.message } -type errStopping struct { - message string +type ErrStopping struct { + Message string } -func (e *errStopping) Error() string { - return e.message +func (e *ErrStopping) Error() string { + return e.Message } -type errFatal struct { - message string +type ErrFatal struct { + Message string } -func (e *errFatal) Error() string { - return e.message +func (e *ErrFatal) Error() string { + return e.Message } -type errCensorship struct { - message string +type ErrCensorship struct { + Message string } -func (e *errCensorship) Error() string { - return e.message +func (e *ErrCensorship) Error() string { + return e.Message } func backOffDuration(base float64, exponent uint, minDur, maxDur time.Duration) time.Duration { @@ -82,17 +79,6 @@ func numRetries2Max(base float64, minDur, maxDur time.Duration) int { return int(math.Ceil(math.Log(float64(maxDur)/float64(minDur)) / math.Log(base))) } -// shuffle the endpoint slice -func shuffle(a []*orderers.Endpoint) []*orderers.Endpoint { - n := len(a) - returnedSlice := make([]*orderers.Endpoint, n) - indices := rand.Perm(n) - for i, idx := range indices { - returnedSlice[i] = a[idx] - } - return returnedSlice -} - type timeNumber struct { t time.Time n uint64 diff --git a/internal/pkg/peer/blocksprovider/util_test.go b/internal/pkg/peer/blocksprovider/util_test.go index f50c7d40012..1c9526e2e4c 100644 --- a/internal/pkg/peer/blocksprovider/util_test.go +++ b/internal/pkg/peer/blocksprovider/util_test.go @@ -27,6 +27,10 @@ func TestBackoffDuration(t 
*testing.T) { dur = backOffDuration(2.0, 20, BftMinRetryInterval, BftMaxRetryInterval) assert.Equal(t, BftMaxRetryInterval, dur) + // very large exponent -> dur=max + dur = backOffDuration(2.0, 1000000, BftMinRetryInterval, BftMaxRetryInterval) + assert.Equal(t, BftMaxRetryInterval, dur) + // max < min -> max=min dur = backOffDuration(2.0, 0, BftMinRetryInterval, BftMinRetryInterval/2) assert.Equal(t, BftMinRetryInterval, dur) diff --git a/internal/pkg/peer/orderers/connection.go b/internal/pkg/peer/orderers/connection.go index d1f04f79238..bc129767ec3 100644 --- a/internal/pkg/peer/orderers/connection.go +++ b/internal/pkg/peer/orderers/connection.go @@ -65,6 +65,7 @@ func NewConnectionSource(logger *flogging.FabricLogger, overrides map[string]*En } } +// RandomEndpoint returns a random endpoint. func (cs *ConnectionSource) RandomEndpoint() (*Endpoint, error) { cs.mutex.RLock() defer cs.mutex.RUnlock() @@ -81,6 +82,20 @@ func (cs *ConnectionSource) Endpoints() []*Endpoint { return cs.allEndpoints } +// ShuffledEndpoints returns a shuffled array of endpoints in a new slice. +func (cs *ConnectionSource) ShuffledEndpoints() []*Endpoint { + cs.mutex.RLock() + defer cs.mutex.RUnlock() + + n := len(cs.allEndpoints) + returnedSlice := make([]*Endpoint, n) + indices := rand.Perm(n) + for i, idx := range indices { + returnedSlice[i] = cs.allEndpoints[idx] + } + return returnedSlice +} + func (cs *ConnectionSource) Update(globalAddrs []string, orgs map[string]OrdererOrg) { cs.mutex.Lock() defer cs.mutex.Unlock() diff --git a/internal/pkg/peer/orderers/connection_test.go b/internal/pkg/peer/orderers/connection_test.go index 7f9f98218eb..11f9c4cd663 100644 --- a/internal/pkg/peer/orderers/connection_test.go +++ b/internal/pkg/peer/orderers/connection_test.go @@ -10,6 +10,7 @@ import ( "bytes" "os" "sort" + "strings" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -136,6 +137,39 @@ var _ = Describe("Connection", func() { Expect(e.String()).To(Equal("")) }) + It("returns shuffled endpoints", func() { // there is a chance of failure here, but it is very small. + combinationSet := make(map[string]bool) + for i := 0; i < 10000; i++ { + shuffledEndpoints := cs.ShuffledEndpoints() + Expect(stripEndpoints(shuffledEndpoints)).To(ConsistOf( + stripEndpoints(endpoints), + )) + key := strings.Builder{} + for _, ep := range shuffledEndpoints { + key.WriteString(ep.Address) + key.WriteString(" ") + } + combinationSet[key.String()] = true + } + + Expect(len(combinationSet)).To(Equal(4 * 3 * 2 * 1)) + }) + + It("returns random endpoint", func() { // there is a chance of failure here, but it is very small. + combinationMap := make(map[string]*orderers.Endpoint) + for i := 0; i < 10000; i++ { + r, _ := cs.RandomEndpoint() + combinationMap[r.Address] = r + } + var all []*orderers.Endpoint + for _, ep := range combinationMap { + all = append(all, ep) + } + Expect(stripEndpoints(all)).To(ConsistOf( + stripEndpoints(endpoints), + )) + }) + When("an update does not modify the endpoint set", func() { BeforeEach(func() { cs.Update(nil, map[string]orderers.OrdererOrg{