Skip to content

Commit

Permalink
BFT Block Puller: updatable connection source
Browse files Browse the repository at this point in the history
Signed-off-by: Yoav Tock <tock@il.ibm.com>
Change-Id: Id0b80df892595cf2b9d7b7b50bfa9069f0eb6905
  • Loading branch information
tock-ibm committed Dec 12, 2023
1 parent 896f113 commit ef2a850
Show file tree
Hide file tree
Showing 16 changed files with 638 additions and 179 deletions.
41 changes: 41 additions & 0 deletions common/deliverclient/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package deliverclient

import (
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric/common/configtx"
"github.com/hyperledger/fabric/protoutil"
"github.com/pkg/errors"
)

func ExtractConfigFromBlock(block *common.Block) (*common.Config, error) {
env, err := protoutil.ExtractEnvelope(block, 0)
if err != nil {
return nil, err
}
payload, err := protoutil.UnmarshalPayload(env.Payload)
if err != nil {
return nil, err
}
if payload.Header == nil {
return nil, errors.New("nil header in payload")
}
chdr, err := protoutil.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
return nil, err
}
if common.HeaderType(chdr.Type) != common.HeaderType_CONFIG {
return nil, errors.New("not a config block")
}
configEnvelope, err := configtx.UnmarshalConfigEnvelope(payload.Data)
if err != nil {
return nil, errors.Wrap(err, "invalid config envelope")
}

return configEnvelope.GetConfig(), nil
}
26 changes: 15 additions & 11 deletions core/deliverservice/deliveryclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ type Config struct {
// Gossip enables to enumerate peers in the channel, send a message to peers,
// and add a block to the gossip state transfer layer.
Gossip blocksprovider.GossipServiceAdapter
// OrdererSource provides orderer endpoints, complete with TLS cert pools.
OrdererSource *orderers.ConnectionSource
// OrdererEndpointOverrides provides peer-specific orderer endpoints overrides
OrdererEndpointOverrides map[string]*orderers.Endpoint
// Signer is the identity used to sign requests.
Signer identity.SignerSerializer
// DeliverServiceConfig is the configuration object.
Expand Down Expand Up @@ -191,14 +191,16 @@ func (d *deliverServiceImpl) createBlockDelivererCFT(chainID string, ledgerInfo
SecOpts: d.conf.DeliverServiceConfig.SecOpts,
},
},
Orderers: d.conf.OrdererSource,
DoneC: make(chan struct{}),
Signer: d.conf.Signer,
DeliverStreamer: blocksprovider.DeliverAdapter{},
Logger: flogging.MustGetLogger("peer.blocksprovider").With("channel", chainID),
MaxRetryInterval: d.conf.DeliverServiceConfig.ReConnectBackoffThreshold,
MaxRetryDuration: d.conf.DeliverServiceConfig.ReconnectTotalTimeThreshold,
InitialRetryInterval: 100 * time.Millisecond,
OrderersSourceFactory: &orderers.ConnectionSourceFactory{Overrides: d.conf.OrdererEndpointOverrides},
ChannelConfig: d.conf.ChannelConfig,
CryptoProvider: d.conf.CryptoProvider,
DoneC: make(chan struct{}),
Signer: d.conf.Signer,
DeliverStreamer: blocksprovider.DeliverAdapter{},
Logger: flogging.MustGetLogger("peer.blocksprovider").With("channel", chainID),
MaxRetryInterval: d.conf.DeliverServiceConfig.ReConnectBackoffThreshold,
MaxRetryDuration: d.conf.DeliverServiceConfig.ReconnectTotalTimeThreshold,
InitialRetryInterval: 100 * time.Millisecond,
MaxRetryDurationExceededHandler: func() (stopRetries bool) {
return !d.conf.IsStaticLeader
},
Expand Down Expand Up @@ -254,7 +256,9 @@ func (d *deliverServiceImpl) createBlockDelivererBFT(chainID string, ledgerInfo
SecOpts: d.conf.DeliverServiceConfig.SecOpts,
},
},
Orderers: d.conf.OrdererSource,
OrderersSourceFactory: &orderers.ConnectionSourceFactory{Overrides: d.conf.OrdererEndpointOverrides},
ChannelConfig: d.conf.ChannelConfig,
CryptoProvider: d.conf.CryptoProvider,
DoneC: make(chan struct{}),
Signer: d.conf.Signer,
DeliverStreamer: blocksprovider.DeliverAdapter{},
Expand Down
25 changes: 1 addition & 24 deletions core/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,36 +288,13 @@ func (p *Peer) createChannel(
mspmgmt.XXXSetMSPManager(cid, bundle.MSPManager())
}

