diff --git a/orderer/common/cluster/deliver.go b/orderer/common/cluster/deliver.go index 8f66061866f..444717e4af2 100644 --- a/orderer/common/cluster/deliver.go +++ b/orderer/common/cluster/deliver.go @@ -45,7 +45,7 @@ type BlockPuller struct { // A 'stopper' goroutine may signal the go-routine servicing PullBlock & HeightsByEndpoints to stop by closing this // channel. Note: all methods of the BlockPuller must be serviced by a single goroutine, it is not thread safe. - // It is the responsibility of the 'stopper' not to close the channel more then once. + // It is the responsibility of the 'stopper' not to close the channel more than once. StopChannel chan struct{} // Internal state diff --git a/orderer/consensus/smartbft/block_puller_factory.go b/orderer/consensus/smartbft/block_puller_factory.go new file mode 100644 index 00000000000..c264a19e773 --- /dev/null +++ b/orderer/consensus/smartbft/block_puller_factory.go @@ -0,0 +1,109 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package smartbft + +import ( + "crypto/x509" + "encoding/pem" + + "github.com/hyperledger/fabric-lib-go/bccsp" + "github.com/hyperledger/fabric-lib-go/common/flogging" + cb "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric/orderer/common/cluster" + "github.com/hyperledger/fabric/orderer/common/localconfig" + "github.com/hyperledger/fabric/orderer/consensus" + "github.com/hyperledger/fabric/orderer/consensus/etcdraft" + "github.com/pkg/errors" +) + +//go:generate counterfeiter -o mocks/block_puller.go . BlockPuller + +// BlockPuller is used to pull blocks from other OSN +type BlockPuller interface { + PullBlock(seq uint64) *cb.Block + HeightsByEndpoints() (map[string]uint64, string, error) + Close() +} + +//go:generate counterfeiter -o mocks/block_puller_factory.go . BlockPullerFactory + +type BlockPullerFactory interface { + // CreateBlockPuller creates a new block puller. + CreateBlockPuller( + support consensus.ConsenterSupport, + baseDialer *cluster.PredicateDialer, + clusterConfig localconfig.Cluster, + bccsp bccsp.BCCSP, + ) (BlockPuller, error) +} + +type blockPullerCreator struct{} + +func (*blockPullerCreator) CreateBlockPuller( + support consensus.ConsenterSupport, + baseDialer *cluster.PredicateDialer, + clusterConfig localconfig.Cluster, + bccsp bccsp.BCCSP, +) (BlockPuller, error) { + return newBlockPuller(support, baseDialer, clusterConfig, bccsp) +} + +// newBlockPuller creates a new block puller +func newBlockPuller( + support consensus.ConsenterSupport, + baseDialer *cluster.PredicateDialer, + clusterConfig localconfig.Cluster, + bccsp bccsp.BCCSP, +) (BlockPuller, error) { + verifyBlockSequence := func(blocks []*cb.Block, _ string) error { + vb := cluster.BlockVerifierBuilder(bccsp) + return cluster.VerifyBlocksBFT(blocks, support.SignatureVerifier(), vb) + } + + stdDialer := &cluster.StandardDialer{ + Config: baseDialer.Config.Clone(), + } + stdDialer.Config.AsyncConnect = false + stdDialer.Config.SecOpts.VerifyCertificate = nil + + // Extract the TLS CA certs and endpoints from the configuration, + endpoints, err := etcdraft.EndpointconfigFromSupport(support, bccsp) + if err != nil { + return nil, err + } + + logger := flogging.MustGetLogger("orderer.common.cluster.puller") + + der, _ := pem.Decode(stdDialer.Config.SecOpts.Certificate) + if der == nil { + return nil, errors.Errorf("client certificate isn't in PEM format: %v", + string(stdDialer.Config.SecOpts.Certificate)) + } + + myCert, err := x509.ParseCertificate(der.Bytes) + if err != nil { + logger.Warnf("Failed parsing my own TLS certificate: %v, therefore we may connect to our own endpoint when pulling blocks", err) + } + + bp := &cluster.BlockPuller{ + MyOwnTLSCert: myCert, + VerifyBlockSequence: verifyBlockSequence, + Logger: logger, + RetryTimeout: clusterConfig.ReplicationRetryTimeout, + MaxTotalBufferBytes: clusterConfig.ReplicationBufferSize, + FetchTimeout: clusterConfig.ReplicationPullTimeout, + Endpoints: endpoints, + Signer: support, + TLSCert: der.Bytes, + Channel: support.ChannelID(), + Dialer: stdDialer, + } + + logger.Infof("Built new block puller with cluster config: %+v, endpoints: %+v", clusterConfig, endpoints) + + return bp, nil +} diff --git a/orderer/consensus/smartbft/chain.go b/orderer/consensus/smartbft/chain.go index b6b34ccc39b..87d0bd0c764 100644 --- a/orderer/consensus/smartbft/chain.go +++ b/orderer/consensus/smartbft/chain.go @@ -36,15 +36,6 @@ import ( "go.uber.org/zap" ) -//go:generate counterfeiter -o mocks/mock_blockpuller.go . BlockPuller - -// BlockPuller is used to pull blocks from other OSN -type BlockPuller interface { - PullBlock(seq uint64) *cb.Block - HeightsByEndpoints() (map[string]uint64, string, error) - Close() -} - // WALConfig consensus specific configuration parameters from orderer.yaml; for SmartBFT only WALDir is relevant. type WALConfig struct { WALDir string // WAL data of is stored in WALDir/ @@ -72,8 +63,8 @@ type BFTChain struct { Channel string Config types.Configuration BlockPuller BlockPuller - clusterDialer *cluster.PredicateDialer // TODO Required by BFT-synchronizer - localConfigCluster localconfig.Cluster // TODO Required by BFT-synchronizer + clusterDialer *cluster.PredicateDialer // Required by BFT-synchronizer + localConfigCluster localconfig.Cluster // Required by BFT-synchronizer Comm cluster.Communicator SignerSerializer signerSerializer PolicyManager policies.Manager @@ -131,8 +122,8 @@ func NewChain( SignerSerializer: signerSerializer, PolicyManager: policyManager, BlockPuller: blockPuller, // FIXME create internally or with a factory - clusterDialer: clusterDialer, // TODO Required by BFT-synchronizer - localConfigCluster: localConfigCluster, // TODO Required by BFT-synchronizer + clusterDialer: clusterDialer, // Required by BFT-synchronizer + localConfigCluster: localConfigCluster, // Required by BFT-synchronizer Logger: logger, consensusRelation: types2.ConsensusRelationConsenter, status: types2.StatusActive, @@ -212,8 +203,27 @@ func bftSmartConsensusBuild( var sync api.Synchronizer switch c.localConfigCluster.ReplicationPolicy { case "consensus": - c.Logger.Debug("BFTSynchronizer not yet available") // TODO create BFTSynchronizer when available - fallthrough + c.Logger.Debug("Creating a BFTSynchronizer") + sync = &BFTSynchronizer{ + selfID: rtc.id, + LatestConfig: func() (types.Configuration, []uint64) { + rtc := c.RuntimeConfig.Load().(RuntimeConfig) + return rtc.BFTConfig, rtc.Nodes + }, + BlockToDecision: c.blockToDecision, + OnCommit: func(block *cb.Block) types.Reconfig { + c.pruneCommittedRequests(block) + return c.updateRuntimeConfig(block) + }, + Support: c.support, + CryptoProvider: c.bccsp, + ClusterDialer: c.clusterDialer, + LocalConfigCluster: c.localConfigCluster, + BlockPullerFactory: &blockPullerCreator{}, + VerifierFactory: &verifierCreator{}, + BFTDelivererFactory: &bftDelivererCreator{}, + Logger: c.Logger, + } case "simple": c.Logger.Debug("Creating simple Synchronizer") sync = &Synchronizer{ diff --git a/orderer/consensus/smartbft/mocks/bft_block_deliverer.go b/orderer/consensus/smartbft/mocks/bft_block_deliverer.go new file mode 100644 index 00000000000..569782b8766 --- /dev/null +++ b/orderer/consensus/smartbft/mocks/bft_block_deliverer.go @@ -0,0 +1,139 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package mocks + +import ( + "sync" + + "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric/orderer/consensus/smartbft" +) + +type BFTBlockDeliverer struct { + DeliverBlocksStub func() + deliverBlocksMutex sync.RWMutex + deliverBlocksArgsForCall []struct { + } + InitializeStub func(*common.Config, string) + initializeMutex sync.RWMutex + initializeArgsForCall []struct { + arg1 *common.Config + arg2 string + } + StopStub func() + stopMutex sync.RWMutex + stopArgsForCall []struct { + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *BFTBlockDeliverer) DeliverBlocks() { + fake.deliverBlocksMutex.Lock() + fake.deliverBlocksArgsForCall = append(fake.deliverBlocksArgsForCall, struct { + }{}) + stub := fake.DeliverBlocksStub + fake.recordInvocation("DeliverBlocks", []interface{}{}) + fake.deliverBlocksMutex.Unlock() + if stub != nil { + fake.DeliverBlocksStub() + } +} + +func (fake *BFTBlockDeliverer) DeliverBlocksCallCount() int { + fake.deliverBlocksMutex.RLock() + defer fake.deliverBlocksMutex.RUnlock() + return len(fake.deliverBlocksArgsForCall) +} + +func (fake *BFTBlockDeliverer) DeliverBlocksCalls(stub func()) { + fake.deliverBlocksMutex.Lock() + defer fake.deliverBlocksMutex.Unlock() + fake.DeliverBlocksStub = stub +} + +func (fake *BFTBlockDeliverer) Initialize(arg1 *common.Config, arg2 string) { + fake.initializeMutex.Lock() + fake.initializeArgsForCall = append(fake.initializeArgsForCall, struct { + arg1 *common.Config + arg2 string + }{arg1, arg2}) + stub := fake.InitializeStub + fake.recordInvocation("Initialize", []interface{}{arg1, arg2}) + fake.initializeMutex.Unlock() + if stub != nil { + fake.InitializeStub(arg1, arg2) + } +} + +func (fake *BFTBlockDeliverer) InitializeCallCount() int { + fake.initializeMutex.RLock() + defer fake.initializeMutex.RUnlock() + return len(fake.initializeArgsForCall) +} + +func (fake *BFTBlockDeliverer) InitializeCalls(stub func(*common.Config, string)) { + fake.initializeMutex.Lock() + defer fake.initializeMutex.Unlock() + fake.InitializeStub = stub +} + +func (fake *BFTBlockDeliverer) InitializeArgsForCall(i int) (*common.Config, string) { + fake.initializeMutex.RLock() + defer fake.initializeMutex.RUnlock() + argsForCall := fake.initializeArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *BFTBlockDeliverer) Stop() { + fake.stopMutex.Lock() + fake.stopArgsForCall = append(fake.stopArgsForCall, struct { + }{}) + stub := fake.StopStub + fake.recordInvocation("Stop", []interface{}{}) + fake.stopMutex.Unlock() + if stub != nil { + fake.StopStub() + } +} + +func (fake *BFTBlockDeliverer) StopCallCount() int { + fake.stopMutex.RLock() + defer fake.stopMutex.RUnlock() + return len(fake.stopArgsForCall) +} + +func (fake *BFTBlockDeliverer) StopCalls(stub func()) { + fake.stopMutex.Lock() + defer fake.stopMutex.Unlock() + fake.StopStub = stub +} + +func (fake *BFTBlockDeliverer) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.deliverBlocksMutex.RLock() + defer fake.deliverBlocksMutex.RUnlock() + fake.initializeMutex.RLock() + defer fake.initializeMutex.RUnlock() + fake.stopMutex.RLock() + defer fake.stopMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *BFTBlockDeliverer) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ smartbft.BFTBlockDeliverer = new(BFTBlockDeliverer) diff --git a/orderer/consensus/smartbft/mocks/bft_deliverer_factory.go b/orderer/consensus/smartbft/mocks/bft_deliverer_factory.go new file mode 100644 index 00000000000..ed79cfe95d6 --- /dev/null +++ b/orderer/consensus/smartbft/mocks/bft_deliverer_factory.go @@ -0,0 +1,148 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package mocks + +import ( + "sync" + "time" + + "github.com/hyperledger/fabric-lib-go/bccsp" + "github.com/hyperledger/fabric-lib-go/common/flogging" + "github.com/hyperledger/fabric/internal/pkg/identity" + "github.com/hyperledger/fabric/internal/pkg/peer/blocksprovider" + "github.com/hyperledger/fabric/orderer/consensus/smartbft" +) + +type BFTDelivererFactory struct { + CreateBFTDelivererStub func(string, blocksprovider.BlockHandler, blocksprovider.LedgerInfo, blocksprovider.UpdatableBlockVerifier, blocksprovider.Dialer, blocksprovider.OrdererConnectionSourceFactory, bccsp.BCCSP, chan struct{}, identity.SignerSerializer, blocksprovider.DeliverStreamer, blocksprovider.CensorshipDetectorFactory, *flogging.FabricLogger, time.Duration, time.Duration, time.Duration, time.Duration, blocksprovider.MaxRetryDurationExceededHandler) smartbft.BFTBlockDeliverer + createBFTDelivererMutex sync.RWMutex + createBFTDelivererArgsForCall []struct { + arg1 string + arg2 blocksprovider.BlockHandler + arg3 blocksprovider.LedgerInfo + arg4 blocksprovider.UpdatableBlockVerifier + arg5 blocksprovider.Dialer + arg6 blocksprovider.OrdererConnectionSourceFactory + arg7 bccsp.BCCSP + arg8 chan struct{} + arg9 identity.SignerSerializer + arg10 blocksprovider.DeliverStreamer + arg11 blocksprovider.CensorshipDetectorFactory + arg12 *flogging.FabricLogger + arg13 time.Duration + arg14 time.Duration + arg15 time.Duration + arg16 time.Duration + arg17 blocksprovider.MaxRetryDurationExceededHandler + } + createBFTDelivererReturns struct { + result1 smartbft.BFTBlockDeliverer + } + createBFTDelivererReturnsOnCall map[int]struct { + result1 smartbft.BFTBlockDeliverer + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *BFTDelivererFactory) CreateBFTDeliverer(arg1 string, arg2 blocksprovider.BlockHandler, arg3 blocksprovider.LedgerInfo, arg4 blocksprovider.UpdatableBlockVerifier, arg5 blocksprovider.Dialer, arg6 blocksprovider.OrdererConnectionSourceFactory, arg7 bccsp.BCCSP, arg8 chan struct{}, arg9 identity.SignerSerializer, arg10 blocksprovider.DeliverStreamer, arg11 blocksprovider.CensorshipDetectorFactory, arg12 *flogging.FabricLogger, arg13 time.Duration, arg14 time.Duration, arg15 time.Duration, arg16 time.Duration, arg17 blocksprovider.MaxRetryDurationExceededHandler) smartbft.BFTBlockDeliverer { + fake.createBFTDelivererMutex.Lock() + ret, specificReturn := fake.createBFTDelivererReturnsOnCall[len(fake.createBFTDelivererArgsForCall)] + fake.createBFTDelivererArgsForCall = append(fake.createBFTDelivererArgsForCall, struct { + arg1 string + arg2 blocksprovider.BlockHandler + arg3 blocksprovider.LedgerInfo + arg4 blocksprovider.UpdatableBlockVerifier + arg5 blocksprovider.Dialer + arg6 blocksprovider.OrdererConnectionSourceFactory + arg7 bccsp.BCCSP + arg8 chan struct{} + arg9 identity.SignerSerializer + arg10 blocksprovider.DeliverStreamer + arg11 blocksprovider.CensorshipDetectorFactory + arg12 *flogging.FabricLogger + arg13 time.Duration + arg14 time.Duration + arg15 time.Duration + arg16 time.Duration + arg17 blocksprovider.MaxRetryDurationExceededHandler + }{arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15, arg16, arg17}) + stub := fake.CreateBFTDelivererStub + fakeReturns := fake.createBFTDelivererReturns + fake.recordInvocation("CreateBFTDeliverer", []interface{}{arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15, arg16, arg17}) + fake.createBFTDelivererMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15, arg16, arg17) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *BFTDelivererFactory) CreateBFTDelivererCallCount() int { + fake.createBFTDelivererMutex.RLock() + defer fake.createBFTDelivererMutex.RUnlock() + return len(fake.createBFTDelivererArgsForCall) +} + +func (fake *BFTDelivererFactory) CreateBFTDelivererCalls(stub func(string, blocksprovider.BlockHandler, blocksprovider.LedgerInfo, blocksprovider.UpdatableBlockVerifier, blocksprovider.Dialer, blocksprovider.OrdererConnectionSourceFactory, bccsp.BCCSP, chan struct{}, identity.SignerSerializer, blocksprovider.DeliverStreamer, blocksprovider.CensorshipDetectorFactory, *flogging.FabricLogger, time.Duration, time.Duration, time.Duration, time.Duration, blocksprovider.MaxRetryDurationExceededHandler) smartbft.BFTBlockDeliverer) { + fake.createBFTDelivererMutex.Lock() + defer fake.createBFTDelivererMutex.Unlock() + fake.CreateBFTDelivererStub = stub +} + +func (fake *BFTDelivererFactory) CreateBFTDelivererArgsForCall(i int) (string, blocksprovider.BlockHandler, blocksprovider.LedgerInfo, blocksprovider.UpdatableBlockVerifier, blocksprovider.Dialer, blocksprovider.OrdererConnectionSourceFactory, bccsp.BCCSP, chan struct{}, identity.SignerSerializer, blocksprovider.DeliverStreamer, blocksprovider.CensorshipDetectorFactory, *flogging.FabricLogger, time.Duration, time.Duration, time.Duration, time.Duration, blocksprovider.MaxRetryDurationExceededHandler) { + fake.createBFTDelivererMutex.RLock() + defer fake.createBFTDelivererMutex.RUnlock() + argsForCall := fake.createBFTDelivererArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4, argsForCall.arg5, argsForCall.arg6, argsForCall.arg7, argsForCall.arg8, argsForCall.arg9, argsForCall.arg10, argsForCall.arg11, argsForCall.arg12, argsForCall.arg13, argsForCall.arg14, argsForCall.arg15, argsForCall.arg16, argsForCall.arg17 +} + +func (fake *BFTDelivererFactory) CreateBFTDelivererReturns(result1 smartbft.BFTBlockDeliverer) { + fake.createBFTDelivererMutex.Lock() + defer fake.createBFTDelivererMutex.Unlock() + fake.CreateBFTDelivererStub = nil + fake.createBFTDelivererReturns = struct { + result1 smartbft.BFTBlockDeliverer + }{result1} +} + +func (fake *BFTDelivererFactory) CreateBFTDelivererReturnsOnCall(i int, result1 smartbft.BFTBlockDeliverer) { + fake.createBFTDelivererMutex.Lock() + defer fake.createBFTDelivererMutex.Unlock() + fake.CreateBFTDelivererStub = nil + if fake.createBFTDelivererReturnsOnCall == nil { + fake.createBFTDelivererReturnsOnCall = make(map[int]struct { + result1 smartbft.BFTBlockDeliverer + }) + } + fake.createBFTDelivererReturnsOnCall[i] = struct { + result1 smartbft.BFTBlockDeliverer + }{result1} +} + +func (fake *BFTDelivererFactory) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.createBFTDelivererMutex.RLock() + defer fake.createBFTDelivererMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *BFTDelivererFactory) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ smartbft.BFTDelivererFactory = new(BFTDelivererFactory) diff --git a/orderer/consensus/smartbft/mocks/mock_blockpuller.go b/orderer/consensus/smartbft/mocks/block_puller.go similarity index 92% rename from orderer/consensus/smartbft/mocks/mock_blockpuller.go rename to orderer/consensus/smartbft/mocks/block_puller.go index 3801980a02b..b7d38752292 100644 --- a/orderer/consensus/smartbft/mocks/mock_blockpuller.go +++ b/orderer/consensus/smartbft/mocks/block_puller.go @@ -9,13 +9,15 @@ import ( ) type FakeBlockPuller struct { - CloseStub func() - closeMutex sync.RWMutex - closeArgsForCall []struct{} + CloseStub func() + closeMutex sync.RWMutex + closeArgsForCall []struct { + } HeightsByEndpointsStub func() (map[string]uint64, string, error) heightsByEndpointsMutex sync.RWMutex - heightsByEndpointsArgsForCall []struct{} - heightsByEndpointsReturns struct { + heightsByEndpointsArgsForCall []struct { + } + heightsByEndpointsReturns struct { result1 map[string]uint64 result2 string result3 error @@ -42,10 +44,12 @@ type FakeBlockPuller struct { func (fake *FakeBlockPuller) Close() { fake.closeMutex.Lock() - fake.closeArgsForCall = append(fake.closeArgsForCall, struct{}{}) + fake.closeArgsForCall = append(fake.closeArgsForCall, struct { + }{}) + stub := fake.CloseStub fake.recordInvocation("Close", []interface{}{}) fake.closeMutex.Unlock() - if fake.CloseStub != nil { + if stub != nil { fake.CloseStub() } } @@ -65,16 +69,18 @@ func (fake *FakeBlockPuller) CloseCalls(stub func()) { func (fake *FakeBlockPuller) HeightsByEndpoints() (map[string]uint64, string, error) { fake.heightsByEndpointsMutex.Lock() ret, specificReturn := fake.heightsByEndpointsReturnsOnCall[len(fake.heightsByEndpointsArgsForCall)] - fake.heightsByEndpointsArgsForCall = append(fake.heightsByEndpointsArgsForCall, struct{}{}) + fake.heightsByEndpointsArgsForCall = append(fake.heightsByEndpointsArgsForCall, struct { + }{}) + stub := fake.HeightsByEndpointsStub + fakeReturns := fake.heightsByEndpointsReturns fake.recordInvocation("HeightsByEndpoints", []interface{}{}) fake.heightsByEndpointsMutex.Unlock() - if fake.HeightsByEndpointsStub != nil { - return fake.HeightsByEndpointsStub() + if stub != nil { + return stub() } if specificReturn { return ret.result1, ret.result2, ret.result3 } - fakeReturns := fake.heightsByEndpointsReturns return fakeReturns.result1, fakeReturns.result2, fakeReturns.result3 } @@ -125,15 +131,16 @@ func (fake *FakeBlockPuller) PullBlock(arg1 uint64) *common.Block { fake.pullBlockArgsForCall = append(fake.pullBlockArgsForCall, struct { arg1 uint64 }{arg1}) + stub := fake.PullBlockStub + fakeReturns := fake.pullBlockReturns fake.recordInvocation("PullBlock", []interface{}{arg1}) fake.pullBlockMutex.Unlock() - if fake.PullBlockStub != nil { - return fake.PullBlockStub(arg1) + if stub != nil { + return stub(arg1) } if specificReturn { return ret.result1 } - fakeReturns := fake.pullBlockReturns return fakeReturns.result1 } diff --git a/orderer/consensus/smartbft/mocks/block_puller_factory.go b/orderer/consensus/smartbft/mocks/block_puller_factory.go new file mode 100644 index 00000000000..a4ff7c5334a --- /dev/null +++ b/orderer/consensus/smartbft/mocks/block_puller_factory.go @@ -0,0 +1,126 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package mocks + +import ( + "sync" + + "github.com/hyperledger/fabric-lib-go/bccsp" + "github.com/hyperledger/fabric/orderer/common/cluster" + "github.com/hyperledger/fabric/orderer/common/localconfig" + "github.com/hyperledger/fabric/orderer/consensus" + "github.com/hyperledger/fabric/orderer/consensus/smartbft" +) + +type FakeBlockPullerFactory struct { + CreateBlockPullerStub func(consensus.ConsenterSupport, *cluster.PredicateDialer, localconfig.Cluster, bccsp.BCCSP) (smartbft.BlockPuller, error) + createBlockPullerMutex sync.RWMutex + createBlockPullerArgsForCall []struct { + arg1 consensus.ConsenterSupport + arg2 *cluster.PredicateDialer + arg3 localconfig.Cluster + arg4 bccsp.BCCSP + } + createBlockPullerReturns struct { + result1 smartbft.BlockPuller + result2 error + } + createBlockPullerReturnsOnCall map[int]struct { + result1 smartbft.BlockPuller + result2 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeBlockPullerFactory) CreateBlockPuller(arg1 consensus.ConsenterSupport, arg2 *cluster.PredicateDialer, arg3 localconfig.Cluster, arg4 bccsp.BCCSP) (smartbft.BlockPuller, error) { + fake.createBlockPullerMutex.Lock() + ret, specificReturn := fake.createBlockPullerReturnsOnCall[len(fake.createBlockPullerArgsForCall)] + fake.createBlockPullerArgsForCall = append(fake.createBlockPullerArgsForCall, struct { + arg1 consensus.ConsenterSupport + arg2 *cluster.PredicateDialer + arg3 localconfig.Cluster + arg4 bccsp.BCCSP + }{arg1, arg2, arg3, arg4}) + stub := fake.CreateBlockPullerStub + fakeReturns := fake.createBlockPullerReturns + fake.recordInvocation("CreateBlockPuller", []interface{}{arg1, arg2, arg3, arg4}) + fake.createBlockPullerMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3, arg4) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeBlockPullerFactory) CreateBlockPullerCallCount() int { + fake.createBlockPullerMutex.RLock() + defer fake.createBlockPullerMutex.RUnlock() + return len(fake.createBlockPullerArgsForCall) +} + +func (fake *FakeBlockPullerFactory) CreateBlockPullerCalls(stub func(consensus.ConsenterSupport, *cluster.PredicateDialer, localconfig.Cluster, bccsp.BCCSP) (smartbft.BlockPuller, error)) { + fake.createBlockPullerMutex.Lock() + defer fake.createBlockPullerMutex.Unlock() + fake.CreateBlockPullerStub = stub +} + +func (fake *FakeBlockPullerFactory) CreateBlockPullerArgsForCall(i int) (consensus.ConsenterSupport, *cluster.PredicateDialer, localconfig.Cluster, bccsp.BCCSP) { + fake.createBlockPullerMutex.RLock() + defer fake.createBlockPullerMutex.RUnlock() + argsForCall := fake.createBlockPullerArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 +} + +func (fake *FakeBlockPullerFactory) CreateBlockPullerReturns(result1 smartbft.BlockPuller, result2 error) { + fake.createBlockPullerMutex.Lock() + defer fake.createBlockPullerMutex.Unlock() + fake.CreateBlockPullerStub = nil + fake.createBlockPullerReturns = struct { + result1 smartbft.BlockPuller + result2 error + }{result1, result2} +} + +func (fake *FakeBlockPullerFactory) CreateBlockPullerReturnsOnCall(i int, result1 smartbft.BlockPuller, result2 error) { + fake.createBlockPullerMutex.Lock() + defer fake.createBlockPullerMutex.Unlock() + fake.CreateBlockPullerStub = nil + if fake.createBlockPullerReturnsOnCall == nil { + fake.createBlockPullerReturnsOnCall = make(map[int]struct { + result1 smartbft.BlockPuller + result2 error + }) + } + fake.createBlockPullerReturnsOnCall[i] = struct { + result1 smartbft.BlockPuller + result2 error + }{result1, result2} +} + +func (fake *FakeBlockPullerFactory) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.createBlockPullerMutex.RLock() + defer fake.createBlockPullerMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FakeBlockPullerFactory) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ smartbft.BlockPullerFactory = new(FakeBlockPullerFactory) diff --git a/orderer/consensus/smartbft/mocks/orderer_config.go b/orderer/consensus/smartbft/mocks/orderer_config.go new file mode 100644 index 00000000000..3f1e49f81a3 --- /dev/null +++ b/orderer/consensus/smartbft/mocks/orderer_config.go @@ -0,0 +1,623 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package mocks + +import ( + "sync" + "time" + + "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric-protos-go/orderer" + "github.com/hyperledger/fabric/common/channelconfig" +) + +type OrdererConfig struct { + BatchSizeStub func() *orderer.BatchSize + batchSizeMutex sync.RWMutex + batchSizeArgsForCall []struct { + } + batchSizeReturns struct { + result1 *orderer.BatchSize + } + batchSizeReturnsOnCall map[int]struct { + result1 *orderer.BatchSize + } + BatchTimeoutStub func() time.Duration + batchTimeoutMutex sync.RWMutex + batchTimeoutArgsForCall []struct { + } + batchTimeoutReturns struct { + result1 time.Duration + } + batchTimeoutReturnsOnCall map[int]struct { + result1 time.Duration + } + CapabilitiesStub func() channelconfig.OrdererCapabilities + capabilitiesMutex sync.RWMutex + capabilitiesArgsForCall []struct { + } + capabilitiesReturns struct { + result1 channelconfig.OrdererCapabilities + } + capabilitiesReturnsOnCall map[int]struct { + result1 channelconfig.OrdererCapabilities + } + ConsensusMetadataStub func() []byte + consensusMetadataMutex sync.RWMutex + consensusMetadataArgsForCall []struct { + } + consensusMetadataReturns struct { + result1 []byte + } + consensusMetadataReturnsOnCall map[int]struct { + result1 []byte + } + ConsensusStateStub func() orderer.ConsensusType_State + consensusStateMutex sync.RWMutex + consensusStateArgsForCall []struct { + } + consensusStateReturns struct { + result1 orderer.ConsensusType_State + } + consensusStateReturnsOnCall map[int]struct { + result1 orderer.ConsensusType_State + } + ConsensusTypeStub func() string + consensusTypeMutex sync.RWMutex + consensusTypeArgsForCall []struct { + } + consensusTypeReturns struct { + result1 string + } + consensusTypeReturnsOnCall map[int]struct { + result1 string + } + ConsentersStub func() []*common.Consenter + consentersMutex sync.RWMutex + consentersArgsForCall []struct { + } + consentersReturns struct { + result1 []*common.Consenter + } + consentersReturnsOnCall map[int]struct { + result1 []*common.Consenter + } + MaxChannelsCountStub func() uint64 + maxChannelsCountMutex sync.RWMutex + maxChannelsCountArgsForCall []struct { + } + maxChannelsCountReturns struct { + result1 uint64 + } + maxChannelsCountReturnsOnCall map[int]struct { + result1 uint64 + } + OrganizationsStub func() map[string]channelconfig.OrdererOrg + organizationsMutex sync.RWMutex + organizationsArgsForCall []struct { + } + organizationsReturns struct { + result1 map[string]channelconfig.OrdererOrg + } + organizationsReturnsOnCall map[int]struct { + result1 map[string]channelconfig.OrdererOrg + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *OrdererConfig) BatchSize() *orderer.BatchSize { + fake.batchSizeMutex.Lock() + ret, specificReturn := fake.batchSizeReturnsOnCall[len(fake.batchSizeArgsForCall)] + fake.batchSizeArgsForCall = append(fake.batchSizeArgsForCall, struct { + }{}) + stub := fake.BatchSizeStub + fakeReturns := fake.batchSizeReturns + fake.recordInvocation("BatchSize", []interface{}{}) + fake.batchSizeMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *OrdererConfig) BatchSizeCallCount() int { + fake.batchSizeMutex.RLock() + defer fake.batchSizeMutex.RUnlock() + return len(fake.batchSizeArgsForCall) +} + +func (fake *OrdererConfig) BatchSizeCalls(stub func() *orderer.BatchSize) { + fake.batchSizeMutex.Lock() + defer fake.batchSizeMutex.Unlock() + fake.BatchSizeStub = stub +} + +func (fake *OrdererConfig) BatchSizeReturns(result1 *orderer.BatchSize) { + fake.batchSizeMutex.Lock() + defer fake.batchSizeMutex.Unlock() + fake.BatchSizeStub = nil + fake.batchSizeReturns = struct { + result1 *orderer.BatchSize + }{result1} +} + +func (fake *OrdererConfig) BatchSizeReturnsOnCall(i int, result1 *orderer.BatchSize) { + fake.batchSizeMutex.Lock() + defer fake.batchSizeMutex.Unlock() + fake.BatchSizeStub = nil + if fake.batchSizeReturnsOnCall == nil { + fake.batchSizeReturnsOnCall = make(map[int]struct { + result1 *orderer.BatchSize + }) + } + fake.batchSizeReturnsOnCall[i] = struct { + result1 *orderer.BatchSize + }{result1} +} + +func (fake *OrdererConfig) BatchTimeout() time.Duration { + fake.batchTimeoutMutex.Lock() + ret, specificReturn := fake.batchTimeoutReturnsOnCall[len(fake.batchTimeoutArgsForCall)] + fake.batchTimeoutArgsForCall = append(fake.batchTimeoutArgsForCall, struct { + }{}) + stub := fake.BatchTimeoutStub + fakeReturns := fake.batchTimeoutReturns + fake.recordInvocation("BatchTimeout", []interface{}{}) + fake.batchTimeoutMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *OrdererConfig) BatchTimeoutCallCount() int { + fake.batchTimeoutMutex.RLock() + defer fake.batchTimeoutMutex.RUnlock() + return len(fake.batchTimeoutArgsForCall) +} + +func (fake *OrdererConfig) BatchTimeoutCalls(stub func() time.Duration) { + fake.batchTimeoutMutex.Lock() + defer fake.batchTimeoutMutex.Unlock() + fake.BatchTimeoutStub = stub +} + +func (fake *OrdererConfig) BatchTimeoutReturns(result1 time.Duration) { + fake.batchTimeoutMutex.Lock() + defer fake.batchTimeoutMutex.Unlock() + fake.BatchTimeoutStub = nil + fake.batchTimeoutReturns = struct { + result1 time.Duration + }{result1} +} + +func (fake *OrdererConfig) BatchTimeoutReturnsOnCall(i int, result1 time.Duration) { + fake.batchTimeoutMutex.Lock() + defer fake.batchTimeoutMutex.Unlock() + fake.BatchTimeoutStub = nil + if fake.batchTimeoutReturnsOnCall == nil { + fake.batchTimeoutReturnsOnCall = make(map[int]struct { + result1 time.Duration + }) + } + fake.batchTimeoutReturnsOnCall[i] = struct { + result1 time.Duration + }{result1} +} + +func (fake *OrdererConfig) Capabilities() channelconfig.OrdererCapabilities { + fake.capabilitiesMutex.Lock() + ret, specificReturn := fake.capabilitiesReturnsOnCall[len(fake.capabilitiesArgsForCall)] + fake.capabilitiesArgsForCall = append(fake.capabilitiesArgsForCall, struct { + }{}) + stub := fake.CapabilitiesStub + fakeReturns := fake.capabilitiesReturns + fake.recordInvocation("Capabilities", []interface{}{}) + fake.capabilitiesMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *OrdererConfig) CapabilitiesCallCount() int { + fake.capabilitiesMutex.RLock() + defer fake.capabilitiesMutex.RUnlock() + return len(fake.capabilitiesArgsForCall) +} + +func (fake *OrdererConfig) CapabilitiesCalls(stub func() channelconfig.OrdererCapabilities) { + fake.capabilitiesMutex.Lock() + defer fake.capabilitiesMutex.Unlock() + fake.CapabilitiesStub = stub +} + +func (fake *OrdererConfig) CapabilitiesReturns(result1 channelconfig.OrdererCapabilities) { + fake.capabilitiesMutex.Lock() + defer fake.capabilitiesMutex.Unlock() + fake.CapabilitiesStub = nil + fake.capabilitiesReturns = struct { + result1 channelconfig.OrdererCapabilities + }{result1} +} + +func (fake *OrdererConfig) CapabilitiesReturnsOnCall(i int, result1 channelconfig.OrdererCapabilities) { + fake.capabilitiesMutex.Lock() + defer fake.capabilitiesMutex.Unlock() + fake.CapabilitiesStub = nil + if fake.capabilitiesReturnsOnCall == nil { + fake.capabilitiesReturnsOnCall = make(map[int]struct { + result1 channelconfig.OrdererCapabilities + }) + } + fake.capabilitiesReturnsOnCall[i] = struct { + result1 channelconfig.OrdererCapabilities + }{result1} +} + +func (fake *OrdererConfig) ConsensusMetadata() []byte { + fake.consensusMetadataMutex.Lock() + ret, specificReturn := fake.consensusMetadataReturnsOnCall[len(fake.consensusMetadataArgsForCall)] + fake.consensusMetadataArgsForCall = append(fake.consensusMetadataArgsForCall, struct { + }{}) + stub := fake.ConsensusMetadataStub + fakeReturns := fake.consensusMetadataReturns + fake.recordInvocation("ConsensusMetadata", []interface{}{}) + fake.consensusMetadataMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *OrdererConfig) ConsensusMetadataCallCount() int { + fake.consensusMetadataMutex.RLock() + defer fake.consensusMetadataMutex.RUnlock() + return len(fake.consensusMetadataArgsForCall) +} + +func (fake *OrdererConfig) ConsensusMetadataCalls(stub func() []byte) { + fake.consensusMetadataMutex.Lock() + defer fake.consensusMetadataMutex.Unlock() + fake.ConsensusMetadataStub = stub +} + +func (fake *OrdererConfig) ConsensusMetadataReturns(result1 []byte) { + fake.consensusMetadataMutex.Lock() + defer fake.consensusMetadataMutex.Unlock() + fake.ConsensusMetadataStub = nil + fake.consensusMetadataReturns = struct { + result1 []byte + }{result1} +} + +func (fake *OrdererConfig) ConsensusMetadataReturnsOnCall(i int, result1 []byte) { + fake.consensusMetadataMutex.Lock() + defer fake.consensusMetadataMutex.Unlock() + fake.ConsensusMetadataStub = nil + if fake.consensusMetadataReturnsOnCall == nil { + fake.consensusMetadataReturnsOnCall = make(map[int]struct { + result1 []byte + }) + } + fake.consensusMetadataReturnsOnCall[i] = struct { + result1 []byte + }{result1} +} + +func (fake *OrdererConfig) ConsensusState() orderer.ConsensusType_State { + fake.consensusStateMutex.Lock() + ret, specificReturn := fake.consensusStateReturnsOnCall[len(fake.consensusStateArgsForCall)] + fake.consensusStateArgsForCall = append(fake.consensusStateArgsForCall, struct { + }{}) + stub := fake.ConsensusStateStub + fakeReturns := fake.consensusStateReturns + fake.recordInvocation("ConsensusState", []interface{}{}) + fake.consensusStateMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *OrdererConfig) ConsensusStateCallCount() int { + fake.consensusStateMutex.RLock() + defer fake.consensusStateMutex.RUnlock() + return len(fake.consensusStateArgsForCall) +} + +func (fake *OrdererConfig) ConsensusStateCalls(stub func() orderer.ConsensusType_State) { + fake.consensusStateMutex.Lock() + defer fake.consensusStateMutex.Unlock() + fake.ConsensusStateStub = stub +} + +func (fake *OrdererConfig) ConsensusStateReturns(result1 orderer.ConsensusType_State) { + fake.consensusStateMutex.Lock() + defer fake.consensusStateMutex.Unlock() + fake.ConsensusStateStub = nil + fake.consensusStateReturns = struct { + result1 orderer.ConsensusType_State + }{result1} +} + +func (fake *OrdererConfig) ConsensusStateReturnsOnCall(i int, result1 orderer.ConsensusType_State) { + fake.consensusStateMutex.Lock() + defer fake.consensusStateMutex.Unlock() + fake.ConsensusStateStub = nil + if fake.consensusStateReturnsOnCall == nil { + fake.consensusStateReturnsOnCall = make(map[int]struct { + result1 orderer.ConsensusType_State + }) + } + fake.consensusStateReturnsOnCall[i] = struct { + result1 orderer.ConsensusType_State + }{result1} +} + +func (fake *OrdererConfig) ConsensusType() string { + fake.consensusTypeMutex.Lock() + ret, specificReturn := fake.consensusTypeReturnsOnCall[len(fake.consensusTypeArgsForCall)] + fake.consensusTypeArgsForCall = append(fake.consensusTypeArgsForCall, struct { + }{}) + stub := fake.ConsensusTypeStub + fakeReturns := fake.consensusTypeReturns + fake.recordInvocation("ConsensusType", []interface{}{}) + fake.consensusTypeMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *OrdererConfig) ConsensusTypeCallCount() int { + fake.consensusTypeMutex.RLock() + defer fake.consensusTypeMutex.RUnlock() + return len(fake.consensusTypeArgsForCall) +} + +func (fake *OrdererConfig) ConsensusTypeCalls(stub func() string) { + fake.consensusTypeMutex.Lock() + defer fake.consensusTypeMutex.Unlock() + fake.ConsensusTypeStub = stub +} + +func (fake *OrdererConfig) ConsensusTypeReturns(result1 string) { + fake.consensusTypeMutex.Lock() + defer fake.consensusTypeMutex.Unlock() + fake.ConsensusTypeStub = nil + fake.consensusTypeReturns = struct { + result1 string + }{result1} +} + +func (fake *OrdererConfig) ConsensusTypeReturnsOnCall(i int, result1 string) { + fake.consensusTypeMutex.Lock() + defer fake.consensusTypeMutex.Unlock() + fake.ConsensusTypeStub = nil + if fake.consensusTypeReturnsOnCall == nil { + fake.consensusTypeReturnsOnCall = make(map[int]struct { + result1 string + }) + } + fake.consensusTypeReturnsOnCall[i] = struct { + result1 string + }{result1} +} + +func (fake *OrdererConfig) Consenters() []*common.Consenter { + fake.consentersMutex.Lock() + ret, specificReturn := fake.consentersReturnsOnCall[len(fake.consentersArgsForCall)] + fake.consentersArgsForCall = append(fake.consentersArgsForCall, struct { + }{}) + stub := fake.ConsentersStub + fakeReturns := fake.consentersReturns + fake.recordInvocation("Consenters", []interface{}{}) + fake.consentersMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *OrdererConfig) ConsentersCallCount() int { + fake.consentersMutex.RLock() + defer fake.consentersMutex.RUnlock() + return len(fake.consentersArgsForCall) +} + +func (fake *OrdererConfig) ConsentersCalls(stub func() []*common.Consenter) { + fake.consentersMutex.Lock() + defer fake.consentersMutex.Unlock() + fake.ConsentersStub = stub +} + +func (fake *OrdererConfig) ConsentersReturns(result1 []*common.Consenter) { + fake.consentersMutex.Lock() + defer fake.consentersMutex.Unlock() + fake.ConsentersStub = nil + fake.consentersReturns = struct { + result1 []*common.Consenter + }{result1} +} + +func (fake *OrdererConfig) ConsentersReturnsOnCall(i int, result1 []*common.Consenter) { + fake.consentersMutex.Lock() + defer fake.consentersMutex.Unlock() + fake.ConsentersStub = nil + if fake.consentersReturnsOnCall == nil { + fake.consentersReturnsOnCall = make(map[int]struct { + result1 []*common.Consenter + }) + } + fake.consentersReturnsOnCall[i] = struct { + result1 []*common.Consenter + }{result1} +} + +func (fake *OrdererConfig) MaxChannelsCount() uint64 { + fake.maxChannelsCountMutex.Lock() + ret, specificReturn := fake.maxChannelsCountReturnsOnCall[len(fake.maxChannelsCountArgsForCall)] + fake.maxChannelsCountArgsForCall = append(fake.maxChannelsCountArgsForCall, struct { + }{}) + stub := fake.MaxChannelsCountStub + fakeReturns := fake.maxChannelsCountReturns + fake.recordInvocation("MaxChannelsCount", []interface{}{}) + fake.maxChannelsCountMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *OrdererConfig) MaxChannelsCountCallCount() int { + fake.maxChannelsCountMutex.RLock() + defer fake.maxChannelsCountMutex.RUnlock() + return len(fake.maxChannelsCountArgsForCall) +} + +func (fake *OrdererConfig) MaxChannelsCountCalls(stub func() uint64) { + fake.maxChannelsCountMutex.Lock() + defer fake.maxChannelsCountMutex.Unlock() + fake.MaxChannelsCountStub = stub +} + +func (fake *OrdererConfig) MaxChannelsCountReturns(result1 uint64) { + fake.maxChannelsCountMutex.Lock() + defer fake.maxChannelsCountMutex.Unlock() + fake.MaxChannelsCountStub = nil + fake.maxChannelsCountReturns = struct { + result1 uint64 + }{result1} +} + +func (fake *OrdererConfig) MaxChannelsCountReturnsOnCall(i int, result1 uint64) { + fake.maxChannelsCountMutex.Lock() + defer fake.maxChannelsCountMutex.Unlock() + fake.MaxChannelsCountStub = nil + if fake.maxChannelsCountReturnsOnCall == nil { + fake.maxChannelsCountReturnsOnCall = make(map[int]struct { + result1 uint64 + }) + } + fake.maxChannelsCountReturnsOnCall[i] = struct { + result1 uint64 + }{result1} +} + +func (fake *OrdererConfig) Organizations() map[string]channelconfig.OrdererOrg { + fake.organizationsMutex.Lock() + ret, specificReturn := fake.organizationsReturnsOnCall[len(fake.organizationsArgsForCall)] + fake.organizationsArgsForCall = append(fake.organizationsArgsForCall, struct { + }{}) + stub := fake.OrganizationsStub + fakeReturns := fake.organizationsReturns + fake.recordInvocation("Organizations", []interface{}{}) + fake.organizationsMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *OrdererConfig) OrganizationsCallCount() int { + fake.organizationsMutex.RLock() + defer fake.organizationsMutex.RUnlock() + return len(fake.organizationsArgsForCall) +} + +func (fake *OrdererConfig) OrganizationsCalls(stub func() map[string]channelconfig.OrdererOrg) { + fake.organizationsMutex.Lock() + defer fake.organizationsMutex.Unlock() + fake.OrganizationsStub = stub +} + +func (fake *OrdererConfig) OrganizationsReturns(result1 map[string]channelconfig.OrdererOrg) { + fake.organizationsMutex.Lock() + defer fake.organizationsMutex.Unlock() + fake.OrganizationsStub = nil + fake.organizationsReturns = struct { + result1 map[string]channelconfig.OrdererOrg + }{result1} +} + +func (fake *OrdererConfig) OrganizationsReturnsOnCall(i int, result1 map[string]channelconfig.OrdererOrg) { + fake.organizationsMutex.Lock() + defer fake.organizationsMutex.Unlock() + fake.OrganizationsStub = nil + if fake.organizationsReturnsOnCall == nil { + fake.organizationsReturnsOnCall = make(map[int]struct { + result1 map[string]channelconfig.OrdererOrg + }) + } + fake.organizationsReturnsOnCall[i] = struct { + result1 map[string]channelconfig.OrdererOrg + }{result1} +} + +func (fake *OrdererConfig) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.batchSizeMutex.RLock() + defer fake.batchSizeMutex.RUnlock() + fake.batchTimeoutMutex.RLock() + defer fake.batchTimeoutMutex.RUnlock() + fake.capabilitiesMutex.RLock() + defer fake.capabilitiesMutex.RUnlock() + fake.consensusMetadataMutex.RLock() + defer fake.consensusMetadataMutex.RUnlock() + fake.consensusStateMutex.RLock() + defer fake.consensusStateMutex.RUnlock() + fake.consensusTypeMutex.RLock() + defer fake.consensusTypeMutex.RUnlock() + fake.consentersMutex.RLock() + defer fake.consentersMutex.RUnlock() + fake.maxChannelsCountMutex.RLock() + defer fake.maxChannelsCountMutex.RUnlock() + fake.organizationsMutex.RLock() + defer fake.organizationsMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *OrdererConfig) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} diff --git a/orderer/consensus/smartbft/mocks/updatable_block_verifier.go b/orderer/consensus/smartbft/mocks/updatable_block_verifier.go new file mode 100644 index 00000000000..df4ef59c550 --- /dev/null +++ b/orderer/consensus/smartbft/mocks/updatable_block_verifier.go @@ -0,0 +1,362 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package mocks + +import ( + "sync" + + "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric/common/deliverclient" +) + +type UpdatableBlockVerifier struct { + CloneStub func() deliverclient.CloneableUpdatableBlockVerifier + cloneMutex sync.RWMutex + cloneArgsForCall []struct { + } + cloneReturns struct { + result1 deliverclient.CloneableUpdatableBlockVerifier + } + cloneReturnsOnCall map[int]struct { + result1 deliverclient.CloneableUpdatableBlockVerifier + } + UpdateBlockHeaderStub func(*common.Block) + updateBlockHeaderMutex sync.RWMutex + updateBlockHeaderArgsForCall []struct { + arg1 *common.Block + } + UpdateConfigStub func(*common.Block) error + updateConfigMutex sync.RWMutex + updateConfigArgsForCall []struct { + arg1 *common.Block + } + updateConfigReturns struct { + result1 error + } + updateConfigReturnsOnCall map[int]struct { + result1 error + } + VerifyBlockStub func(*common.Block) error + verifyBlockMutex sync.RWMutex + verifyBlockArgsForCall []struct { + arg1 *common.Block + } + verifyBlockReturns struct { + result1 error + } + verifyBlockReturnsOnCall map[int]struct { + result1 error + } + VerifyBlockAttestationStub func(*common.Block) error + verifyBlockAttestationMutex sync.RWMutex + verifyBlockAttestationArgsForCall []struct { + arg1 *common.Block + } + verifyBlockAttestationReturns struct { + result1 error + } + verifyBlockAttestationReturnsOnCall map[int]struct { + result1 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *UpdatableBlockVerifier) Clone() deliverclient.CloneableUpdatableBlockVerifier { + fake.cloneMutex.Lock() + ret, specificReturn := fake.cloneReturnsOnCall[len(fake.cloneArgsForCall)] + fake.cloneArgsForCall = append(fake.cloneArgsForCall, struct { + }{}) + stub := fake.CloneStub + fakeReturns := fake.cloneReturns + fake.recordInvocation("Clone", []interface{}{}) + fake.cloneMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *UpdatableBlockVerifier) CloneCallCount() int { + fake.cloneMutex.RLock() + defer fake.cloneMutex.RUnlock() + return len(fake.cloneArgsForCall) +} + +func (fake *UpdatableBlockVerifier) CloneCalls(stub func() deliverclient.CloneableUpdatableBlockVerifier) { + fake.cloneMutex.Lock() + defer fake.cloneMutex.Unlock() + fake.CloneStub = stub +} + +func (fake *UpdatableBlockVerifier) CloneReturns(result1 deliverclient.CloneableUpdatableBlockVerifier) { + fake.cloneMutex.Lock() + defer fake.cloneMutex.Unlock() + fake.CloneStub = nil + fake.cloneReturns = struct { + result1 deliverclient.CloneableUpdatableBlockVerifier + }{result1} +} + +func (fake *UpdatableBlockVerifier) CloneReturnsOnCall(i int, result1 deliverclient.CloneableUpdatableBlockVerifier) { + fake.cloneMutex.Lock() + defer fake.cloneMutex.Unlock() + fake.CloneStub = nil + if fake.cloneReturnsOnCall == nil { + fake.cloneReturnsOnCall = make(map[int]struct { + result1 deliverclient.CloneableUpdatableBlockVerifier + }) + } + fake.cloneReturnsOnCall[i] = struct { + result1 deliverclient.CloneableUpdatableBlockVerifier + }{result1} +} + +func (fake *UpdatableBlockVerifier) UpdateBlockHeader(arg1 *common.Block) { + fake.updateBlockHeaderMutex.Lock() + fake.updateBlockHeaderArgsForCall = append(fake.updateBlockHeaderArgsForCall, struct { + arg1 *common.Block + }{arg1}) + stub := fake.UpdateBlockHeaderStub + fake.recordInvocation("UpdateBlockHeader", []interface{}{arg1}) + fake.updateBlockHeaderMutex.Unlock() + if stub != nil { + fake.UpdateBlockHeaderStub(arg1) + } +} + +func (fake *UpdatableBlockVerifier) UpdateBlockHeaderCallCount() int { + fake.updateBlockHeaderMutex.RLock() + defer fake.updateBlockHeaderMutex.RUnlock() + return len(fake.updateBlockHeaderArgsForCall) +} + +func (fake *UpdatableBlockVerifier) UpdateBlockHeaderCalls(stub func(*common.Block)) { + fake.updateBlockHeaderMutex.Lock() + defer fake.updateBlockHeaderMutex.Unlock() + fake.UpdateBlockHeaderStub = stub +} + +func (fake *UpdatableBlockVerifier) UpdateBlockHeaderArgsForCall(i int) *common.Block { + fake.updateBlockHeaderMutex.RLock() + defer fake.updateBlockHeaderMutex.RUnlock() + argsForCall := fake.updateBlockHeaderArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *UpdatableBlockVerifier) UpdateConfig(arg1 *common.Block) error { + fake.updateConfigMutex.Lock() + ret, specificReturn := fake.updateConfigReturnsOnCall[len(fake.updateConfigArgsForCall)] + fake.updateConfigArgsForCall = append(fake.updateConfigArgsForCall, struct { + arg1 *common.Block + }{arg1}) + stub := fake.UpdateConfigStub + fakeReturns := fake.updateConfigReturns + fake.recordInvocation("UpdateConfig", []interface{}{arg1}) + fake.updateConfigMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *UpdatableBlockVerifier) UpdateConfigCallCount() int { + fake.updateConfigMutex.RLock() + defer fake.updateConfigMutex.RUnlock() + return len(fake.updateConfigArgsForCall) +} + +func (fake *UpdatableBlockVerifier) UpdateConfigCalls(stub func(*common.Block) error) { + fake.updateConfigMutex.Lock() + defer fake.updateConfigMutex.Unlock() + fake.UpdateConfigStub = stub +} + +func (fake *UpdatableBlockVerifier) UpdateConfigArgsForCall(i int) *common.Block { + fake.updateConfigMutex.RLock() + defer fake.updateConfigMutex.RUnlock() + argsForCall := fake.updateConfigArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *UpdatableBlockVerifier) UpdateConfigReturns(result1 error) { + fake.updateConfigMutex.Lock() + defer fake.updateConfigMutex.Unlock() + fake.UpdateConfigStub = nil + fake.updateConfigReturns = struct { + result1 error + }{result1} +} + +func (fake *UpdatableBlockVerifier) UpdateConfigReturnsOnCall(i int, result1 error) { + fake.updateConfigMutex.Lock() + defer fake.updateConfigMutex.Unlock() + fake.UpdateConfigStub = nil + if fake.updateConfigReturnsOnCall == nil { + fake.updateConfigReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.updateConfigReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *UpdatableBlockVerifier) VerifyBlock(arg1 *common.Block) error { + fake.verifyBlockMutex.Lock() + ret, specificReturn := fake.verifyBlockReturnsOnCall[len(fake.verifyBlockArgsForCall)] + fake.verifyBlockArgsForCall = append(fake.verifyBlockArgsForCall, struct { + arg1 *common.Block + }{arg1}) + stub := fake.VerifyBlockStub + fakeReturns := fake.verifyBlockReturns + fake.recordInvocation("VerifyBlock", []interface{}{arg1}) + fake.verifyBlockMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *UpdatableBlockVerifier) VerifyBlockCallCount() int { + fake.verifyBlockMutex.RLock() + defer fake.verifyBlockMutex.RUnlock() + return len(fake.verifyBlockArgsForCall) +} + +func (fake *UpdatableBlockVerifier) VerifyBlockCalls(stub func(*common.Block) error) { + fake.verifyBlockMutex.Lock() + defer fake.verifyBlockMutex.Unlock() + fake.VerifyBlockStub = stub +} + +func (fake *UpdatableBlockVerifier) VerifyBlockArgsForCall(i int) *common.Block { + fake.verifyBlockMutex.RLock() + defer fake.verifyBlockMutex.RUnlock() + argsForCall := fake.verifyBlockArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *UpdatableBlockVerifier) VerifyBlockReturns(result1 error) { + fake.verifyBlockMutex.Lock() + defer fake.verifyBlockMutex.Unlock() + fake.VerifyBlockStub = nil + fake.verifyBlockReturns = struct { + result1 error + }{result1} +} + +func (fake *UpdatableBlockVerifier) VerifyBlockReturnsOnCall(i int, result1 error) { + fake.verifyBlockMutex.Lock() + defer fake.verifyBlockMutex.Unlock() + fake.VerifyBlockStub = nil + if fake.verifyBlockReturnsOnCall == nil { + fake.verifyBlockReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.verifyBlockReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *UpdatableBlockVerifier) VerifyBlockAttestation(arg1 *common.Block) error { + fake.verifyBlockAttestationMutex.Lock() + ret, specificReturn := fake.verifyBlockAttestationReturnsOnCall[len(fake.verifyBlockAttestationArgsForCall)] + fake.verifyBlockAttestationArgsForCall = append(fake.verifyBlockAttestationArgsForCall, struct { + arg1 *common.Block + }{arg1}) + stub := fake.VerifyBlockAttestationStub + fakeReturns := fake.verifyBlockAttestationReturns + fake.recordInvocation("VerifyBlockAttestation", []interface{}{arg1}) + fake.verifyBlockAttestationMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *UpdatableBlockVerifier) VerifyBlockAttestationCallCount() int { + fake.verifyBlockAttestationMutex.RLock() + defer fake.verifyBlockAttestationMutex.RUnlock() + return len(fake.verifyBlockAttestationArgsForCall) +} + +func (fake *UpdatableBlockVerifier) VerifyBlockAttestationCalls(stub func(*common.Block) error) { + fake.verifyBlockAttestationMutex.Lock() + defer fake.verifyBlockAttestationMutex.Unlock() + fake.VerifyBlockAttestationStub = stub +} + +func (fake *UpdatableBlockVerifier) VerifyBlockAttestationArgsForCall(i int) *common.Block { + fake.verifyBlockAttestationMutex.RLock() + defer fake.verifyBlockAttestationMutex.RUnlock() + argsForCall := fake.verifyBlockAttestationArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *UpdatableBlockVerifier) VerifyBlockAttestationReturns(result1 error) { + fake.verifyBlockAttestationMutex.Lock() + defer fake.verifyBlockAttestationMutex.Unlock() + fake.VerifyBlockAttestationStub = nil + fake.verifyBlockAttestationReturns = struct { + result1 error + }{result1} +} + +func (fake *UpdatableBlockVerifier) VerifyBlockAttestationReturnsOnCall(i int, result1 error) { + fake.verifyBlockAttestationMutex.Lock() + defer fake.verifyBlockAttestationMutex.Unlock() + fake.VerifyBlockAttestationStub = nil + if fake.verifyBlockAttestationReturnsOnCall == nil { + fake.verifyBlockAttestationReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.verifyBlockAttestationReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *UpdatableBlockVerifier) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.cloneMutex.RLock() + defer fake.cloneMutex.RUnlock() + fake.updateBlockHeaderMutex.RLock() + defer fake.updateBlockHeaderMutex.RUnlock() + fake.updateConfigMutex.RLock() + defer fake.updateConfigMutex.RUnlock() + fake.verifyBlockMutex.RLock() + defer fake.verifyBlockMutex.RUnlock() + fake.verifyBlockAttestationMutex.RLock() + defer fake.verifyBlockAttestationMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *UpdatableBlockVerifier) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} diff --git a/orderer/consensus/smartbft/mocks/verifier_factory.go b/orderer/consensus/smartbft/mocks/verifier_factory.go new file mode 100644 index 00000000000..74e63057689 --- /dev/null +++ b/orderer/consensus/smartbft/mocks/verifier_factory.go @@ -0,0 +1,126 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package mocks + +import ( + "sync" + + "github.com/hyperledger/fabric-lib-go/bccsp" + "github.com/hyperledger/fabric-lib-go/common/flogging" + "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric/common/deliverclient" + "github.com/hyperledger/fabric/orderer/consensus/smartbft" +) + +type VerifierFactory struct { + CreateBlockVerifierStub func(*common.Block, *common.Block, bccsp.BCCSP, *flogging.FabricLogger) (deliverclient.CloneableUpdatableBlockVerifier, error) + createBlockVerifierMutex sync.RWMutex + createBlockVerifierArgsForCall []struct { + arg1 *common.Block + arg2 *common.Block + arg3 bccsp.BCCSP + arg4 *flogging.FabricLogger + } + createBlockVerifierReturns struct { + result1 deliverclient.CloneableUpdatableBlockVerifier + result2 error + } + createBlockVerifierReturnsOnCall map[int]struct { + result1 deliverclient.CloneableUpdatableBlockVerifier + result2 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *VerifierFactory) CreateBlockVerifier(arg1 *common.Block, arg2 *common.Block, arg3 bccsp.BCCSP, arg4 *flogging.FabricLogger) (deliverclient.CloneableUpdatableBlockVerifier, error) { + fake.createBlockVerifierMutex.Lock() + ret, specificReturn := fake.createBlockVerifierReturnsOnCall[len(fake.createBlockVerifierArgsForCall)] + fake.createBlockVerifierArgsForCall = append(fake.createBlockVerifierArgsForCall, struct { + arg1 *common.Block + arg2 *common.Block + arg3 bccsp.BCCSP + arg4 *flogging.FabricLogger + }{arg1, arg2, arg3, arg4}) + stub := fake.CreateBlockVerifierStub + fakeReturns := fake.createBlockVerifierReturns + fake.recordInvocation("CreateBlockVerifier", []interface{}{arg1, arg2, arg3, arg4}) + fake.createBlockVerifierMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3, arg4) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *VerifierFactory) CreateBlockVerifierCallCount() int { + fake.createBlockVerifierMutex.RLock() + defer fake.createBlockVerifierMutex.RUnlock() + return len(fake.createBlockVerifierArgsForCall) +} + +func (fake *VerifierFactory) CreateBlockVerifierCalls(stub func(*common.Block, *common.Block, bccsp.BCCSP, *flogging.FabricLogger) (deliverclient.CloneableUpdatableBlockVerifier, error)) { + fake.createBlockVerifierMutex.Lock() + defer fake.createBlockVerifierMutex.Unlock() + fake.CreateBlockVerifierStub = stub +} + +func (fake *VerifierFactory) CreateBlockVerifierArgsForCall(i int) (*common.Block, *common.Block, bccsp.BCCSP, *flogging.FabricLogger) { + fake.createBlockVerifierMutex.RLock() + defer fake.createBlockVerifierMutex.RUnlock() + argsForCall := fake.createBlockVerifierArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 +} + +func (fake *VerifierFactory) CreateBlockVerifierReturns(result1 deliverclient.CloneableUpdatableBlockVerifier, result2 error) { + fake.createBlockVerifierMutex.Lock() + defer fake.createBlockVerifierMutex.Unlock() + fake.CreateBlockVerifierStub = nil + fake.createBlockVerifierReturns = struct { + result1 deliverclient.CloneableUpdatableBlockVerifier + result2 error + }{result1, result2} +} + +func (fake *VerifierFactory) CreateBlockVerifierReturnsOnCall(i int, result1 deliverclient.CloneableUpdatableBlockVerifier, result2 error) { + fake.createBlockVerifierMutex.Lock() + defer fake.createBlockVerifierMutex.Unlock() + fake.CreateBlockVerifierStub = nil + if fake.createBlockVerifierReturnsOnCall == nil { + fake.createBlockVerifierReturnsOnCall = make(map[int]struct { + result1 deliverclient.CloneableUpdatableBlockVerifier + result2 error + }) + } + fake.createBlockVerifierReturnsOnCall[i] = struct { + result1 deliverclient.CloneableUpdatableBlockVerifier + result2 error + }{result1, result2} +} + +func (fake *VerifierFactory) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.createBlockVerifierMutex.RLock() + defer fake.createBlockVerifierMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *VerifierFactory) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ smartbft.VerifierFactory = new(VerifierFactory) diff --git a/orderer/consensus/smartbft/sync_buffer.go b/orderer/consensus/smartbft/sync_buffer.go new file mode 100644 index 00000000000..a7d6ff3d7a8 --- /dev/null +++ b/orderer/consensus/smartbft/sync_buffer.go @@ -0,0 +1,70 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package smartbft + +import ( + "sync" + + "github.com/hyperledger/fabric-protos-go/common" + "github.com/pkg/errors" +) + +type SyncBuffer struct { + blockCh chan *common.Block + stopCh chan struct{} + stopOnce sync.Once +} + +func NewSyncBuffer() *SyncBuffer { + return &SyncBuffer{ + blockCh: make(chan *common.Block, 10), + stopCh: make(chan struct{}), + } +} + +// HandleBlock gives the block to the next stage of processing after fetching it from a remote orderer. +func (sb *SyncBuffer) HandleBlock(channelID string, block *common.Block) error { + if block == nil || block.Header == nil { + return errors.Errorf("empty block or block header, channel: %s", channelID) + } + + select { + case sb.blockCh <- block: + return nil + case <-sb.stopCh: + return errors.Errorf("SyncBuffer stopping, channel: %s", channelID) + } +} + +func (sb *SyncBuffer) PullBlock(seq uint64) *common.Block { + var block *common.Block + for { + select { + case block = <-sb.blockCh: + if block == nil || block.Header == nil { + return nil + } + if block.GetHeader().GetNumber() == seq { + return block + } + if block.GetHeader().GetNumber() < seq { + continue + } + if block.GetHeader().GetNumber() > seq { + return nil + } + case <-sb.stopCh: + return nil + } + } +} + +func (sb *SyncBuffer) Stop() { + sb.stopOnce.Do(func() { + close(sb.stopCh) + }) +} diff --git a/orderer/consensus/smartbft/sync_buffer_test.go b/orderer/consensus/smartbft/sync_buffer_test.go new file mode 100644 index 00000000000..9c8d8695dbd --- /dev/null +++ b/orderer/consensus/smartbft/sync_buffer_test.go @@ -0,0 +1,186 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package smartbft_test + +import ( + "reflect" + "sync" + "testing" + + "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric/orderer/consensus/smartbft" + "github.com/stretchr/testify/require" +) + +func TestNewSyncBuffer(t *testing.T) { + buff := smartbft.NewSyncBuffer() + require.NotNil(t, buff) +} + +func TestSyncBuffer_PullBlock(t *testing.T) { + t.Run("blocks until stopped", func(t *testing.T) { + buff := smartbft.NewSyncBuffer() + require.NotNil(t, buff) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + block := buff.PullBlock(1) + require.Nil(t, block) + wg.Done() + }() + + buff.Stop() + wg.Wait() + }) + + t.Run("blocks until HandleBlock is called", func(t *testing.T) { + buff := smartbft.NewSyncBuffer() + require.NotNil(t, buff) + + blockIn := &common.Block{ + Header: &common.BlockHeader{Number: 2, PreviousHash: []byte{1, 2, 3, 4}, DataHash: []byte{5, 6, 7, 8}}, + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + blockOut := buff.PullBlock(2) + require.NotNil(t, blockOut) + require.True(t, reflect.DeepEqual(blockIn, blockOut)) + wg.Done() + }() + + err := buff.HandleBlock("mychannel", blockIn) + require.NoError(t, err) + wg.Wait() + }) + + t.Run("block number mismatch, request number lower than head, return nil", func(t *testing.T) { + buff := smartbft.NewSyncBuffer() + require.NotNil(t, buff) + + blockIn := &common.Block{ + Header: &common.BlockHeader{Number: 2, PreviousHash: []byte{1, 2, 3, 4}, DataHash: []byte{5, 6, 7, 8}}, + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + blockOut := buff.PullBlock(1) + require.Nil(t, blockOut) + wg.Done() + }() + + err := buff.HandleBlock("mychannel", blockIn) + require.NoError(t, err) + wg.Wait() + }) + + t.Run("block number mismatch, requested number higher than head, blocks until inserted", func(t *testing.T) { + buff := smartbft.NewSyncBuffer() + require.NotNil(t, buff) + + blockIn2 := &common.Block{ + Header: &common.BlockHeader{Number: 2, PreviousHash: []byte{1, 2, 3, 4}, DataHash: []byte{5, 6, 7, 8}}, + } + blockIn3 := &common.Block{ + Header: &common.BlockHeader{Number: 3, PreviousHash: []byte{9, 10, 11, 12}, DataHash: []byte{13, 14, 15, 16}}, + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + blockOut := buff.PullBlock(3) + require.NotNil(t, blockOut) + require.True(t, reflect.DeepEqual(blockIn3, blockOut)) + wg.Done() + }() + + err := buff.HandleBlock("mychannel", blockIn2) + require.NoError(t, err) + err = buff.HandleBlock("mychannel", blockIn3) + require.NoError(t, err) + + wg.Wait() + }) + + t.Run("continuous operation", func(t *testing.T) { + buff := smartbft.NewSyncBuffer() + require.NotNil(t, buff) + + var wg sync.WaitGroup + wg.Add(1) + + firstBlock := uint64(10) + lastBlock := uint64(1000) + go func() { + j := firstBlock + for { + blockOut := buff.PullBlock(j) + require.NotNil(t, blockOut) + require.Equal(t, j, blockOut.Header.Number) + j++ + if j == lastBlock { + break + } + } + wg.Done() + }() + + for i := firstBlock; i <= lastBlock; i++ { + blockIn := &common.Block{ + Header: &common.BlockHeader{Number: i, PreviousHash: []byte{1, 2, 3, 4}, DataHash: []byte{5, 6, 7, 8}}, + } + err := buff.HandleBlock("mychannel", blockIn) + require.NoError(t, err) + } + + wg.Wait() + }) +} + +func TestSyncBuffer_HandleBlock(t *testing.T) { + t.Run("blocks until stopped", func(t *testing.T) { + buff := smartbft.NewSyncBuffer() + require.NotNil(t, buff) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + var number uint64 = 1 + var err error + for { + blockIn := &common.Block{ + Header: &common.BlockHeader{Number: number, PreviousHash: []byte{1, 2, 3, 4}, DataHash: []byte{5, 6, 7, 8}}, + } + err = buff.HandleBlock("mychannel", blockIn) + if err != nil { + break + } + number++ + } + + require.EqualError(t, err, "SyncBuffer stopping, channel: mychannel") + wg.Done() + }() + + buff.Stop() + wg.Wait() + }) + + t.Run("bad blocks", func(t *testing.T) { + buff := smartbft.NewSyncBuffer() + require.NotNil(t, buff) + + err := buff.HandleBlock("mychannel", nil) + require.EqualError(t, err, "empty block or block header, channel: mychannel") + + err = buff.HandleBlock("mychannel", &common.Block{}) + require.EqualError(t, err, "empty block or block header, channel: mychannel") + }) +} diff --git a/orderer/consensus/smartbft/synchronizer_bft.go b/orderer/consensus/smartbft/synchronizer_bft.go new file mode 100644 index 00000000000..85e9de48e3c --- /dev/null +++ b/orderer/consensus/smartbft/synchronizer_bft.go @@ -0,0 +1,271 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package smartbft + +import ( + "sort" + "sync" + "time" + + "github.com/SmartBFT-Go/consensus/pkg/types" + "github.com/hyperledger/fabric-lib-go/bccsp" + "github.com/hyperledger/fabric-lib-go/common/flogging" + "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric/common/deliverclient" + "github.com/hyperledger/fabric/internal/pkg/peer/blocksprovider" + "github.com/hyperledger/fabric/internal/pkg/peer/orderers" + "github.com/hyperledger/fabric/orderer/common/cluster" + "github.com/hyperledger/fabric/orderer/common/localconfig" + "github.com/hyperledger/fabric/orderer/consensus" + "github.com/hyperledger/fabric/protoutil" + "github.com/pkg/errors" +) + +type BFTSynchronizer struct { + lastReconfig types.Reconfig + selfID uint64 + LatestConfig func() (types.Configuration, []uint64) + BlockToDecision func(*common.Block) *types.Decision + OnCommit func(*common.Block) types.Reconfig + Support consensus.ConsenterSupport + CryptoProvider bccsp.BCCSP + ClusterDialer *cluster.PredicateDialer + LocalConfigCluster localconfig.Cluster + BlockPullerFactory BlockPullerFactory + VerifierFactory VerifierFactory + BFTDelivererFactory BFTDelivererFactory + Logger *flogging.FabricLogger + + mutex sync.Mutex + syncBuff *SyncBuffer +} + +func (s *BFTSynchronizer) Sync() types.SyncResponse { + decision, err := s.synchronize() + if err != nil { + s.Logger.Warnf("Could not synchronize with remote orderers due to %s, returning state from local ledger", err) + block := s.Support.Block(s.Support.Height() - 1) + config, nodes := s.LatestConfig() + return types.SyncResponse{ + Latest: *s.BlockToDecision(block), + Reconfig: types.ReconfigSync{ + InReplicatedDecisions: false, // If we read from ledger we do not need to reconfigure. + CurrentNodes: nodes, + CurrentConfig: config, + }, + } + } + + // After sync has ended, reset the state of the last reconfig. + defer func() { + s.lastReconfig = types.Reconfig{} + }() + + s.Logger.Debugf("reconfig: %+v", s.lastReconfig) + return types.SyncResponse{ + Latest: *decision, + Reconfig: types.ReconfigSync{ + InReplicatedDecisions: s.lastReconfig.InLatestDecision, + CurrentConfig: s.lastReconfig.CurrentConfig, + CurrentNodes: s.lastReconfig.CurrentNodes, + }, + } +} + +// Buffer return the internal SyncBuffer for testability. +func (s *BFTSynchronizer) Buffer() *SyncBuffer { + s.mutex.Lock() + defer s.mutex.Unlock() + + return s.syncBuff +} + +func (s *BFTSynchronizer) synchronize() (*types.Decision, error) { + //=== We probe all the endpoints and establish a target height, as well as detect the self endpoint. + targetHeight, myEndpoint, err := s.detectTargetHeight() + if err != nil { + return nil, errors.Wrapf(err, "cannot get detect target height") + } + + startHeight := s.Support.Height() + if startHeight >= targetHeight { + return nil, errors.Errorf("already at target height of %d", targetHeight) + } + + //=== Create a buffer to accept the blocks delivered from the BFTDeliverer. + s.mutex.Lock() + s.syncBuff = NewSyncBuffer() + s.mutex.Unlock() + + //=== Create the BFT block deliverer and start a go-routine that fetches block and inserts them into the syncBuffer. + bftDeliverer, err := s.createBFTDeliverer(startHeight, myEndpoint) + if err != nil { + return nil, errors.Wrapf(err, "cannot create BFT block deliverer") + } + + go bftDeliverer.DeliverBlocks() + defer bftDeliverer.Stop() + + //=== Loop on sync-buffer and pull blocks, writing them to the ledger, returning the last block pulled. + lastPulledBlock, err := s.getBlocksFromSyncBuffer(startHeight, targetHeight) + if err != nil { + return nil, errors.Wrapf(err, "failed to get any blocks from SyncBuffer") + } + + decision := s.BlockToDecision(lastPulledBlock) + s.Logger.Infof("Returning decision from block [%d], decision: %+v", lastPulledBlock.GetHeader().GetNumber(), decision) + return decision, nil +} + +// detectTargetHeight probes remote endpoints and detects what is the target height this node needs to reach. It also +// detects the self-endpoint. +// +// In BFT it is highly recommended that the channel/orderer-endpoints (for delivery & broadcast) map 1:1 to the +// channel/orderers/consenters (for cluster consensus), that is, every consenter should be represented by a +// delivery endpoint. This important for Sync to work properly. +func (s *BFTSynchronizer) detectTargetHeight() (uint64, string, error) { + blockPuller, err := s.BlockPullerFactory.CreateBlockPuller(s.Support, s.ClusterDialer, s.LocalConfigCluster, s.CryptoProvider) + if err != nil { + return 0, "", errors.Wrap(err, "cannot get create BlockPuller") + } + defer blockPuller.Close() + + heightByEndpoint, myEndpoint, err := blockPuller.HeightsByEndpoints() + if err != nil { + return 0, "", errors.Wrap(err, "cannot get HeightsByEndpoints") + } + + s.Logger.Infof("HeightsByEndpoints: %+v, my endpoint: %s", heightByEndpoint, myEndpoint) + + delete(heightByEndpoint, myEndpoint) + var heights []uint64 + for _, value := range heightByEndpoint { + heights = append(heights, value) + } + + if len(heights) == 0 { + return 0, "", errors.New("no cluster members to synchronize with") + } + + targetHeight := s.computeTargetHeight(heights) + s.Logger.Infof("Detected target height: %d", targetHeight) + return targetHeight, myEndpoint, nil +} + +// computeTargetHeight compute the target height to synchronize to. +// +// heights: a slice containing the heights of accessible peers, length must be >0. +// clusterSize: the cluster size, must be >0. +func (s *BFTSynchronizer) computeTargetHeight(heights []uint64) uint64 { + sort.Slice(heights, func(i, j int) bool { return heights[i] > heights[j] }) // Descending + clusterSize := len(s.Support.SharedConfig().Consenters()) + f := uint64(clusterSize-1) / 3 // The number of tolerated byzantine faults + lenH := uint64(len(heights)) + + s.Logger.Debugf("Cluster size: %d, F: %d, Heights: %v", clusterSize, f, heights) + + if lenH < f+1 { + s.Logger.Debugf("Returning %d", heights[0]) + return heights[int(lenH)-1] + } + s.Logger.Debugf("Returning %d", heights[f]) + return heights[f] +} + +// createBFTDeliverer creates and initializes the BFT block deliverer. +func (s *BFTSynchronizer) createBFTDeliverer(startHeight uint64, myEndpoint string) (BFTBlockDeliverer, error) { + lastBlock := s.Support.Block(startHeight - 1) + lastConfigBlock, err := cluster.LastConfigBlock(lastBlock, s.Support) + if err != nil { + return nil, errors.Wrapf(err, "failed to retrieve last config block") + } + lastConfigEnv, err := deliverclient.ConfigFromBlock(lastConfigBlock) + if err != nil { + return nil, errors.Wrapf(err, "failed to retrieve last config envelope") + } + + var updatableVerifier deliverclient.CloneableUpdatableBlockVerifier + updatableVerifier, err = s.VerifierFactory.CreateBlockVerifier(lastConfigBlock, lastBlock, s.CryptoProvider, s.Logger) + if err != nil { + return nil, errors.Wrapf(err, "failed to create BlockVerificationAssistant") + } + + clientConfig := s.ClusterDialer.Config // The cluster and block puller use slightly different options + clientConfig.AsyncConnect = false + clientConfig.SecOpts.VerifyCertificate = nil + + bftDeliverer := s.BFTDelivererFactory.CreateBFTDeliverer( + s.Support.ChannelID(), + s.syncBuff, + &ledgerInfoAdapter{s.Support}, + updatableVerifier, + blocksprovider.DialerAdapter{ClientConfig: clientConfig}, + &orderers.ConnectionSourceFactory{}, // no overrides in the orderer + s.CryptoProvider, + make(chan struct{}), + s.Support, + blocksprovider.DeliverAdapter{}, + &blocksprovider.BFTCensorshipMonitorFactory{}, + flogging.MustGetLogger("orderer.blocksprovider").With("channel", s.Support.ChannelID()), + 10*time.Millisecond, // TODO get it from config. + 2*time.Second, // TODO get it from config. + 20*time.Second, // TODO get it from config. + time.Minute, // TODO get it from config. + func() (stopRetries bool) { + s.syncBuff.Stop() + return true // In the orderer we must limit the time we try to do Synch() + }, + ) + + s.Logger.Infof("Created a BFTDeliverer: %+v", bftDeliverer) + bftDeliverer.Initialize(lastConfigEnv.GetConfig(), myEndpoint) + + return bftDeliverer, nil +} + +func (s *BFTSynchronizer) getBlocksFromSyncBuffer(startHeight, targetHeight uint64) (*common.Block, error) { + targetSeq := targetHeight - 1 + seq := startHeight + var blocksFetched int + s.Logger.Debugf("Will fetch sequences [%d-%d]", seq, targetSeq) + + var lastPulledBlock *common.Block + for seq <= targetSeq { + block := s.syncBuff.PullBlock(seq) + if block == nil { + s.Logger.Debugf("Failed to fetch block [%d] from cluster", seq) + break + } + if protoutil.IsConfigBlock(block) { + s.Support.WriteConfigBlock(block, nil) + s.Logger.Debugf("Fetched and committed config block [%d] from cluster", seq) + } else { + s.Support.WriteBlock(block, nil) + s.Logger.Debugf("Fetched and committed block [%d] from cluster", seq) + } + lastPulledBlock = block + + prevInLatestDecision := s.lastReconfig.InLatestDecision + s.lastReconfig = s.OnCommit(lastPulledBlock) + s.lastReconfig.InLatestDecision = s.lastReconfig.InLatestDecision || prevInLatestDecision + s.Logger.Debugf("Last reconfig %+v", s.lastReconfig) + seq++ + blocksFetched++ + } + + s.syncBuff.Stop() + + if lastPulledBlock == nil { + return nil, errors.Errorf("failed pulling block %d", seq) + } + + startSeq := startHeight + s.Logger.Infof("Finished synchronizing with cluster, fetched %d blocks, starting from block [%d], up until and including block [%d]", + blocksFetched, startSeq, lastPulledBlock.Header.Number) + + return lastPulledBlock, nil +} diff --git a/orderer/consensus/smartbft/synchronizer_bft_test.go b/orderer/consensus/smartbft/synchronizer_bft_test.go new file mode 100644 index 00000000000..74a945fc0a8 --- /dev/null +++ b/orderer/consensus/smartbft/synchronizer_bft_test.go @@ -0,0 +1,529 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package smartbft_test + +import ( + "os" + "sync" + "testing" + + "github.com/SmartBFT-Go/consensus/pkg/types" + "github.com/SmartBFT-Go/consensus/smartbftprotos" + "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric-lib-go/common/flogging" + cb "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric/common/channelconfig" + "github.com/hyperledger/fabric/common/deliverclient" + "github.com/hyperledger/fabric/internal/pkg/comm" + "github.com/hyperledger/fabric/orderer/common/cluster" + "github.com/hyperledger/fabric/orderer/common/localconfig" + mocks2 "github.com/hyperledger/fabric/orderer/consensus/mocks" + "github.com/hyperledger/fabric/orderer/consensus/smartbft" + "github.com/hyperledger/fabric/orderer/consensus/smartbft/mocks" + "github.com/pkg/errors" + "github.com/stretchr/testify/require" +) + +//go:generate counterfeiter -o mocks/updatable_block_verifier.go --fake-name UpdatableBlockVerifier . updatableBlockVerifier +type updatableBlockVerifier interface { + deliverclient.CloneableUpdatableBlockVerifier +} + +//go:generate counterfeiter -o mocks/orderer_config.go --fake-name OrdererConfig . ordererConfig +type ordererConfig interface { + channelconfig.Orderer +} + +func TestBFTSynchronizer(t *testing.T) { + flogging.ActivateSpec("debug") + blockBytes, err := os.ReadFile("testdata/mychannel.block") + require.NoError(t, err) + + goodConfigBlock := &cb.Block{} + require.NoError(t, proto.Unmarshal(blockBytes, goodConfigBlock)) + + b42 := makeConfigBlockWithMetadata(goodConfigBlock, 42, &smartbftprotos.ViewMetadata{ViewId: 1, LatestSequence: 8}) + b99 := makeBlockWithMetadata(99, 42, &smartbftprotos.ViewMetadata{ViewId: 1, LatestSequence: 12}) + b100 := makeBlockWithMetadata(100, 42, &smartbftprotos.ViewMetadata{ViewId: 1, LatestSequence: 13}) + b101 := makeConfigBlockWithMetadata(goodConfigBlock, 101, &smartbftprotos.ViewMetadata{ViewId: 2, LatestSequence: 1}) + b102 := makeBlockWithMetadata(102, 101, &smartbftprotos.ViewMetadata{ViewId: 2, LatestSequence: 3}) + + blockNum2configSqn := map[uint64]uint64{ + 99: 7, + 100: 7, + 101: 8, + 102: 8, + } + + t.Run("no remote endpoints but myself", func(t *testing.T) { + bp := &mocks.FakeBlockPuller{} + bpf := &mocks.FakeBlockPullerFactory{} + bpf.CreateBlockPullerReturns(bp, nil) + + bp.HeightsByEndpointsReturns( + map[string]uint64{ + "example.com:1": 100, + }, + "example.com:1", + nil, + ) + + fakeCS := &mocks2.FakeConsenterSupport{} + fakeCS.HeightReturns(100) + fakeCS.BlockReturns(b99) + + decision := &types.SyncResponse{ + Latest: types.Decision{ + Proposal: types.Proposal{Header: []byte{1, 1, 1, 1}}, + Signatures: []types.Signature{{ID: 1}, {ID: 2}, {ID: 3}}, + }, + Reconfig: types.ReconfigSync{ + InReplicatedDecisions: false, + CurrentNodes: []uint64{1, 2, 3, 4}, + CurrentConfig: types.Configuration{SelfID: 1}, + }, + } + + bftSynchronizer := &smartbft.BFTSynchronizer{ + LatestConfig: func() (types.Configuration, []uint64) { + return types.Configuration{ + SelfID: 1, + }, []uint64{1, 2, 3, 4} + }, + BlockToDecision: func(block *cb.Block) *types.Decision { + if block == b99 { + return &decision.Latest + } + return nil + }, + OnCommit: noopUpdateLastHash, + Support: fakeCS, + LocalConfigCluster: localconfig.Cluster{}, + BlockPullerFactory: bpf, + Logger: flogging.MustGetLogger("test.smartbft"), + } + + require.NotNil(t, bftSynchronizer) + + resp := bftSynchronizer.Sync() + require.NotNil(t, resp) + require.Equal(t, *decision, resp) + }) + + t.Run("no remote endpoints", func(t *testing.T) { + bp := &mocks.FakeBlockPuller{} + bpf := &mocks.FakeBlockPullerFactory{} + bpf.CreateBlockPullerReturns(bp, nil) + + bp.HeightsByEndpointsReturns(map[string]uint64{}, "", nil) + + fakeCS := &mocks2.FakeConsenterSupport{} + fakeCS.HeightReturns(100) + fakeCS.BlockReturns(b99) + + decision := &types.SyncResponse{ + Latest: types.Decision{ + Proposal: types.Proposal{Header: []byte{1, 1, 1, 1}}, + Signatures: []types.Signature{{ID: 1}, {ID: 2}, {ID: 3}}, + }, + Reconfig: types.ReconfigSync{ + InReplicatedDecisions: false, + CurrentNodes: []uint64{1, 2, 3, 4}, + CurrentConfig: types.Configuration{SelfID: 1}, + }, + } + + bftSynchronizer := &smartbft.BFTSynchronizer{ + LatestConfig: func() (types.Configuration, []uint64) { + return types.Configuration{ + SelfID: 1, + }, []uint64{1, 2, 3, 4} + }, + BlockToDecision: func(block *cb.Block) *types.Decision { + if block == b99 { + return &decision.Latest + } + return nil + }, + OnCommit: noopUpdateLastHash, + Support: fakeCS, + LocalConfigCluster: localconfig.Cluster{}, + BlockPullerFactory: bpf, + Logger: flogging.MustGetLogger("test.smartbft"), + } + + require.NotNil(t, bftSynchronizer) + + resp := bftSynchronizer.Sync() + require.NotNil(t, resp) + require.Equal(t, *decision, resp) + }) + + t.Run("error creating block puller", func(t *testing.T) { + bpf := &mocks.FakeBlockPullerFactory{} + bpf.CreateBlockPullerReturns(nil, errors.New("oops")) + + fakeCS := &mocks2.FakeConsenterSupport{} + fakeCS.HeightReturns(100) + fakeCS.BlockReturns(b99) + + decision := &types.SyncResponse{ + Latest: types.Decision{ + Proposal: types.Proposal{Header: []byte{1, 1, 1, 1}}, + Signatures: []types.Signature{{ID: 1}, {ID: 2}, {ID: 3}}, + }, + Reconfig: types.ReconfigSync{ + InReplicatedDecisions: false, + CurrentNodes: []uint64{1, 2, 3, 4}, + CurrentConfig: types.Configuration{SelfID: 1}, + }, + } + + bftSynchronizer := &smartbft.BFTSynchronizer{ + LatestConfig: func() (types.Configuration, []uint64) { + return types.Configuration{ + SelfID: 1, + }, []uint64{1, 2, 3, 4} + }, + BlockToDecision: func(block *cb.Block) *types.Decision { + if block == b99 { + return &decision.Latest + } + return nil + }, + OnCommit: noopUpdateLastHash, + Support: fakeCS, + LocalConfigCluster: localconfig.Cluster{}, + BlockPullerFactory: bpf, + Logger: flogging.MustGetLogger("test.smartbft"), + } + + require.NotNil(t, bftSynchronizer) + resp := bftSynchronizer.Sync() + require.NotNil(t, resp) + require.Equal(t, *decision, resp) + }) + + t.Run("no remote endpoints above my height", func(t *testing.T) { + bp := &mocks.FakeBlockPuller{} + bpf := &mocks.FakeBlockPullerFactory{} + bpf.CreateBlockPullerReturns(bp, nil) + + bp.HeightsByEndpointsReturns( + map[string]uint64{ + "example.com:1": 100, + "example.com:2": 100, + }, + "example.com:1", + nil, + ) + + fakeCS := &mocks2.FakeConsenterSupport{} + fakeCS.HeightReturns(100) + fakeCS.BlockReturns(b99) + fakeOrdererConfig := &mocks.OrdererConfig{} + fakeOrdererConfig.ConsentersReturns([]*cb.Consenter{ + {Id: 1}, {Id: 2}, {Id: 3}, {Id: 4}, + }) + fakeCS.SharedConfigReturns(fakeOrdererConfig) + + decision := &types.SyncResponse{ + Latest: types.Decision{ + Proposal: types.Proposal{Header: []byte{1, 1, 1, 1}}, + Signatures: []types.Signature{{ID: 1}, {ID: 2}, {ID: 3}}, + }, + Reconfig: types.ReconfigSync{ + InReplicatedDecisions: false, + CurrentNodes: []uint64{1, 2, 3, 4}, + CurrentConfig: types.Configuration{SelfID: 1}, + }, + } + + bftSynchronizer := &smartbft.BFTSynchronizer{ + LatestConfig: func() (types.Configuration, []uint64) { + return types.Configuration{ + SelfID: 1, + }, []uint64{1, 2, 3, 4} + }, + BlockToDecision: func(block *cb.Block) *types.Decision { + if block == b99 { + return &decision.Latest + } + return nil + }, + OnCommit: noopUpdateLastHash, + Support: fakeCS, + LocalConfigCluster: localconfig.Cluster{}, + BlockPullerFactory: bpf, + Logger: flogging.MustGetLogger("test.smartbft"), + } + + require.NotNil(t, bftSynchronizer) + + resp := bftSynchronizer.Sync() + require.NotNil(t, resp) + require.Equal(t, *decision, resp) + }) + + t.Run("remote endpoints above my height: 2 blocks", func(t *testing.T) { + bp := &mocks.FakeBlockPuller{} + bpf := &mocks.FakeBlockPullerFactory{} + bpf.CreateBlockPullerReturns(bp, nil) + + bp.HeightsByEndpointsReturns( + map[string]uint64{ + "example.com:1": 100, + "example.com:2": 101, + "example.com:3": 102, + "example.com:4": 103, + }, + "example.com:1", + nil, + ) + + var ledger []*cb.Block + for i := uint64(0); i < 100; i++ { + ledger = append(ledger, &cb.Block{Header: &cb.BlockHeader{Number: i}}) + } + ledger[42] = b42 + ledger[99] = b99 + + fakeCS := &mocks2.FakeConsenterSupport{} + fakeCS.HeightCalls(func() uint64 { + return uint64(len(ledger)) + }) + fakeCS.BlockCalls(func(u uint64) *cb.Block { + b := ledger[u] + t.Logf("Block Calls: %d, %v", u, b) + return ledger[u] + }) + fakeCS.SequenceCalls(func() uint64 { return blockNum2configSqn[uint64(len(ledger))] }) + fakeCS.WriteConfigBlockCalls(func(b *cb.Block, m []byte) { + ledger = append(ledger, b) + }) + fakeCS.WriteBlockCalls(func(b *cb.Block, m []byte) { + ledger = append(ledger, b) + }) + + fakeOrdererConfig := &mocks.OrdererConfig{} + fakeOrdererConfig.ConsentersReturns([]*cb.Consenter{ + {Id: 1}, {Id: 2}, {Id: 3}, {Id: 4}, + }) + fakeCS.SharedConfigReturns(fakeOrdererConfig) + + fakeVerifierFactory := &mocks.VerifierFactory{} + fakeVerifier := &mocks.UpdatableBlockVerifier{} + fakeVerifierFactory.CreateBlockVerifierReturns(fakeVerifier, nil) + + fakeBFTDelivererFactory := &mocks.BFTDelivererFactory{} + fakeBFTDeliverer := &mocks.BFTBlockDeliverer{} + fakeBFTDelivererFactory.CreateBFTDelivererReturns(fakeBFTDeliverer) + + decision := &types.SyncResponse{ + Latest: types.Decision{ + Proposal: types.Proposal{Header: []byte{1, 1, 1, 1}}, + Signatures: []types.Signature{{ID: 1}, {ID: 2}, {ID: 3}}, + }, + Reconfig: types.ReconfigSync{ + InReplicatedDecisions: true, + CurrentNodes: []uint64{1, 2, 3, 4}, + CurrentConfig: types.Configuration{SelfID: 1}, + }, + } + + bftSynchronizer := &smartbft.BFTSynchronizer{ + LatestConfig: func() (types.Configuration, []uint64) { + return types.Configuration{ + SelfID: 1, + }, []uint64{1, 2, 3, 4} + }, + BlockToDecision: func(block *cb.Block) *types.Decision { + if block == b101 { + return &decision.Latest + } + return nil + }, + OnCommit: func(block *cb.Block) types.Reconfig { + if block == b101 { + return types.Reconfig{ + InLatestDecision: true, + CurrentNodes: []uint64{1, 2, 3, 4}, + CurrentConfig: types.Configuration{SelfID: 1}, + } + } + return types.Reconfig{} + }, + Support: fakeCS, + ClusterDialer: &cluster.PredicateDialer{Config: comm.ClientConfig{}}, + LocalConfigCluster: localconfig.Cluster{}, + BlockPullerFactory: bpf, + VerifierFactory: fakeVerifierFactory, + BFTDelivererFactory: fakeBFTDelivererFactory, + Logger: flogging.MustGetLogger("test.smartbft"), + } + + require.NotNil(t, bftSynchronizer) + + wg := sync.WaitGroup{} + wg.Add(1) + stopDeliverCh := make(chan struct{}) + fakeBFTDeliverer.DeliverBlocksCalls(func() { + b := bftSynchronizer.Buffer() + require.NotNil(t, b) + err := b.HandleBlock("mychannel", b100) + require.NoError(t, err) + err = b.HandleBlock("mychannel", b101) + require.NoError(t, err) + <-stopDeliverCh // the goroutine will block here + wg.Done() + }) + fakeBFTDeliverer.StopCalls(func() { + close(stopDeliverCh) + }) + + resp := bftSynchronizer.Sync() + require.NotNil(t, resp) + require.Equal(t, *decision, resp) + require.Equal(t, 102, len(ledger)) + require.Equal(t, 1, fakeCS.WriteBlockCallCount()) + require.Equal(t, 1, fakeCS.WriteConfigBlockCallCount()) + wg.Wait() + }) + + t.Run("remote endpoints above my height: 3 blocks", func(t *testing.T) { + bp := &mocks.FakeBlockPuller{} + bpf := &mocks.FakeBlockPullerFactory{} + bpf.CreateBlockPullerReturns(bp, nil) + + bp.HeightsByEndpointsReturns( + map[string]uint64{ + "example.com:1": 100, + "example.com:2": 103, + "example.com:3": 103, + "example.com:4": 200, + }, + "example.com:1", + nil, + ) + + var ledger []*cb.Block + for i := uint64(0); i < 100; i++ { + ledger = append(ledger, &cb.Block{Header: &cb.BlockHeader{Number: i}}) + } + ledger[42] = b42 + ledger[99] = b99 + + fakeCS := &mocks2.FakeConsenterSupport{} + fakeCS.HeightCalls(func() uint64 { + return uint64(len(ledger)) + }) + fakeCS.BlockCalls(func(u uint64) *cb.Block { + b := ledger[u] + t.Logf("Block Calls: %d, %v", u, b) + return ledger[u] + }) + fakeCS.SequenceCalls(func() uint64 { return blockNum2configSqn[uint64(len(ledger))] }) + fakeCS.WriteConfigBlockCalls(func(b *cb.Block, m []byte) { + ledger = append(ledger, b) + }) + fakeCS.WriteBlockCalls(func(b *cb.Block, m []byte) { + ledger = append(ledger, b) + }) + + fakeOrdererConfig := &mocks.OrdererConfig{} + fakeOrdererConfig.ConsentersReturns([]*cb.Consenter{ + {Id: 1}, {Id: 2}, {Id: 3}, {Id: 4}, + }) + fakeCS.SharedConfigReturns(fakeOrdererConfig) + + fakeVerifierFactory := &mocks.VerifierFactory{} + fakeVerifier := &mocks.UpdatableBlockVerifier{} + fakeVerifierFactory.CreateBlockVerifierReturns(fakeVerifier, nil) + + fakeBFTDelivererFactory := &mocks.BFTDelivererFactory{} + fakeBFTDeliverer := &mocks.BFTBlockDeliverer{} + fakeBFTDelivererFactory.CreateBFTDelivererReturns(fakeBFTDeliverer) + + decision := &types.SyncResponse{ + Latest: types.Decision{ + Proposal: types.Proposal{Header: []byte{1, 1, 1, 1}}, + Signatures: []types.Signature{{ID: 1}, {ID: 2}, {ID: 3}}, + }, + Reconfig: types.ReconfigSync{ + InReplicatedDecisions: true, + CurrentNodes: []uint64{1, 2, 3, 4}, + CurrentConfig: types.Configuration{SelfID: 1}, + }, + } + + bftSynchronizer := &smartbft.BFTSynchronizer{ + LatestConfig: func() (types.Configuration, []uint64) { + return types.Configuration{ + SelfID: 1, + }, []uint64{1, 2, 3, 4} + }, + BlockToDecision: func(block *cb.Block) *types.Decision { + if block == b102 { + return &decision.Latest + } + return nil + }, + OnCommit: func(block *cb.Block) types.Reconfig { + if block == b101 { + return types.Reconfig{ + InLatestDecision: true, + CurrentNodes: []uint64{1, 2, 3, 4}, + CurrentConfig: types.Configuration{SelfID: 1}, + } + } else if block == b102 { + return types.Reconfig{ + InLatestDecision: false, + CurrentNodes: []uint64{1, 2, 3, 4}, + CurrentConfig: types.Configuration{SelfID: 1}, + } + } + return types.Reconfig{} + }, + Support: fakeCS, + ClusterDialer: &cluster.PredicateDialer{Config: comm.ClientConfig{}}, + LocalConfigCluster: localconfig.Cluster{}, + BlockPullerFactory: bpf, + VerifierFactory: fakeVerifierFactory, + BFTDelivererFactory: fakeBFTDelivererFactory, + Logger: flogging.MustGetLogger("test.smartbft"), + } + require.NotNil(t, bftSynchronizer) + + wg := sync.WaitGroup{} + wg.Add(1) + stopDeliverCh := make(chan struct{}) + fakeBFTDeliverer.DeliverBlocksCalls(func() { + b := bftSynchronizer.Buffer() + require.NotNil(t, b) + err := b.HandleBlock("mychannel", b100) + require.NoError(t, err) + err = b.HandleBlock("mychannel", b101) + require.NoError(t, err) + err = b.HandleBlock("mychannel", b102) + require.NoError(t, err) + + <-stopDeliverCh // the goroutine will block here + wg.Done() + }) + fakeBFTDeliverer.StopCalls(func() { + close(stopDeliverCh) + }) + + resp := bftSynchronizer.Sync() + require.NotNil(t, resp) + require.Equal(t, *decision, resp) + require.Equal(t, 103, len(ledger)) + require.Equal(t, 2, fakeCS.WriteBlockCallCount()) + require.Equal(t, 1, fakeCS.WriteConfigBlockCallCount()) + wg.Wait() + }) +} diff --git a/orderer/consensus/smartbft/synchronizer_test.go b/orderer/consensus/smartbft/synchronizer_test.go index d689f2c532e..e8e0e7fc496 100644 --- a/orderer/consensus/smartbft/synchronizer_test.go +++ b/orderer/consensus/smartbft/synchronizer_test.go @@ -308,7 +308,7 @@ func makeBlockWithMetadata(sqnNum, lastConfigIndex uint64, viewMetadata *smartbf Value: protoutil.MarshalOrPanic(&cb.OrdererBlockMetadata{ ConsenterMetadata: protoutil.MarshalOrPanic(viewMetadata), LastConfig: &cb.LastConfig{ - Index: sqnNum, + Index: lastConfigIndex, }, }), }) diff --git a/orderer/consensus/smartbft/util.go b/orderer/consensus/smartbft/util.go index a7d03294aa2..858d73d76d3 100644 --- a/orderer/consensus/smartbft/util.go +++ b/orderer/consensus/smartbft/util.go @@ -14,6 +14,7 @@ import ( "encoding/pem" "fmt" "sort" + "time" "github.com/SmartBFT-Go/consensus/pkg/types" "github.com/SmartBFT-Go/consensus/smartbftprotos" @@ -26,10 +27,10 @@ import ( "github.com/hyperledger/fabric/common/channelconfig" "github.com/hyperledger/fabric/common/crypto" "github.com/hyperledger/fabric/common/deliverclient" + "github.com/hyperledger/fabric/internal/pkg/identity" + "github.com/hyperledger/fabric/internal/pkg/peer/blocksprovider" "github.com/hyperledger/fabric/orderer/common/cluster" - "github.com/hyperledger/fabric/orderer/common/localconfig" "github.com/hyperledger/fabric/orderer/consensus" - "github.com/hyperledger/fabric/orderer/consensus/etcdraft" "github.com/hyperledger/fabric/orderer/consensus/smartbft/util" "github.com/hyperledger/fabric/protoutil" "github.com/pkg/errors" @@ -123,62 +124,6 @@ func configBlockToBFTConfig(selfID uint64, block *cb.Block, bccsp bccsp.BCCSP) ( return util.ConfigFromMetadataOptions(selfID, consensusConfigOptions) } -//go:generate counterfeiter -o mocks/mock_blockpuller.go . BlockPuller - -// newBlockPuller creates a new block puller -func newBlockPuller( - support consensus.ConsenterSupport, - baseDialer *cluster.PredicateDialer, - clusterConfig localconfig.Cluster, - bccsp bccsp.BCCSP, -) (BlockPuller, error) { - verifyBlockSequence := func(blocks []*cb.Block, _ string) error { - vb := cluster.BlockVerifierBuilder(bccsp) - return cluster.VerifyBlocksBFT(blocks, support.SignatureVerifier(), vb) - } - - stdDialer := &cluster.StandardDialer{ - Config: baseDialer.Config.Clone(), - } - stdDialer.Config.AsyncConnect = false - stdDialer.Config.SecOpts.VerifyCertificate = nil - - // Extract the TLS CA certs and endpoints from the configuration, - endpoints, err := etcdraft.EndpointconfigFromSupport(support, bccsp) - if err != nil { - return nil, err - } - - logger := flogging.MustGetLogger("orderer.common.cluster.puller") - - der, _ := pem.Decode(stdDialer.Config.SecOpts.Certificate) - if der == nil { - return nil, errors.Errorf("client certificate isn't in PEM format: %v", - string(stdDialer.Config.SecOpts.Certificate)) - } - - myCert, err := x509.ParseCertificate(der.Bytes) - if err != nil { - logger.Warnf("Failed parsing my own TLS certificate: %v, therefore we may connect to our own endpoint when pulling blocks", err) - } - - bp := &cluster.BlockPuller{ - MyOwnTLSCert: myCert, - VerifyBlockSequence: verifyBlockSequence, - Logger: logger, - RetryTimeout: clusterConfig.ReplicationRetryTimeout, - MaxTotalBufferBytes: clusterConfig.ReplicationBufferSize, - FetchTimeout: clusterConfig.ReplicationPullTimeout, - Endpoints: endpoints, - Signer: support, - TLSCert: der.Bytes, - Channel: support.ChannelID(), - Dialer: stdDialer, - } - - return bp, nil -} - func getViewMetadataFromBlock(block *cb.Block) (*smartbftprotos.ViewMetadata, error) { if block.Header.Number == 0 { // Genesis block has no prior metadata so we just return an un-initialized metadata @@ -515,3 +460,113 @@ func createSmartBftConfig(odrdererConfig channelconfig.Orderer) (*smartbft.Optio configOptions.RequestBatchMaxBytes = uint64(batchSize.AbsoluteMaxBytes) return configOptions, nil } + +// ledgerInfoAdapter translates from blocksprovider.LedgerInfo in to calls to consensus.ConsenterSupport. +type ledgerInfoAdapter struct { + support consensus.ConsenterSupport +} + +func (a *ledgerInfoAdapter) LedgerHeight() (uint64, error) { + return a.support.Height(), nil +} + +func (a *ledgerInfoAdapter) GetCurrentBlockHash() ([]byte, error) { + return nil, errors.New("not implemented: never used in orderer") +} + +//go:generate counterfeiter -o mocks/verifier_factory.go --fake-name VerifierFactory . VerifierFactory + +type VerifierFactory interface { + CreateBlockVerifier( + configBlock *cb.Block, + lastBlock *cb.Block, + cryptoProvider bccsp.BCCSP, + lg *flogging.FabricLogger, + ) (deliverclient.CloneableUpdatableBlockVerifier, error) +} + +type verifierCreator struct{} + +func (*verifierCreator) CreateBlockVerifier( + configBlock *cb.Block, + lastBlock *cb.Block, + cryptoProvider bccsp.BCCSP, + lg *flogging.FabricLogger, +) (deliverclient.CloneableUpdatableBlockVerifier, error) { + updatableVerifier, err := deliverclient.NewBlockVerificationAssistant(configBlock, lastBlock, cryptoProvider, lg) + return updatableVerifier, err +} + +//go:generate counterfeiter -o mocks/bft_deliverer_factory.go --fake-name BFTDelivererFactory . BFTDelivererFactory + +type BFTDelivererFactory interface { + CreateBFTDeliverer( + channelID string, + blockHandler blocksprovider.BlockHandler, + ledger blocksprovider.LedgerInfo, + updatableBlockVerifier blocksprovider.UpdatableBlockVerifier, + dialer blocksprovider.Dialer, + orderersSourceFactory blocksprovider.OrdererConnectionSourceFactory, + cryptoProvider bccsp.BCCSP, + doneC chan struct{}, + signer identity.SignerSerializer, + deliverStreamer blocksprovider.DeliverStreamer, + censorshipDetectorFactory blocksprovider.CensorshipDetectorFactory, + logger *flogging.FabricLogger, + initialRetryInterval time.Duration, + maxRetryInterval time.Duration, + blockCensorshipTimeout time.Duration, + maxRetryDuration time.Duration, + maxRetryDurationExceededHandler blocksprovider.MaxRetryDurationExceededHandler, + ) BFTBlockDeliverer +} + +type bftDelivererCreator struct{} + +func (*bftDelivererCreator) CreateBFTDeliverer( + channelID string, + blockHandler blocksprovider.BlockHandler, + ledger blocksprovider.LedgerInfo, + updatableBlockVerifier blocksprovider.UpdatableBlockVerifier, + dialer blocksprovider.Dialer, + orderersSourceFactory blocksprovider.OrdererConnectionSourceFactory, + cryptoProvider bccsp.BCCSP, + doneC chan struct{}, + signer identity.SignerSerializer, + deliverStreamer blocksprovider.DeliverStreamer, + censorshipDetectorFactory blocksprovider.CensorshipDetectorFactory, + logger *flogging.FabricLogger, + initialRetryInterval time.Duration, + maxRetryInterval time.Duration, + blockCensorshipTimeout time.Duration, + maxRetryDuration time.Duration, + maxRetryDurationExceededHandler blocksprovider.MaxRetryDurationExceededHandler, +) BFTBlockDeliverer { + bftDeliverer := &blocksprovider.BFTDeliverer{ + ChannelID: channelID, + BlockHandler: blockHandler, + Ledger: ledger, + UpdatableBlockVerifier: updatableBlockVerifier, + Dialer: dialer, + OrderersSourceFactory: orderersSourceFactory, + CryptoProvider: cryptoProvider, + DoneC: doneC, + Signer: signer, + DeliverStreamer: deliverStreamer, + CensorshipDetectorFactory: censorshipDetectorFactory, + Logger: logger, + InitialRetryInterval: initialRetryInterval, + MaxRetryInterval: maxRetryInterval, + BlockCensorshipTimeout: blockCensorshipTimeout, + MaxRetryDuration: maxRetryDuration, + MaxRetryDurationExceededHandler: maxRetryDurationExceededHandler, + } + return bftDeliverer +} + +//go:generate counterfeiter -o mocks/bft_block_deliverer.go --fake-name BFTBlockDeliverer . BFTBlockDeliverer +type BFTBlockDeliverer interface { + Stop() + DeliverBlocks() + Initialize(channelConfig *cb.Config, selfEndpoint string) +} diff --git a/sampleconfig/configtx.yaml b/sampleconfig/configtx.yaml index c0335eeba2c..43323298be6 100644 --- a/sampleconfig/configtx.yaml +++ b/sampleconfig/configtx.yaml @@ -321,6 +321,12 @@ Orderer: &OrdererDefaults # network. When set to 0, this implies no maximum number of channels. MaxChannels: 0 + # ConsenterMapping contains the definition of consenter identity, endpoints, and crypto material. + # The ConsenterMapping is used in the BFT consensus protocol, and should include enough servers to ensure + # fault-tolerance; In BFT this number is at least 3*F+1, where F is the number of potential failures. + # In BFT it is highly recommended that the addresses for delivery & broadcast (Ordrer/Addresses or + # the OrdererEndpoints item in the org definition) map 1:1 to the Orderer/ConsenterMapping (for cluster consensus). + # That is, every consenter should be represented by a delivery endpoint. ConsenterMapping: - ID: 1 Host: bft0.example.com