Skip to content

Commit

Permalink
BFT Block Puller: Refactor gossip out
Browse files Browse the repository at this point in the history
Signed-off-by: Yoav Tock <tock@il.ibm.com>
Change-Id: I359e8402954b6db3a7a2df7a29d60fb04cfda63c
  • Loading branch information
tock-ibm committed Aug 6, 2023
1 parent c9505ca commit 6647a0b
Show file tree
Hide file tree
Showing 10 changed files with 387 additions and 193 deletions.
29 changes: 17 additions & 12 deletions core/deliverservice/deliveryclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,12 @@ func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo b

func (d *deliverServiceImpl) createBlockDelivererCFT(chainID string, ledgerInfo blocksprovider.LedgerInfo) (*blocksprovider.Deliverer, error) {
dc := &blocksprovider.Deliverer{
ChannelID: chainID,
Gossip: d.conf.Gossip,
ChannelID: chainID,
BlockHandler: &GossipBlockHandler{
gossip: d.conf.Gossip,
blockGossipDisabled: !d.conf.DeliverServiceConfig.BlockGossipEnabled,
logger: flogging.MustGetLogger("peer.blocksprovider").With("channel", chainID),
},
Ledger: ledgerInfo,
BlockVerifier: d.conf.CryptoSvc,
Dialer: blocksprovider.DialerAdapter{
Expand All @@ -169,16 +173,17 @@ func (d *deliverServiceImpl) createBlockDelivererCFT(chainID string, ledgerInfo
SecOpts: d.conf.DeliverServiceConfig.SecOpts,
},
},
Orderers: d.conf.OrdererSource,
DoneC: make(chan struct{}),
Signer: d.conf.Signer,
DeliverStreamer: blocksprovider.DeliverAdapter{},
Logger: flogging.MustGetLogger("peer.blocksprovider").With("channel", chainID),
MaxRetryDelay: d.conf.DeliverServiceConfig.ReConnectBackoffThreshold,
MaxRetryDuration: d.conf.DeliverServiceConfig.ReconnectTotalTimeThreshold,
BlockGossipDisabled: !d.conf.DeliverServiceConfig.BlockGossipEnabled,
InitialRetryDelay: 100 * time.Millisecond,
YieldLeadership: !d.conf.IsStaticLeader,
Orderers: d.conf.OrdererSource,
DoneC: make(chan struct{}),
Signer: d.conf.Signer,
DeliverStreamer: blocksprovider.DeliverAdapter{},
Logger: flogging.MustGetLogger("peer.blocksprovider").With("channel", chainID),
MaxRetryInterval: d.conf.DeliverServiceConfig.ReConnectBackoffThreshold,
MaxRetryDuration: d.conf.DeliverServiceConfig.ReconnectTotalTimeThreshold,
InitialRetryInterval: 100 * time.Millisecond,
MaxRetryDurationExceededHandler: func() (stopRetries bool) {
return !d.conf.IsStaticLeader
},
}

if d.conf.DeliverServiceConfig.SecOpts.RequireClientCert {
Expand Down
74 changes: 74 additions & 0 deletions core/deliverservice/gossip_block_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package deliverservice

import (
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/gossip"
"github.com/hyperledger/fabric/common/flogging"
"github.com/pkg/errors"
)

// GossipServiceAdapter serves to provide basic functionality
// required from gossip service by delivery service
//
//go:generate counterfeiter -o fake/gossip_service_adapter.go --fake-name GossipServiceAdapter . GossipServiceAdapter
type GossipServiceAdapter interface {
// AddPayload adds payload to the local state sync buffer
AddPayload(chainID string, payload *gossip.Payload) error

// Gossip the message across the peers
Gossip(msg *gossip.GossipMessage)
}

type GossipBlockHandler struct {
gossip GossipServiceAdapter
blockGossipDisabled bool
logger *flogging.FabricLogger
}

func (h *GossipBlockHandler) HandleBlock(channelID string, block *common.Block) error {
marshaledBlock, err := proto.Marshal(block)
if err != nil {
return errors.WithMessage(err, "block from orderer could not be re-marshaled")
}

// Create payload with a block received
blockNum := block.GetHeader().GetNumber()
payload := &gossip.Payload{
Data: marshaledBlock,
SeqNum: blockNum,
}

// Use payload to create gossip message
gossipMsg := &gossip.GossipMessage{
Nonce: 0,
Tag: gossip.GossipMessage_CHAN_AND_ORG,
Channel: []byte(channelID),
Content: &gossip.GossipMessage_DataMsg{
DataMsg: &gossip.DataMessage{
Payload: payload,
},
},
}

h.logger.Debugf("Adding payload to local buffer, blockNum = [%d]", blockNum)
// Add payload to local state payloads buffer
if err := h.gossip.AddPayload(channelID, payload); err != nil {
h.logger.Warningf("Block [%d] received from ordering service wasn't added to payload buffer: %v", blockNum, err)
return errors.WithMessage(err, "could not add block as payload")
}
if h.blockGossipDisabled {
return nil
}
// Gossip messages with other nodes
h.logger.Debugf("Gossiping block [%d]", blockNum)
h.gossip.Gossip(gossipMsg)

return nil
}
55 changes: 55 additions & 0 deletions core/deliverservice/gossip_block_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package deliverservice

import (
"testing"

"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/internal/pkg/peer/blocksprovider/fake"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
)

func TestGossipBlockHandler_HandleBlock(t *testing.T) {
h := GossipBlockHandler{
blockGossipDisabled: false,
logger: flogging.MustGetLogger("test.GossipBlockHandler"),
}

t.Run("error: cannot marshal block", func(t *testing.T) {
h.gossip = &fake.GossipServiceAdapter{}
err := h.HandleBlock("testchannel", nil)
require.EqualError(t, err, "block from orderer could not be re-marshaled: proto: Marshal called with nil")
})

t.Run("error: cannot add payload", func(t *testing.T) {
fakeGossip := &fake.GossipServiceAdapter{}
fakeGossip.AddPayloadReturns(errors.New("oops"))
h.gossip = fakeGossip
err := h.HandleBlock("testchannel", &common.Block{Header: &common.BlockHeader{Number: 8}})
require.EqualError(t, err, "could not add block as payload: oops")
})

t.Run("valid: gossip", func(t *testing.T) {
fakeGossip := &fake.GossipServiceAdapter{}
h.gossip = fakeGossip
err := h.HandleBlock("testchannel", &common.Block{Header: &common.BlockHeader{Number: 8}})
require.NoError(t, err)
require.Equal(t, 1, fakeGossip.GossipCallCount())
})

t.Run("valid: no gossip", func(t *testing.T) {
fakeGossip := &fake.GossipServiceAdapter{}
h.gossip = fakeGossip
h.blockGossipDisabled = true
err := h.HandleBlock("testchannel", &common.Block{Header: &common.BlockHeader{Number: 8}})
require.NoError(t, err)
require.Equal(t, 0, fakeGossip.GossipCallCount())
})
}
34 changes: 18 additions & 16 deletions internal/pkg/peer/blocksprovider/bft_deliverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
// BFTDeliverer TODO this is a skeleton
type BFTDeliverer struct { // TODO
ChannelID string
Gossip GossipServiceAdapter
BlockHandler BlockHandler
Ledger LedgerInfo
BlockVerifier BlockVerifier
Dialer Dialer
Expand All @@ -30,12 +30,15 @@ type BFTDeliverer struct { // TODO
Signer identity.SignerSerializer
DeliverStreamer DeliverStreamer
Logger *flogging.FabricLogger
YieldLeadership bool

BlockGossipDisabled bool
MaxRetryDelay time.Duration
InitialRetryDelay time.Duration
MaxRetryDuration time.Duration
// The maximal value of the actual retry interval, which cannot increase beyond this value
MaxRetryInterval time.Duration
// The initial value of the actual retry interval, which is increased on every failed retry
InitialRetryInterval time.Duration
// After this duration, the MaxRetryDurationExceededHandler is called to decide whether to keep trying
MaxRetryDuration time.Duration
// This function is called after MaxRetryDuration of failed retries to decide whether to keep trying
MaxRetryDurationExceededHandler MaxRetryDurationExceededHandler

// TLSCertHash should be nil when TLS is not enabled
TLSCertHash []byte // util.ComputeSHA256(b.credSupport.GetClientCertificate().Certificate[0])
Expand Down Expand Up @@ -196,16 +199,15 @@ func (d *BFTDeliverer) FetchBlocks(source *orderers.Endpoint) {
}

blockRcv := &BlockReceiver{
channelID: d.ChannelID,
gossip: d.Gossip,
blockGossipDisabled: d.BlockGossipDisabled,
blockVerifier: d.BlockVerifier,
deliverClient: deliverClient,
cancelSendFunc: cancel,
recvC: make(chan *orderer.DeliverResponse),
stopC: make(chan struct{}),
endpoint: source,
logger: d.Logger.With("orderer-address", source.Address),
channelID: d.ChannelID,
blockHandler: d.BlockHandler,
blockVerifier: d.BlockVerifier,
deliverClient: deliverClient,
cancelSendFunc: cancel,
recvC: make(chan *orderer.DeliverResponse),
stopC: make(chan struct{}),
endpoint: source,
logger: d.Logger.With("orderer-address", source.Address),
}

d.mutex.Lock()
Expand Down
30 changes: 16 additions & 14 deletions internal/pkg/peer/blocksprovider/bft_deliverer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type bftDelivererTestSetup struct {

fakeDialer *fake.Dialer
fakeGossipServiceAdapter *fake.GossipServiceAdapter
fakeBlockHandler *fake.BlockHandler
fakeOrdererConnectionSource *fake.OrdererConnectionSource
fakeLedgerInfo *fake.LedgerInfo
fakeBlockVerifier *fake.BlockVerifier
Expand All @@ -49,6 +50,7 @@ func newBFTDelivererTestSetup(t *testing.T) *bftDelivererTestSetup {
withT: NewWithT(t),
fakeDialer: &fake.Dialer{},
fakeGossipServiceAdapter: &fake.GossipServiceAdapter{},
fakeBlockHandler: &fake.BlockHandler{},
fakeOrdererConnectionSource: &fake.OrdererConnectionSource{},
fakeLedgerInfo: &fake.LedgerInfo{},
fakeBlockVerifier: &fake.BlockVerifier{},
Expand Down Expand Up @@ -122,20 +124,20 @@ func (s *bftDelivererTestSetup) beforeEach() {
s.fakeDeliverStreamer.DeliverReturns(s.fakeDeliverClient, nil)

s.d = &blocksprovider.BFTDeliverer{
ChannelID: "channel-id",
Gossip: s.fakeGossipServiceAdapter,
Ledger: s.fakeLedgerInfo,
BlockVerifier: s.fakeBlockVerifier,
Dialer: s.fakeDialer,
Orderers: s.fakeOrdererConnectionSource,
DoneC: make(chan struct{}),
Signer: s.fakeSigner,
DeliverStreamer: s.fakeDeliverStreamer,
Logger: flogging.MustGetLogger("blocksprovider"),
TLSCertHash: []byte("tls-cert-hash"),
MaxRetryDuration: time.Hour,
MaxRetryDelay: 10 * time.Second,
InitialRetryDelay: 100 * time.Millisecond,
ChannelID: "channel-id",
BlockHandler: s.fakeBlockHandler,
Ledger: s.fakeLedgerInfo,
BlockVerifier: s.fakeBlockVerifier,
Dialer: s.fakeDialer,
Orderers: s.fakeOrdererConnectionSource,
DoneC: make(chan struct{}),
Signer: s.fakeSigner,
DeliverStreamer: s.fakeDeliverStreamer,
Logger: flogging.MustGetLogger("blocksprovider"),
TLSCertHash: []byte("tls-cert-hash"),
MaxRetryDuration: time.Hour,
MaxRetryInterval: 10 * time.Second,
InitialRetryInterval: 100 * time.Millisecond,
}
s.d.Initialize()

Expand Down
68 changes: 23 additions & 45 deletions internal/pkg/peer/blocksprovider/block_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,33 @@ import (
"fmt"
"sync"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/gossip"
"github.com/hyperledger/fabric-protos-go/orderer"
"github.com/hyperledger/fabric/common/flogging"
gossipcommon "github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/internal/pkg/peer/orderers"
"github.com/pkg/errors"
)

// BlockHandler abstracts the next stage of processing after the block is fetched from the orderer.
// In the peer the block is given to the gossip service.
// In the orderer the block is placed in a buffer from which the chain or the follower pull blocks.
//
//go:generate counterfeiter -o fake/block_handler.go --fake-name BlockHandler . BlockHandler
type BlockHandler interface {
// HandleBlock gives the block to the next stage of processing after fetching it from a remote orderer.
HandleBlock(channelID string, block *common.Block) error
}

type BlockReceiver struct {
channelID string
gossip GossipServiceAdapter
blockGossipDisabled bool
blockVerifier BlockVerifier
deliverClient orderer.AtomicBroadcast_DeliverClient
cancelSendFunc func()
recvC chan *orderer.DeliverResponse
stopC chan struct{}
endpoint *orderers.Endpoint
channelID string
blockHandler BlockHandler
blockVerifier BlockVerifier
deliverClient orderer.AtomicBroadcast_DeliverClient
cancelSendFunc func()
recvC chan *orderer.DeliverResponse
stopC chan struct{}
endpoint *orderers.Endpoint

mutex sync.Mutex
stopFlag bool
Expand All @@ -39,7 +46,7 @@ type BlockReceiver struct {

// Start starts a goroutine that continuously receives blocks.
func (br *BlockReceiver) Start() {
br.logger.Infof("Starting to receive")
br.logger.Infof("BlockReceiver starting")
go func() {
for {
resp, err := br.deliverClient.Recv()
Expand Down Expand Up @@ -69,11 +76,13 @@ func (br *BlockReceiver) Stop() {
defer br.mutex.Unlock()

if br.stopFlag {
br.logger.Infof("BlockReceiver already stopped")
return
}

br.stopFlag = true
close(br.stopC)
br.logger.Infof("BlockReceiver stopped")
}

// ProcessIncoming processes incoming messages until stopped or encounters an error.
Expand Down Expand Up @@ -128,42 +137,11 @@ func (br *BlockReceiver) processMsg(msg *orderer.DeliverResponse) (uint64, error
if err := br.blockVerifier.VerifyBlock(gossipcommon.ChannelID(br.channelID), blockNum, t.Block); err != nil {
return 0, errors.WithMessage(err, "block from orderer could not be verified")
}

marshaledBlock, err := proto.Marshal(t.Block)
err := br.blockHandler.HandleBlock(br.channelID, t.Block)
if err != nil {
return 0, errors.WithMessage(err, "block from orderer could not be re-marshaled")
}

// Create payload with a block received
payload := &gossip.Payload{
Data: marshaledBlock,
SeqNum: blockNum,
return 0, errors.WithMessage(err, "block from orderer could not be handled")
}

// Use payload to create gossip message
gossipMsg := &gossip.GossipMessage{
Nonce: 0,
Tag: gossip.GossipMessage_CHAN_AND_ORG,
Channel: []byte(br.channelID),
Content: &gossip.GossipMessage_DataMsg{
DataMsg: &gossip.DataMessage{
Payload: payload,
},
},
}

br.logger.Debugf("Adding payload to local buffer, blockNum = [%d]", blockNum)
// Add payload to local state payloads buffer
if err := br.gossip.AddPayload(br.channelID, payload); err != nil {
br.logger.Warningf("Block [%d] received from ordering service wasn't added to payload buffer: %v", blockNum, err)
return 0, errors.WithMessage(err, "could not add block as payload")
}
if br.blockGossipDisabled {
return blockNum, nil
}
// Gossip messages with other nodes
br.logger.Debugf("Gossiping block [%d]", blockNum)
br.gossip.Gossip(gossipMsg)
return blockNum, nil
default:
return 0, errors.Errorf("unknown message type: %T, message: %+v", t, msg)
Expand Down
Loading

0 comments on commit 6647a0b

Please sign in to comment.