osLogger := flogging.MustGetLogger("peer.orderers")
namedOSLogger := osLogger.With("channel", cid)
ordererSource := orderers.NewConnectionSource(namedOSLogger, p.OrdererEndpointOverrides)

ordererSourceCallback := func(bundle *channelconfig.Bundle) {
globalAddresses := bundle.ChannelConfig().OrdererAddresses()
orgAddresses := map[string]orderers.OrdererOrg{}
if ordererConfig, ok := bundle.OrdererConfig(); ok {
for orgName, org := range ordererConfig.Organizations() {
var certs [][]byte
certs = append(certs, org.MSP().GetTLSRootCerts()...)
certs = append(certs, org.MSP().GetTLSIntermediateCerts()...)

orgAddresses[orgName] = orderers.OrdererOrg{
Addresses: org.Endpoints(),
RootCerts: certs,
}
}
}
ordererSource.Update(globalAddresses, orgAddresses)
}

channel := &Channel{
ledger: l,
resources: bundle,
cryptoProvider: p.CryptoProvider,
}

callbacks := []channelconfig.BundleActor{
ordererSourceCallback,
gossipCallbackWrapper,
trustedRootsCallbackWrapper,
mspCallback,
Expand Down Expand Up @@ -373,7 +350,7 @@ func (p *Peer) createChannel(

p.GossipService.InitializeChannel(
bundle.ConfigtxValidator().ChannelID(),
ordererSource,
p.OrdererEndpointOverrides,
store,
gossipservice.Support{
Validator: validator,
Expand Down
30 changes: 18 additions & 12 deletions gossip/service/gossip_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ type GossipServiceAdapter interface {
// DeliveryServiceFactory factory to create and initialize delivery service instance
type DeliveryServiceFactory interface {
// Returns an instance of delivery client
Service(g GossipServiceAdapter, ordererSource *orderers.ConnectionSource, mcs api.MessageCryptoService, isStaticLead bool, channelConfig *cb.Config, cryptoProvider bccsp.BCCSP) deliverservice.DeliverService
Service(g GossipServiceAdapter, ordererEndpointOverrides map[string]*orderers.Endpoint, isStaticLead bool, channelConfig *cb.Config, cryptoProvider bccsp.BCCSP) deliverservice.DeliverService
}

type deliveryFactoryImpl struct {
Expand All @@ -139,21 +139,20 @@ type deliveryFactoryImpl struct {
// Returns an instance of delivery service
func (df *deliveryFactoryImpl) Service(
g GossipServiceAdapter,
ordererSource *orderers.ConnectionSource,
mcs api.MessageCryptoService, // TODO remove
ordererEndpointOverrides map[string]*orderers.Endpoint,
isStaticLead bool,
channelConfig *cb.Config,
cryptoProvider bccsp.BCCSP,
) deliverservice.DeliverService {
return deliverservice.NewDeliverService(
&deliverservice.Config{
IsStaticLeader: isStaticLead,
Gossip: g,
Signer: df.signer,
DeliverServiceConfig: df.deliverServiceConfig,
OrdererSource: ordererSource,
ChannelConfig: channelConfig,
CryptoProvider: cryptoProvider,
IsStaticLeader: isStaticLead,
Gossip: g,
Signer: df.signer,
DeliverServiceConfig: df.deliverServiceConfig,
OrdererEndpointOverrides: ordererEndpointOverrides,
ChannelConfig: channelConfig,
CryptoProvider: cryptoProvider,
})
}

Expand Down Expand Up @@ -334,7 +333,14 @@ type Support struct {
}

// InitializeChannel allocates the state provider and should be invoked once per channel per execution
func (g *GossipService) InitializeChannel(channelID string, ordererSource *orderers.ConnectionSource, store *transientstore.Store, support Support, channelConfig *cb.Config, cryptoProvider bccsp.BCCSP) {
func (g *GossipService) InitializeChannel(
channelID string,
ordererEndpointOverrides map[string]*orderers.Endpoint,
store *transientstore.Store,
support Support,
channelConfig *cb.Config,
cryptoProvider bccsp.BCCSP,
) {
g.lock.Lock()
defer g.lock.Unlock()
// Initialize new state provider for given committer
Expand Down Expand Up @@ -393,7 +399,7 @@ func (g *GossipService) InitializeChannel(channelID string, ordererSource *order
blockingMode,
stateConfig)
if g.deliveryService[channelID] == nil {
g.deliveryService[channelID] = g.deliveryFactory.Service(g, ordererSource, g.mcs, g.serviceConfig.OrgLeader, channelConfig, cryptoProvider)
g.deliveryService[channelID] = g.deliveryFactory.Service(g, ordererEndpointOverrides, g.serviceConfig.OrgLeader, channelConfig, cryptoProvider)
}

// Delivery service might be nil only if it was not able to get connected
Expand Down
17 changes: 8 additions & 9 deletions gossip/service/gossip_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/hyperledger/fabric/bccsp/factory"
"github.com/hyperledger/fabric/bccsp/sw"
"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/metrics/disabled"
"github.com/hyperledger/fabric/core/deliverservice"
"github.com/hyperledger/fabric/core/ledger"
Expand Down Expand Up @@ -191,7 +190,7 @@ func TestLeaderElectionWithDeliverClient(t *testing.T) {
gossips[i].deliveryFactory = deliverServiceFactory
deliverServiceFactory.service.running = false

gossips[i].InitializeChannel(channelName, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), store.Store, Support{
gossips[i].InitializeChannel(channelName, nil, store.Store, Support{
Committer: &mockLedgerInfo{1},
}, nil, nil)
service, exist := gossips[i].leaderElection[channelName]
Expand Down Expand Up @@ -252,7 +251,7 @@ func TestWithStaticDeliverClientLeader(t *testing.T) {
for i := 0; i < n; i++ {
gossips[i].deliveryFactory = deliverServiceFactory
deliverServiceFactory.service.running = false
gossips[i].InitializeChannel(channelName, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), store.Store, Support{
gossips[i].InitializeChannel(channelName, nil, store.Store, Support{
Committer: &mockLedgerInfo{1},
}, nil, nil)
}
Expand All @@ -265,7 +264,7 @@ func TestWithStaticDeliverClientLeader(t *testing.T) {
channelName = "chanB"
for i := 0; i < n; i++ {
deliverServiceFactory.service.running = false
gossips[i].InitializeChannel(channelName, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), store.Store, Support{
gossips[i].InitializeChannel(channelName, nil, store.Store, Support{
Committer: &mockLedgerInfo{1},
}, nil, nil)
}
Expand Down Expand Up @@ -309,7 +308,7 @@ func TestWithStaticDeliverClientNotLeader(t *testing.T) {
for i := 0; i < n; i++ {
gossips[i].deliveryFactory = deliverServiceFactory
deliverServiceFactory.service.running = false
gossips[i].InitializeChannel(channelName, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), store.Store, Support{
gossips[i].InitializeChannel(channelName, nil, store.Store, Support{
Committer: &mockLedgerInfo{1},
}, nil, nil)
}
Expand Down Expand Up @@ -354,7 +353,7 @@ func TestWithStaticDeliverClientBothStaticAndLeaderElection(t *testing.T) {
for i := 0; i < n; i++ {
gossips[i].deliveryFactory = deliverServiceFactory
require.Panics(t, func() {
gossips[i].InitializeChannel(channelName, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), store.Store, Support{
gossips[i].InitializeChannel(channelName, nil, store.Store, Support{
Committer: &mockLedgerInfo{1},
}, nil, nil)
}, "Dynamic leader election based and static connection to ordering service can't exist simultaneously")
Expand All @@ -367,7 +366,7 @@ type mockDeliverServiceFactory struct {
service *mockDeliverService
}

func (mf *mockDeliverServiceFactory) Service(GossipServiceAdapter, *orderers.ConnectionSource, api.MessageCryptoService, bool, *common.Config, bccsp.BCCSP) deliverservice.DeliverService {
func (mf *mockDeliverServiceFactory) Service(GossipServiceAdapter, map[string]*orderers.Endpoint, bool, *common.Config, bccsp.BCCSP) deliverservice.DeliverService {
return mf.service
}

Expand Down Expand Up @@ -419,7 +418,7 @@ func (li *mockLedgerInfo) GetPvtDataAndBlockByNum(seqNum uint64) (*ledger.BlockA
}

func (li *mockLedgerInfo) GetCurrentBlockHash() ([]byte, error) {
panic("implement me")
return []byte{1, 2, 3, 4}, nil
}

// LedgerHeight returns mocked value to the ledger height
Expand Down Expand Up @@ -905,7 +904,7 @@ func TestInvalidInitialization(t *testing.T) {
go grpcServer.Serve(socket)
defer grpcServer.Stop()

dc := gService.deliveryFactory.Service(gService, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), &naiveCryptoService{}, false, nil, nil)
dc := gService.deliveryFactory.Service(gService, nil, false, nil, nil)
require.NotNil(t, dc)
}

Expand Down
19 changes: 5 additions & 14 deletions gossip/service/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@ import (
"github.com/hyperledger/fabric/bccsp/sw"
"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/common/crypto/tlsgen"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/core/config/configtest"
"github.com/hyperledger/fabric/core/deliverservice"
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/election"
"github.com/hyperledger/fabric/gossip/util"
"github.com/hyperledger/fabric/internal/configtxgen/encoder"
Expand Down Expand Up @@ -81,8 +79,8 @@ type embeddingDeliveryServiceFactory struct {
DeliveryServiceFactory
}

func (edsf *embeddingDeliveryServiceFactory) Service(g GossipServiceAdapter, ordererSource *orderers.ConnectionSource, mcs api.MessageCryptoService, isStaticLead bool, channelConfig *common.Config, cryptoProvider bccsp.BCCSP) deliverservice.DeliverService {
ds := edsf.DeliveryServiceFactory.Service(g, ordererSource, mcs, false, channelConfig, cryptoProvider)
func (edsf *embeddingDeliveryServiceFactory) Service(g GossipServiceAdapter, ordererEndpointOverrides map[string]*orderers.Endpoint, isStaticLead bool, channelConfig *common.Config, cryptoProvider bccsp.BCCSP) deliverservice.DeliverService {
ds := edsf.DeliveryServiceFactory.Service(g, nil, false, channelConfig, cryptoProvider)
return newEmbeddingDeliveryService(ds)
}

Expand Down Expand Up @@ -150,16 +148,9 @@ func TestLeaderYield(t *testing.T) {
},
}

gs.InitializeChannel(
channelName,
orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil),
store.Store,
Support{
Committer: &mockLedgerInfo{1},
},
channelConfigProto,
cryptoProvider,
)
gs.InitializeChannel(channelName, nil, store.Store, Support{
Committer: &mockLedgerInfo{1},
}, channelConfigProto, cryptoProvider)
return gs
}

Expand Down
Loading

0 comments on commit ef2a850

Please sign in to comment.