Skip to content

Commit

Permalink
Merge "FAB-15043 Mig-v1 cleanup #2"
Browse files Browse the repository at this point in the history
  • Loading branch information
yacovm authored and Gerrit Code Review committed Apr 11, 2019
2 parents 16fbff6 + 22ac9e6 commit c28d6dc
Show file tree
Hide file tree
Showing 6 changed files with 8 additions and 148 deletions.
56 changes: 0 additions & 56 deletions orderer/common/multichannel/chainsupport.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
"github.com/hyperledger/fabric/orderer/consensus"
cb "github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protoutil"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -69,14 +68,6 @@ func newChainSupport(
// Set up the block writer
cs.BlockWriter = newBlockWriter(lastBlock, registrar, cs)

// TODO Identify recovery after crash in the middle of consensus-type migration
if cs.detectMigration(lastBlock) {
// We do this because the last block after migration (COMMIT/CONTEXT) carries Kafka metadata.
// This prevents the code down the line from unmarshaling it as Raft, and panicking.
metadata.Value = nil
logger.Debugf("[channel: %s] Consensus-type migration: restart on to Raft, resetting Kafka block metadata", cs.ChainID())
}

// Set up the consenter
consenterType := ledgerResources.SharedConfig().ConsensusType()
consenter, ok := consenters[consenterType]
Expand All @@ -94,53 +85,6 @@ func newChainSupport(
return cs
}

// detectMigration identifies restart after consensus-type migration was committed (green path).
// Restart after migration is detected by:
// 1. The Kafka2RaftMigration capability in on
// 2. The last block carries a config-tx
// 3. In the config-tx, you have:
// - (system-channel && state=COMMIT), OR
// - (standard-channel && state=CONTEXT)
// This assumes that migration was successful (green path). When migration ends successfully,
// every channel will have a config block as the last block. On the system channel, containing state=COMMIT;
// on standard channels, containing state=CONTEXT.
func (cs *ChainSupport) detectMigration(lastBlock *cb.Block) bool {
isMigration := false

if !cs.ledgerResources.SharedConfig().Capabilities().Kafka2RaftMigration() {
return isMigration
}

lastConfigIndex, err := protoutil.GetLastConfigIndexFromBlock(lastBlock)
if err != nil {
logger.Panicf("Chain did not have appropriately encoded last config in its latest block: %s", err)
}

logger.Debugf("[channel: %s], sysChan=%v, lastConfigIndex=%d, H=%d, mig-state: %s",
cs.ChainID(), cs.systemChannel, lastConfigIndex, cs.ledgerResources.Height(),
cs.ledgerResources.SharedConfig().ConsensusMigrationState())

if lastConfigIndex == lastBlock.Header.Number { //The last block was a config-tx
state := cs.ledgerResources.SharedConfig().ConsensusMigrationState()
if cs.systemChannel {
if state == orderer.ConsensusType_MIG_STATE_COMMIT {
isMigration = true
}
} else {
if state == orderer.ConsensusType_MIG_STATE_CONTEXT {
isMigration = true
}
}

if isMigration {
logger.Infof("[channel: %s], Restarting after consensus-type migration. New consensus-type is: %s",
cs.ChainID(), cs.ledgerResources.SharedConfig().ConsensusType())
}
}

return isMigration
}

// Block returns a block with the following number,
// or nil if such a block doesn't exist.
func (cs *ChainSupport) Block(number uint64) *cb.Block {
Expand Down
50 changes: 1 addition & 49 deletions orderer/consensus/etcdraft/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,11 +330,7 @@ func (c *Chain) Start() {
}

isJoin := c.support.Height() > 1
isMigration := false
if isJoin {
isMigration = c.detectMigration()
}
c.Node.start(c.fresh, isJoin, isMigration)
c.Node.start(c.fresh, isJoin)

close(c.startC)
close(c.errorC)
Expand All @@ -358,42 +354,6 @@ func (c *Chain) Start() {
c.periodicChecker.Run()
}

// detectMigration detects if the orderer restarts right after consensus-type migration,
// in which the Height>1 but previous blocks were created by Kafka.
// If this is the case, Raft should be started like it is joining a new channel.
func (c *Chain) detectMigration() bool {
startOfChain := false
if c.support.SharedConfig().Capabilities().Kafka2RaftMigration() {
lastConfigIndex, err := protoutil.GetLastConfigIndexFromBlock(c.lastBlock)
if err != nil {
c.logger.Panicf("Chain did not have appropriately encoded last config in its latest block: %s", err)
}

c.logger.Debugf("Detecting if consensus-type migration, sysChan=%v, lastConfigIndex=%d, Height=%d, mig-state: %s",
c.support.IsSystemChannel(), lastConfigIndex, c.lastBlock.Header.Number+1, c.support.SharedConfig().ConsensusMigrationState().String())

if lastConfigIndex != c.lastBlock.Header.Number { // The last block is not a config-tx
return startOfChain
}

// The last block was a config-tx
if c.support.IsSystemChannel() {
if c.support.SharedConfig().ConsensusMigrationState() == orderer.ConsensusType_MIG_STATE_COMMIT {
startOfChain = true
}
} else {
if c.support.SharedConfig().ConsensusMigrationState() == orderer.ConsensusType_MIG_STATE_CONTEXT {
startOfChain = true
}
}

if startOfChain {
c.logger.Infof("Restarting after consensus-type migration. Type: %s, just starting the channel.", c.support.SharedConfig().ConsensusType())
}
}
return startOfChain
}

// Order submits normal type transactions for ordering.
func (c *Chain) Order(env *common.Envelope, configSeq uint64) error {
c.Metrics.NormalProposalsReceived.Add(1)
Expand Down Expand Up @@ -1330,14 +1290,6 @@ func (c *Chain) getInFlightConfChange() *raftpb.ConfChange {
return nil
}

// Detect if it is a restart right after consensus-type migration. If yes, return early in order to avoid using
// the block metadata as etcdraft.BlockMetadata (see below). Right after migration the block metadata will carry
// Kafka metadata. The etcdraft.BlockMetadata should be extracted from the ConsensusType.Metadata, instead.
if c.detectMigration() {
c.logger.Infof("Restarting after consensus-type migration. Type: %s, just starting the chain.", c.support.SharedConfig().ConsensusType())
return nil
}

// extracting current Raft configuration state
confState := c.Node.ApplyConfChange(raftpb.ConfChange{})

Expand Down
7 changes: 1 addition & 6 deletions orderer/consensus/etcdraft/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3477,12 +3477,7 @@ func newChain(timeout time.Duration, channel string, dataDir string, id uint64,

support := &consensusmocks.FakeConsenterSupport{}
support.ChainIDReturns(channel)
support.SharedConfigReturns(&mockconfig.Orderer{
BatchTimeoutVal: timeout,
CapabilitiesVal: &mockconfig.OrdererCapabilities{
Kafka2RaftMigVal: false,
},
})
support.SharedConfigReturns(&mockconfig.Orderer{BatchTimeoutVal: timeout})

cutter := mockblockcutter.NewReceiver()
close(cutter.Block)
Expand Down
17 changes: 0 additions & 17 deletions orderer/consensus/etcdraft/consenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package etcdraft

import (
"bytes"
"encoding/hex"
"path"
"reflect"
"time"
Expand Down Expand Up @@ -120,27 +119,11 @@ func (c *Consenter) detectSelfID(consenters map[uint64]*etcdraft.Consenter) (uin

// HandleChain returns a new Chain instance or an error upon failure
func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *common.Metadata) (consensus.Chain, error) {

if support.SharedConfig().Capabilities().Kafka2RaftMigration() {
c.Logger.Debugf("SharedConfig.ConsensusType fields: Type=%s, ConsensusMigrationState=%s, ConsensusMigrationContext=%d, ConsensusMetadata length=%d",
support.SharedConfig().ConsensusType(), support.SharedConfig().ConsensusMigrationState(),
support.SharedConfig().ConsensusMigrationContext(), len(support.SharedConfig().ConsensusMetadata()))
if support.SharedConfig().ConsensusMigrationState() != orderer.ConsensusType_MIG_STATE_NONE {
c.Logger.Debugf("SharedConfig.ConsensusType: ConsensusMetadata dump:\n%s", hex.Dump(support.SharedConfig().ConsensusMetadata()))
}
}

m := &etcdraft.ConfigMetadata{}
if err := proto.Unmarshal(support.SharedConfig().ConsensusMetadata(), m); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal consensus metadata")
}

if support.SharedConfig().Capabilities().Kafka2RaftMigration() &&
support.SharedConfig().ConsensusMigrationState() != orderer.ConsensusType_MIG_STATE_NONE {
c.Logger.Debugf("SharedConfig().ConsensusMetadata(): %s", m.String())
c.Logger.Debugf("block metadata.Value dump: \n%s", hex.Dump(metadata.Value))
}

if m.Options == nil {
return nil, errors.New("etcdraft options have not been provided")
}
Expand Down
15 changes: 3 additions & 12 deletions orderer/consensus/etcdraft/consenter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,7 @@ var _ = Describe("Consenter", func() {
metadata := protoutil.MarshalOrPanic(m)
support.SharedConfigReturns(&mockconfig.Orderer{
ConsensusMetadataVal: metadata,
CapabilitiesVal: &mockconfig.OrdererCapabilities{
Kafka2RaftMigVal: false,
},
BatchSizeVal: &orderer.BatchSize{PreferredMaxBytes: 2 * 1024 * 1024},
BatchSizeVal: &orderer.BatchSize{PreferredMaxBytes: 2 * 1024 * 1024},
})

consenter := newConsenter(chainGetter)
Expand Down Expand Up @@ -199,10 +196,7 @@ var _ = Describe("Consenter", func() {
support := &consensusmocks.FakeConsenterSupport{}
support.SharedConfigReturns(&mockconfig.Orderer{
ConsensusMetadataVal: metadata,
CapabilitiesVal: &mockconfig.OrdererCapabilities{
Kafka2RaftMigVal: false,
},
BatchSizeVal: &orderer.BatchSize{PreferredMaxBytes: 2 * 1024 * 1024},
BatchSizeVal: &orderer.BatchSize{PreferredMaxBytes: 2 * 1024 * 1024},
})
support.ChainIDReturns("foo")

Expand All @@ -224,10 +218,7 @@ var _ = Describe("Consenter", func() {
metadata := protoutil.MarshalOrPanic(m)
support.SharedConfigReturns(&mockconfig.Orderer{
ConsensusMetadataVal: metadata,
CapabilitiesVal: &mockconfig.OrdererCapabilities{
Kafka2RaftMigVal: false,
},
BatchSizeVal: &orderer.BatchSize{PreferredMaxBytes: 2 * 1024 * 1024},
BatchSizeVal: &orderer.BatchSize{PreferredMaxBytes: 2 * 1024 * 1024},
})

consenter := newConsenter(chainGetter)
Expand Down
11 changes: 3 additions & 8 deletions orderer/consensus/etcdraft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,15 @@ type node struct {
raft.Node
}

func (n *node) start(fresh, join, migration bool) {
func (n *node) start(fresh, join bool) {
raftPeers := RaftPeers(n.metadata.ConsenterIds)
n.logger.Debugf("Starting raft node: #peers: %v", len(raftPeers))

var campaign bool
if fresh {
if join {
if !migration {
raftPeers = nil
n.logger.Info("Starting raft node to join an existing channel")

} else {
n.logger.Info("Starting raft node to join an existing channel, after consensus-type migration")
}
raftPeers = nil
n.logger.Info("Starting raft node to join an existing channel")
} else {
n.logger.Info("Starting raft node as part of a new channel")

Expand Down

0 comments on commit c28d6dc

Please sign in to comment.