Skip to content

Commit

Permalink
BFT Block Puller: updatable connection source
Browse files Browse the repository at this point in the history
Signed-off-by: Yoav Tock <tock@il.ibm.com>
Change-Id: Id0b80df892595cf2b9d7b7b50bfa9069f0eb6905
  • Loading branch information
tock-ibm committed Dec 11, 2023
1 parent 896f113 commit 15b67c8
Show file tree
Hide file tree
Showing 18 changed files with 625 additions and 142 deletions.
41 changes: 41 additions & 0 deletions common/deliverclient/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package deliverclient

import (
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric/common/configtx"
"github.com/hyperledger/fabric/protoutil"
"github.com/pkg/errors"
)

func ExtractConfigFromBlock(block *common.Block) (*common.Config, error) {
env, err := protoutil.ExtractEnvelope(block, 0)
if err != nil {
return nil, err
}
payload, err := protoutil.UnmarshalPayload(env.Payload)
if err != nil {
return nil, err
}
if payload.Header == nil {
return nil, errors.New("nil header in payload")
}
chdr, err := protoutil.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
return nil, err
}
if common.HeaderType(chdr.Type) != common.HeaderType_CONFIG {
return nil, errors.New("not a config block")
}
configEnvelope, err := configtx.UnmarshalConfigEnvelope(payload.Data)
if err != nil {
return nil, errors.Wrap(err, "invalid config envelope")
}

