From 4e201afc69efb6d6546e1be904c5e658ca56b9a0 Mon Sep 17 00:00:00 2001 From: yacovm Date: Wed, 26 May 2021 01:35:43 +0300 Subject: [PATCH] Optionally disable gossip block forwarding (#2606) This commit adds a new configuration option to the peer which makes peers not forward blocks that they pull from the ordering service. If all peers in an organization explicitly set "peer.deliveryclient.blockGossipEnabled" to false, no peer in the organization gossips blocks to any other peer in that organization. Change-Id: I5d9b278ae72f239129827c044fa78179f6ba87ab Signed-off-by: Yacov Manevich --- core/deliverservice/config.go | 9 ++++++++ core/deliverservice/config_test.go | 2 ++ core/deliverservice/deliveryclient.go | 19 ++++++++-------- .../pkg/peer/blocksprovider/blocksprovider.go | 14 ++++++++---- .../blocksprovider/blocksprovider_test.go | 22 +++++++++++++++++++ sampleconfig/core.yaml | 5 +++++ 6 files changed, 58 insertions(+), 13 deletions(-) diff --git a/core/deliverservice/config.go b/core/deliverservice/config.go index 262d22e3cbc..09af248f052 100644 --- a/core/deliverservice/config.go +++ b/core/deliverservice/config.go @@ -29,6 +29,8 @@ const ( type DeliverServiceConfig struct { // PeerTLSEnabled enables/disables Peer TLS. PeerTLSEnabled bool + // BlockGossipEnabled enables block forwarding via gossip + BlockGossipEnabled bool // ReConnectBackoffThreshold sets the delivery service maximal delay between consencutive retries. ReConnectBackoffThreshold time.Duration // ReconnectTotalTimeThreshold sets the total time the delivery service may spend in reconnection attempts @@ -115,6 +117,13 @@ func extractCerts(pemCerts []byte) [][]byte { } func (c *DeliverServiceConfig) loadDeliverServiceConfig() { + enabledKey := "peer.deliveryclient.blockGossipEnabled" + enabledConfigOptionMissing := !viper.IsSet(enabledKey) + if enabledConfigOptionMissing { + logger.Infof("peer.deliveryclient.blockGossipEnabled is not set, defaulting to true.") + } + c.BlockGossipEnabled = enabledConfigOptionMissing || viper.GetBool(enabledKey) + c.PeerTLSEnabled = viper.GetBool("peer.tls.enabled") c.ReConnectBackoffThreshold = viper.GetDuration("peer.deliveryclient.reConnectBackoffThreshold") diff --git a/core/deliverservice/config_test.go b/core/deliverservice/config_test.go index 3d64d7e5197..b3658bb83a6 100644 --- a/core/deliverservice/config_test.go +++ b/core/deliverservice/config_test.go @@ -92,6 +92,7 @@ func TestGlobalConfig(t *testing.T) { coreConfig := deliverservice.GlobalConfig() expectedConfig := &deliverservice.DeliverServiceConfig{ + BlockGossipEnabled: true, PeerTLSEnabled: true, ReConnectBackoffThreshold: 25 * time.Second, ReconnectTotalTimeThreshold: 20 * time.Second, @@ -118,6 +119,7 @@ func TestGlobalConfigDefault(t *testing.T) { coreConfig := deliverservice.GlobalConfig() expectedConfig := &deliverservice.DeliverServiceConfig{ + BlockGossipEnabled: true, PeerTLSEnabled: false, ReConnectBackoffThreshold: deliverservice.DefaultReConnectBackoffThreshold, ReconnectTotalTimeThreshold: deliverservice.DefaultReConnectTotalTimeThreshold, diff --git a/core/deliverservice/deliveryclient.go b/core/deliverservice/deliveryclient.go index 32ff16388a3..5e1d2a0a84a 100644 --- a/core/deliverservice/deliveryclient.go +++ b/core/deliverservice/deliveryclient.go @@ -130,15 +130,16 @@ func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo b SecOpts: d.conf.DeliverServiceConfig.SecOpts, }, }, - Orderers: d.conf.OrdererSource, - DoneC: make(chan struct{}), - Signer: d.conf.Signer, - DeliverStreamer: DeliverAdapter{}, - Logger: flogging.MustGetLogger("peer.blocksprovider").With("channel", chainID), - MaxRetryDelay: d.conf.DeliverServiceConfig.ReConnectBackoffThreshold, - MaxRetryDuration: d.conf.DeliverServiceConfig.ReconnectTotalTimeThreshold, - InitialRetryDelay: 100 * time.Millisecond, - YieldLeadership: !d.conf.IsStaticLeader, + Orderers: d.conf.OrdererSource, + DoneC: make(chan struct{}), + Signer: d.conf.Signer, + DeliverStreamer: DeliverAdapter{}, + Logger: flogging.MustGetLogger("peer.blocksprovider").With("channel", chainID), + MaxRetryDelay: d.conf.DeliverServiceConfig.ReConnectBackoffThreshold, + MaxRetryDuration: d.conf.DeliverServiceConfig.ReconnectTotalTimeThreshold, + BlockGossipDisabled: !d.conf.DeliverServiceConfig.BlockGossipEnabled, + InitialRetryDelay: 100 * time.Millisecond, + YieldLeadership: !d.conf.IsStaticLeader, } if d.conf.DeliverServiceConfig.SecOpts.RequireClientCert { diff --git a/internal/pkg/peer/blocksprovider/blocksprovider.go b/internal/pkg/peer/blocksprovider/blocksprovider.go index 9627ebaf961..61ac8966e51 100644 --- a/internal/pkg/peer/blocksprovider/blocksprovider.go +++ b/internal/pkg/peer/blocksprovider/blocksprovider.go @@ -95,9 +95,10 @@ type Deliverer struct { Logger *flogging.FabricLogger YieldLeadership bool - MaxRetryDelay time.Duration - InitialRetryDelay time.Duration - MaxRetryDuration time.Duration + BlockGossipDisabled bool + MaxRetryDelay time.Duration + InitialRetryDelay time.Duration + MaxRetryDuration time.Duration // TLSCertHash should be nil when TLS is not enabled TLSCertHash []byte // util.ComputeSHA256(b.credSupport.GetClientCertificate().Certificate[0]) @@ -110,6 +111,9 @@ const backoffExponentBase = 1.2 // DeliverBlocks used to pull out blocks from the ordering service to // distributed them across peers func (d *Deliverer) DeliverBlocks() { + if d.BlockGossipDisabled { + d.Logger.Infof("Will pull blocks without forwarding them to remote peers via gossip") + } failureCounter := 0 totalDuration := time.Duration(0) @@ -255,7 +259,9 @@ func (d *Deliverer) processMsg(msg *orderer.DeliverResponse) error { d.Logger.Warningf("Block [%d] received from ordering service wasn't added to payload buffer: %v", blockNum, err) return errors.WithMessage(err, "could not add block as payload") } - + if d.BlockGossipDisabled { + return nil + } // Gossip messages with other nodes d.Logger.Debugf("Gossiping block [%d]", blockNum) d.Gossip.Gossip(gossipMsg) diff --git a/internal/pkg/peer/blocksprovider/blocksprovider_test.go b/internal/pkg/peer/blocksprovider/blocksprovider_test.go index 86e64640854..27611b6b3dd 100644 --- a/internal/pkg/peer/blocksprovider/blocksprovider_test.go +++ b/internal/pkg/peer/blocksprovider/blocksprovider_test.go @@ -532,6 +532,28 @@ var _ = Describe("Blocksprovider", func() { }, })) }) + + When("gossip dissemination is disabled", func() { + BeforeEach(func() { + d.BlockGossipDisabled = true + }) + + It("doesn't gossip, only adds to the payload buffer", func() { + Eventually(fakeGossipServiceAdapter.AddPayloadCallCount).Should(Equal(1)) + channelID, payload := fakeGossipServiceAdapter.AddPayloadArgsForCall(0) + Expect(channelID).To(Equal("channel-id")) + Expect(payload).To(Equal(&gossip.Payload{ + Data: protoutil.MarshalOrPanic(&common.Block{ + Header: &common.BlockHeader{ + Number: 8, + }, + }), + SeqNum: 8, + })) + + Consistently(fakeGossipServiceAdapter.GossipCallCount).Should(Equal(0)) + }) + }) }) When("the deliver client returns a status", func() { diff --git a/sampleconfig/core.yaml b/sampleconfig/core.yaml index 0121d4abf11..3486301b377 100644 --- a/sampleconfig/core.yaml +++ b/sampleconfig/core.yaml @@ -348,6 +348,11 @@ peer: # Delivery service related config deliveryclient: + # Enables this peer to disseminate blocks it pulled from the ordering service + # via gossip. + # Note that 'gossip.state.enabled' controls point to point block replication + # of blocks committed in the past. + blockGossipEnabled: true # It sets the total time the delivery service may spend in reconnection # attempts until its retry logic gives up and returns an error reconnectTotalTimeThreshold: 3600s