Skip to content

Commit

Permalink
[FAB-13455] Initialize BlockPuller on demand.
Browse files Browse the repository at this point in the history
The creation of BlockPuller takes latest certificates, therefore
should be done on-demand to guarantee its validity.

Change-Id: I327275da495a85126feb58c84b460bed98f7b860
Signed-off-by: Jay Guo <guojiannan1101@gmail.com>
  • Loading branch information
guoger authored and C0rWin committed Mar 3, 2019
1 parent 082a910 commit e4060ed
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 14 deletions.
21 changes: 14 additions & 7 deletions orderer/consensus/etcdraft/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ type BlockPuller interface {
Close()
}

// CreateBlockPuller is a function to create BlockPuller on demand.
// It is passed into chain initializer so that tests could mock this.
type CreateBlockPuller func() (BlockPuller, error)

// Options contains all the configurations relevant to the chain.
type Options struct {
RaftID uint64
Expand Down Expand Up @@ -132,7 +136,8 @@ type Chain struct {
// needed by snapshotting
lastSnapBlockNum uint64
confState raftpb.ConfState // Etcdraft requires ConfState to be persisted within snapshot
puller BlockPuller // Deliver client to pull blocks from other OSNs

createPuller CreateBlockPuller // func used to create BlockPuller on demand

fresh bool // indicate if this is a fresh raft node

Expand All @@ -148,7 +153,7 @@ func NewChain(
opts Options,
conf Configurator,
rpc RPC,
puller BlockPuller,
f CreateBlockPuller,
observeC chan<- raft.SoftState) (*Chain, error) {

lg := opts.Logger.With("channel", support.ChainID(), "node", opts.RaftID)
Expand Down Expand Up @@ -190,7 +195,7 @@ func NewChain(
fresh: fresh,
appliedIndex: opts.RaftMetadata.RaftIndex,
lastSnapBlockNum: snapBlkNum,
puller: puller,
createPuller: f,
clock: opts.Clock,
logger: lg,
opts: opts,
Expand Down Expand Up @@ -688,12 +693,14 @@ func (c *Chain) catchUp(snap *raftpb.Snapshot) error {
return nil
}

defer func() {
c.puller.Close()
}()
puller, err := c.createPuller()
if err != nil {
return errors.Errorf("failed to create block puller: %s", err)
}
defer puller.Close()

for next <= b.Header.Number {
block := c.puller.PullBlock(next)
block := puller.PullBlock(next)
if block == nil {
return errors.Errorf("failed to fetch block %d from cluster", next)
}
Expand Down
9 changes: 8 additions & 1 deletion orderer/consensus/etcdraft/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2607,7 +2607,14 @@ func newChain(timeout time.Duration, channel string, dataDir string, id uint64,
}

func (c *chain) init() {
ch, err := etcdraft.NewChain(c.support, c.opts, c.configurator, c.rpc, c.puller, c.observe)
ch, err := etcdraft.NewChain(
c.support,
c.opts,
c.configurator,
c.rpc,
func() (etcdraft.BlockPuller, error) { return c.puller, nil },
c.observe,
)
Expect(err).NotTo(HaveOccurred())
c.Chain = ch
}
Expand Down
14 changes: 8 additions & 6 deletions orderer/consensus/etcdraft/consenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,6 @@ func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *co
return &inactive.Chain{Err: errors.Errorf("channel %s is not serviced by me", support.ChainID())}, nil
}

bp, err := newBlockPuller(support, c.Dialer, c.OrdererConfig.General.Cluster)
if err != nil {
return nil, errors.WithStack(err)
}

opts := Options{
RaftID: id,
Clock: clock.NewClock(),
Expand All @@ -173,7 +168,14 @@ func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *co
Comm: c.Communication,
DestinationToStream: make(map[uint64]orderer.Cluster_SubmitClient),
}
return NewChain(support, opts, c.Communication, rpc, bp, nil)
return NewChain(
support,
opts,
c.Communication,
rpc,
func() (BlockPuller, error) { return newBlockPuller(support, c.Dialer, c.OrdererConfig.General.Cluster) },
nil,
)
}

// ReadRaftMetadata attempts to read raft metadata from block metadata, if available.
Expand Down

0 comments on commit e4060ed

Please sign in to comment.