return configEnvelope.GetConfig(), nil
}
28 changes: 18 additions & 10 deletions core/deliverservice/deliveryclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,11 @@ type Config struct {
// and add a block to the gossip state transfer layer.
Gossip blocksprovider.GossipServiceAdapter
// OrdererSource provides orderer endpoints, complete with TLS cert pools.
OrdererSource *orderers.ConnectionSource

OrdererSource *orderers.ConnectionSource // TODO remove, create internally
// OrdererEndpointOverrides provides peer-specific orderer endpoints overrides
OrdererEndpointOverrides map[string]*orderers.Endpoint

// Signer is the identity used to sign requests.
Signer identity.SignerSerializer
// DeliverServiceConfig is the configuration object.
Expand Down Expand Up @@ -191,14 +195,16 @@ func (d *deliverServiceImpl) createBlockDelivererCFT(chainID string, ledgerInfo
SecOpts: d.conf.DeliverServiceConfig.SecOpts,
},
},
Orderers: d.conf.OrdererSource,
DoneC: make(chan struct{}),
Signer: d.conf.Signer,
DeliverStreamer: blocksprovider.DeliverAdapter{},
Logger: flogging.MustGetLogger("peer.blocksprovider").With("channel", chainID),
MaxRetryInterval: d.conf.DeliverServiceConfig.ReConnectBackoffThreshold,
MaxRetryDuration: d.conf.DeliverServiceConfig.ReconnectTotalTimeThreshold,
InitialRetryInterval: 100 * time.Millisecond,
OrderersSourceFactory: &orderers.ConnectionSourceFactory{Overrides: d.conf.OrdererEndpointOverrides},
ChannelConfig: d.conf.ChannelConfig,
CryptoProvider: d.conf.CryptoProvider,
DoneC: make(chan struct{}),
Signer: d.conf.Signer,
DeliverStreamer: blocksprovider.DeliverAdapter{},
Logger: flogging.MustGetLogger("peer.blocksprovider").With("channel", chainID),
MaxRetryInterval: d.conf.DeliverServiceConfig.ReConnectBackoffThreshold,
MaxRetryDuration: d.conf.DeliverServiceConfig.ReconnectTotalTimeThreshold,
InitialRetryInterval: 100 * time.Millisecond,
MaxRetryDurationExceededHandler: func() (stopRetries bool) {
return !d.conf.IsStaticLeader
},
Expand Down Expand Up @@ -254,7 +260,9 @@ func (d *deliverServiceImpl) createBlockDelivererBFT(chainID string, ledgerInfo
SecOpts: d.conf.DeliverServiceConfig.SecOpts,
},
},
Orderers: d.conf.OrdererSource,
OrderersSourceFactory: &orderers.ConnectionSourceFactory{Overrides: d.conf.OrdererEndpointOverrides},
ChannelConfig: d.conf.ChannelConfig,
CryptoProvider: d.conf.CryptoProvider,
DoneC: make(chan struct{}),
Signer: d.conf.Signer,
DeliverStreamer: blocksprovider.DeliverAdapter{},
Expand Down
7 changes: 4 additions & 3 deletions core/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ func (p *Peer) createChannel(
ordererSource := orderers.NewConnectionSource(namedOSLogger, p.OrdererEndpointOverrides)

ordererSourceCallback := func(bundle *channelconfig.Bundle) {
osLogger.Infof("OrdererSource callback: %v", bundle)
globalAddresses := bundle.ChannelConfig().OrdererAddresses()
orgAddresses := map[string]orderers.OrdererOrg{}
if ordererConfig, ok := bundle.OrdererConfig(); ok {
Expand Down Expand Up @@ -373,7 +374,8 @@ func (p *Peer) createChannel(

p.GossipService.InitializeChannel(
bundle.ConfigtxValidator().ChannelID(),
ordererSource,
ordererSource, // TODO remove
p.OrdererEndpointOverrides,
store,
gossipservice.Support{
Validator: validator,
Expand All @@ -383,8 +385,7 @@ func (p *Peer) createChannel(
CapabilityProvider: channel,
},
chanConf,
p.CryptoProvider,
)
p.CryptoProvider)

p.mutex.Lock()
defer p.mutex.Unlock()
Expand Down
31 changes: 20 additions & 11 deletions gossip/service/gossip_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ type GossipServiceAdapter interface {
// DeliveryServiceFactory factory to create and initialize delivery service instance
type DeliveryServiceFactory interface {
// Returns an instance of delivery client
Service(g GossipServiceAdapter, ordererSource *orderers.ConnectionSource, mcs api.MessageCryptoService, isStaticLead bool, channelConfig *cb.Config, cryptoProvider bccsp.BCCSP) deliverservice.DeliverService
Service(g GossipServiceAdapter, ordererSource *orderers.ConnectionSource, ordererEndpointOverrides map[string]*orderers.Endpoint, isStaticLead bool, channelConfig *cb.Config, cryptoProvider bccsp.BCCSP) deliverservice.DeliverService
}

type deliveryFactoryImpl struct {
Expand All @@ -140,20 +140,21 @@ type deliveryFactoryImpl struct {
func (df *deliveryFactoryImpl) Service(
g GossipServiceAdapter,
ordererSource *orderers.ConnectionSource,
mcs api.MessageCryptoService, // TODO remove
ordererEndpointOverrides map[string]*orderers.Endpoint,
isStaticLead bool,
channelConfig *cb.Config,
cryptoProvider bccsp.BCCSP,
) deliverservice.DeliverService {
return deliverservice.NewDeliverService(
&deliverservice.Config{
IsStaticLeader: isStaticLead,
Gossip: g,
Signer: df.signer,
DeliverServiceConfig: df.deliverServiceConfig,
OrdererSource: ordererSource,
ChannelConfig: channelConfig,
CryptoProvider: cryptoProvider,
IsStaticLeader: isStaticLead,
Gossip: g,
Signer: df.signer,
DeliverServiceConfig: df.deliverServiceConfig,
OrdererSource: ordererSource, // TODO remove, create internally
OrdererEndpointOverrides: ordererEndpointOverrides,
ChannelConfig: channelConfig,
CryptoProvider: cryptoProvider,
})
}

Expand Down Expand Up @@ -334,7 +335,15 @@ type Support struct {
}

// InitializeChannel allocates the state provider and should be invoked once per channel per execution
func (g *GossipService) InitializeChannel(channelID string, ordererSource *orderers.ConnectionSource, store *transientstore.Store, support Support, channelConfig *cb.Config, cryptoProvider bccsp.BCCSP) {
func (g *GossipService) InitializeChannel(
channelID string,
ordererSource *orderers.ConnectionSource, // TODO remove, create internally
ordererEndpointOverrides map[string]*orderers.Endpoint,
store *transientstore.Store,
support Support,
channelConfig *cb.Config,
cryptoProvider bccsp.BCCSP,
) {
g.lock.Lock()
defer g.lock.Unlock()
// Initialize new state provider for given committer
Expand Down Expand Up @@ -393,7 +402,7 @@ func (g *GossipService) InitializeChannel(channelID string, ordererSource *order
blockingMode,
stateConfig)
if g.deliveryService[channelID] == nil {
g.deliveryService[channelID] = g.deliveryFactory.Service(g, ordererSource, g.mcs, g.serviceConfig.OrgLeader, channelConfig, cryptoProvider)
g.deliveryService[channelID] = g.deliveryFactory.Service(g, ordererSource, ordererEndpointOverrides, g.serviceConfig.OrgLeader, channelConfig, cryptoProvider)
}

// Delivery service might be nil only if it was not able to get connected
Expand Down
14 changes: 7 additions & 7 deletions gossip/service/gossip_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func TestLeaderElectionWithDeliverClient(t *testing.T) {
gossips[i].deliveryFactory = deliverServiceFactory
deliverServiceFactory.service.running = false

gossips[i].InitializeChannel(channelName, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), store.Store, Support{
gossips[i].InitializeChannel(channelName, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), nil, store.Store, Support{
Committer: &mockLedgerInfo{1},
}, nil, nil)
service, exist := gossips[i].leaderElection[channelName]
Expand Down Expand Up @@ -252,7 +252,7 @@ func TestWithStaticDeliverClientLeader(t *testing.T) {
for i := 0; i < n; i++ {
gossips[i].deliveryFactory = deliverServiceFactory
deliverServiceFactory.service.running = false
gossips[i].InitializeChannel(channelName, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), store.Store, Support{
gossips[i].InitializeChannel(channelName, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), nil, store.Store, Support{
Committer: &mockLedgerInfo{1},
}, nil, nil)
}
Expand All @@ -265,7 +265,7 @@ func TestWithStaticDeliverClientLeader(t *testing.T) {
channelName = "chanB"
for i := 0; i < n; i++ {
deliverServiceFactory.service.running = false
gossips[i].InitializeChannel(channelName, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), store.Store, Support{
gossips[i].InitializeChannel(channelName, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), nil, store.Store, Support{
Committer: &mockLedgerInfo{1},
}, nil, nil)
}
Expand Down Expand Up @@ -309,7 +309,7 @@ func TestWithStaticDeliverClientNotLeader(t *testing.T) {
for i := 0; i < n; i++ {
gossips[i].deliveryFactory = deliverServiceFactory
deliverServiceFactory.service.running = false
gossips[i].InitializeChannel(channelName, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), store.Store, Support{
gossips[i].InitializeChannel(channelName, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), nil, store.Store, Support{
Committer: &mockLedgerInfo{1},
}, nil, nil)
}
Expand Down Expand Up @@ -354,7 +354,7 @@ func TestWithStaticDeliverClientBothStaticAndLeaderElection(t *testing.T) {
for i := 0; i < n; i++ {
gossips[i].deliveryFactory = deliverServiceFactory
require.Panics(t, func() {
gossips[i].InitializeChannel(channelName, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), store.Store, Support{
gossips[i].InitializeChannel(channelName, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), nil, store.Store, Support{
Committer: &mockLedgerInfo{1},
}, nil, nil)
}, "Dynamic leader election based and static connection to ordering service can't exist simultaneously")
Expand All @@ -367,7 +367,7 @@ type mockDeliverServiceFactory struct {
service *mockDeliverService
}

func (mf *mockDeliverServiceFactory) Service(GossipServiceAdapter, *orderers.ConnectionSource, api.MessageCryptoService, bool, *common.Config, bccsp.BCCSP) deliverservice.DeliverService {
func (mf *mockDeliverServiceFactory) Service(GossipServiceAdapter, *orderers.ConnectionSource, map[string]*orderers.Endpoint, bool, *common.Config, bccsp.BCCSP) deliverservice.DeliverService {
return mf.service
}

Expand Down Expand Up @@ -905,7 +905,7 @@ func TestInvalidInitialization(t *testing.T) {
go grpcServer.Serve(socket)
defer grpcServer.Stop()

dc := gService.deliveryFactory.Service(gService, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), &naiveCryptoService{}, false, nil, nil)
dc := gService.deliveryFactory.Service(gService, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), nil, false, nil, nil)
require.NotNil(t, dc)
}

Expand Down
18 changes: 5 additions & 13 deletions gossip/service/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/core/config/configtest"
"github.com/hyperledger/fabric/core/deliverservice"
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/election"
"github.com/hyperledger/fabric/gossip/util"
"github.com/hyperledger/fabric/internal/configtxgen/encoder"
Expand Down Expand Up @@ -81,8 +80,8 @@ type embeddingDeliveryServiceFactory struct {
DeliveryServiceFactory
}

func (edsf *embeddingDeliveryServiceFactory) Service(g GossipServiceAdapter, ordererSource *orderers.ConnectionSource, mcs api.MessageCryptoService, isStaticLead bool, channelConfig *common.Config, cryptoProvider bccsp.BCCSP) deliverservice.DeliverService {
ds := edsf.DeliveryServiceFactory.Service(g, ordererSource, mcs, false, channelConfig, cryptoProvider)
func (edsf *embeddingDeliveryServiceFactory) Service(g GossipServiceAdapter, ordererSource *orderers.ConnectionSource, ordererEndpointOverrides map[string]*orderers.Endpoint, isStaticLead bool, channelConfig *common.Config, cryptoProvider bccsp.BCCSP) deliverservice.DeliverService {
ds := edsf.DeliveryServiceFactory.Service(g, ordererSource, nil, false, channelConfig, cryptoProvider)
return newEmbeddingDeliveryService(ds)
}

Expand Down Expand Up @@ -150,16 +149,9 @@ func TestLeaderYield(t *testing.T) {
},
}

gs.InitializeChannel(
channelName,
orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil),
store.Store,
Support{
Committer: &mockLedgerInfo{1},
},
channelConfigProto,
cryptoProvider,
)
gs.InitializeChannel(channelName, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), nil, store.Store, Support{
Committer: &mockLedgerInfo{1},
}, channelConfigProto, cryptoProvider)
return gs
}

Expand Down
2 changes: 2 additions & 0 deletions integration/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-lib-go/healthz"
"github.com/hyperledger/fabric-protos-go/orderer/etcdraft"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/integration/channelparticipation"
"github.com/hyperledger/fabric/integration/nwo"
"github.com/hyperledger/fabric/integration/nwo/commands"
Expand Down Expand Up @@ -409,6 +410,7 @@ var _ = Describe("EndToEnd", func() {
)

BeforeEach(func() {
flogging.ActivateSpec("debug")
network = nwo.New(nwo.MultiChannelEtcdRaft(), testDir, client, StartPort(), components)
network.GenerateConfigTree()
for _, peer := range network.Peers {
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/comm/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func (cs *CredentialSupport) BuildTrustedRootsForChain(cm channelconfig.Resource
}
}

// TODO remove?
ordOrgMSPs := make(map[string]struct{})
if ac, ok := cm.OrdererConfig(); ok {
for _, ordOrg := range ac.Organizations() {
Expand Down
Loading

0 comments on commit 15b67c8

Please sign in to comment.