From e4060ed37d7abf7b37bbb191153c9ffb50053833 Mon Sep 17 00:00:00 2001 From: Jay Guo Date: Tue, 22 Jan 2019 16:07:45 +0800 Subject: [PATCH] [FAB-13455] Initialize BlockPuller on demand. The creation of BlockPuller takes latest certificates, therefore should be done on-demand to guarantee its validity. Change-Id: I327275da495a85126feb58c84b460bed98f7b860 Signed-off-by: Jay Guo --- orderer/consensus/etcdraft/chain.go | 21 ++++++++++++++------- orderer/consensus/etcdraft/chain_test.go | 9 ++++++++- orderer/consensus/etcdraft/consenter.go | 14 ++++++++------ 3 files changed, 30 insertions(+), 14 deletions(-) diff --git a/orderer/consensus/etcdraft/chain.go b/orderer/consensus/etcdraft/chain.go index 8680d21f4c4..47c14a6a7ab 100644 --- a/orderer/consensus/etcdraft/chain.go +++ b/orderer/consensus/etcdraft/chain.go @@ -59,6 +59,10 @@ type BlockPuller interface { Close() } +// CreateBlockPuller is a function to create BlockPuller on demand. +// It is passed into chain initializer so that tests could mock this. +type CreateBlockPuller func() (BlockPuller, error) + // Options contains all the configurations relevant to the chain. type Options struct { RaftID uint64 @@ -132,7 +136,8 @@ type Chain struct { // needed by snapshotting lastSnapBlockNum uint64 confState raftpb.ConfState // Etcdraft requires ConfState to be persisted within snapshot - puller BlockPuller // Deliver client to pull blocks from other OSNs + + createPuller CreateBlockPuller // func used to create BlockPuller on demand fresh bool // indicate if this is a fresh raft node @@ -148,7 +153,7 @@ func NewChain( opts Options, conf Configurator, rpc RPC, - puller BlockPuller, + f CreateBlockPuller, observeC chan<- raft.SoftState) (*Chain, error) { lg := opts.Logger.With("channel", support.ChainID(), "node", opts.RaftID) @@ -190,7 +195,7 @@ func NewChain( fresh: fresh, appliedIndex: opts.RaftMetadata.RaftIndex, lastSnapBlockNum: snapBlkNum, - puller: puller, + createPuller: f, clock: opts.Clock, logger: lg, opts: opts, @@ -688,12 +693,14 @@ func (c *Chain) catchUp(snap *raftpb.Snapshot) error { return nil } - defer func() { - c.puller.Close() - }() + puller, err := c.createPuller() + if err != nil { + return errors.Errorf("failed to create block puller: %s", err) + } + defer puller.Close() for next <= b.Header.Number { - block := c.puller.PullBlock(next) + block := puller.PullBlock(next) if block == nil { return errors.Errorf("failed to fetch block %d from cluster", next) } diff --git a/orderer/consensus/etcdraft/chain_test.go b/orderer/consensus/etcdraft/chain_test.go index 2028628414c..aa6e4b7e57e 100644 --- a/orderer/consensus/etcdraft/chain_test.go +++ b/orderer/consensus/etcdraft/chain_test.go @@ -2607,7 +2607,14 @@ func newChain(timeout time.Duration, channel string, dataDir string, id uint64, } func (c *chain) init() { - ch, err := etcdraft.NewChain(c.support, c.opts, c.configurator, c.rpc, c.puller, c.observe) + ch, err := etcdraft.NewChain( + c.support, + c.opts, + c.configurator, + c.rpc, + func() (etcdraft.BlockPuller, error) { return c.puller, nil }, + c.observe, + ) Expect(err).NotTo(HaveOccurred()) c.Chain = ch } diff --git a/orderer/consensus/etcdraft/consenter.go b/orderer/consensus/etcdraft/consenter.go index 1282a1baff6..9869a9827cc 100644 --- a/orderer/consensus/etcdraft/consenter.go +++ b/orderer/consensus/etcdraft/consenter.go @@ -144,11 +144,6 @@ func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *co return &inactive.Chain{Err: errors.Errorf("channel %s is not serviced by me", support.ChainID())}, nil } - bp, err := newBlockPuller(support, c.Dialer, c.OrdererConfig.General.Cluster) - if err != nil { - return nil, errors.WithStack(err) - } - opts := Options{ RaftID: id, Clock: clock.NewClock(), @@ -173,7 +168,14 @@ func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *co Comm: c.Communication, DestinationToStream: make(map[uint64]orderer.Cluster_SubmitClient), } - return NewChain(support, opts, c.Communication, rpc, bp, nil) + return NewChain( + support, + opts, + c.Communication, + rpc, + func() (BlockPuller, error) { return newBlockPuller(support, c.Dialer, c.OrdererConfig.General.Cluster) }, + nil, + ) } // ReadRaftMetadata attempts to read raft metadata from block metadata, if available.